STORM-2306 - Messaging subsystem redesign. New Backpressure model.

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bc4c4807
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bc4c4807
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bc4c4807

Branch: refs/heads/master
Commit: bc4c480705fa543913b66befccabe8fa3bbe35f0
Parents: ab7b4ca
Author: Roshan Naik <ros...@hw13642.hsd1.ca.comcast.net>
Authored: Wed Feb 14 03:43:10 2018 -0800
Committer: Roshan Naik <ros...@hw13642.hsd1.ca.comcast.net>
Committed: Wed Feb 14 03:43:10 2018 -0800

----------------------------------------------------------------------
 conf/defaults.yaml                              |  62 +-
 docs/Concepts.md                                |   5 +
 docs/Local-mode.md                              |   2 +-
 docs/Metrics.md                                 |   4 +-
 docs/Performance.md                             | 164 +++++
 .../storm/elasticsearch/common/EsTestUtil.java  |   2 +-
 examples/storm-perf/pom.xml                     |   2 +-
 .../org/apache/storm/perf/BackPressureTopo.java | 115 ++++
 .../perf/ConstSpoutIdBoltNullBoltTopo.java      |  38 +-
 .../storm/perf/ConstSpoutNullBoltTopo.java      |  32 +-
 .../apache/storm/perf/ConstSpoutOnlyTopo.java   |   6 +-
 .../storm/perf/FileReadWordCountTopo.java       |  23 +-
 .../storm/perf/HdfsSpoutNullBoltTopo.java       |  46 +-
 .../org/apache/storm/perf/JCQueuePerfTest.java  | 380 ++++++++++++
 .../org/apache/storm/perf/JCToolsPerfTest.java  | 227 +++++++
 .../org/apache/storm/perf/KafkaHdfsTopo.java    | 124 ++--
 .../storm/perf/KafkaSpoutNullBoltTopo.java      |  23 +-
 .../apache/storm/perf/LowThroughputTopo.java    | 154 +++++
 .../storm/perf/SimplifiedWordCountTopo.java     |  85 +++
 .../storm/perf/StrGenSpoutHdfsBoltTopo.java     |  37 +-
 .../org/apache/storm/perf/ThroughputMeter.java  |  72 +++
 .../org/apache/storm/perf/bolt/CountBolt.java   |   7 +-
 .../org/apache/storm/perf/bolt/DevNullBolt.java |  15 +-
 .../java/org/apache/storm/perf/bolt/IdBolt.java |   6 +-
 .../storm/perf/bolt/SplitSentenceBolt.java      |  19 +-
 .../org/apache/storm/perf/spout/ConstSpout.java |  23 +-
 .../apache/storm/perf/spout/FileReadSpout.java  |  54 +-
 .../apache/storm/perf/spout/StringGenSpout.java |  17 +-
 .../apache/storm/perf/spout/WordGenSpout.java   | 108 ++++
 .../storm/perf/utils/BasicMetricsCollector.java |  81 ++-
 .../org/apache/storm/perf/utils/Helper.java     |   4 +-
 .../apache/storm/perf/utils/IdentityBolt.java   |   6 +-
 .../apache/storm/perf/utils/MetricsSample.java  |  61 +-
 .../storm/elasticsearch/common/EsTestUtil.java  |   2 +-
 .../spout/SpoutOutputCollectorMock.java         |   5 +
 .../eventhubs/trident/TridentCollectorMock.java |   5 +
 .../hdfs/bolt/AvroGenericRecordBoltTest.java    |   2 +-
 .../apache/storm/hdfs/bolt/TestHdfsBolt.java    |   2 +-
 .../storm/hdfs/bolt/TestSequenceFileBolt.java   |   2 +-
 .../bolt/format/TestSimpleFileNameFormat.java   |  13 +-
 .../apache/storm/hdfs/spout/TestHdfsSpout.java  |  21 +-
 .../apache/storm/hive/bolt/TestHiveBolt.java    |   2 +-
 .../storm/hive/common/TestHiveWriter.java       |   2 +-
 .../jms/spout/MockSpoutOutputCollector.java     |   5 +
 .../apache/storm/kafka/spout/KafkaSpout.java    |   2 +-
 .../storm/kafka/PartitionManagerTest.java       |   4 +
 .../apache/storm/kafka/bolt/KafkaBoltTest.java  |   4 +-
 pom.xml                                         |   8 +-
 storm-client/pom.xml                            |   6 +-
 .../src/jvm/org/apache/storm/Config.java        | 230 ++++---
 .../src/jvm/org/apache/storm/Constants.java     |   2 +-
 .../src/jvm/org/apache/storm/StormTimer.java    |  42 +-
 .../org/apache/storm/cluster/ClusterUtils.java  |   2 +-
 .../storm/cluster/IStormClusterState.java       | 108 ++--
 .../storm/cluster/StormClusterStateImpl.java    |  62 +-
 .../coordination/BatchOutputCollector.java      |   9 +-
 .../coordination/BatchOutputCollectorImpl.java  |   5 +
 .../storm/coordination/CoordinatedBolt.java     |   8 +-
 .../src/jvm/org/apache/storm/daemon/Acker.java  |   9 +-
 .../org/apache/storm/daemon/GrouperFactory.java |  11 +-
 .../org/apache/storm/daemon/StormCommon.java    |   8 +-
 .../src/jvm/org/apache/storm/daemon/Task.java   | 122 +++-
 .../daemon/metrics/BuiltinMetricsUtil.java      |  13 -
 .../daemon/metrics/SpoutThrottlingMetrics.java  |  13 +-
 .../supervisor/ClientSupervisorUtils.java       |   4 +-
 .../daemon/worker/BackPressureTracker.java      |  96 +++
 .../org/apache/storm/daemon/worker/Worker.java  | 386 +++++-------
 .../apache/storm/daemon/worker/WorkerState.java | 337 +++++-----
 .../storm/daemon/worker/WorkerTransfer.java     | 143 +++++
 .../jvm/org/apache/storm/executor/Executor.java | 326 +++++-----
 .../apache/storm/executor/ExecutorShutdown.java |  51 +-
 .../apache/storm/executor/ExecutorTransfer.java |  98 ++-
 .../apache/storm/executor/IRunningExecutor.java |  13 +-
 .../apache/storm/executor/LocalExecutor.java    |  10 +-
 .../org/apache/storm/executor/TupleInfo.java    |   8 +
 .../storm/executor/bolt/BoltExecutor.java       | 171 +++--
 .../executor/bolt/BoltOutputCollectorImpl.java  | 110 ++--
 .../storm/executor/spout/SpoutExecutor.java     | 272 +++++---
 .../spout/SpoutOutputCollectorImpl.java         |  94 ++-
 .../apache/storm/grouping/ShuffleGrouping.java  |   9 +-
 .../storm/hooks/info/BoltExecuteInfo.java       |   3 +-
 .../org/apache/storm/messaging/IConnection.java |  31 +-
 .../org/apache/storm/messaging/IContext.java    |  10 +-
 .../apache/storm/messaging/local/Context.java   |  28 +-
 .../messaging/netty/BackPressureStatus.java     |  75 +++
 .../apache/storm/messaging/netty/Client.java    | 129 ++--
 .../apache/storm/messaging/netty/Context.java   |   6 +-
 .../storm/messaging/netty/MessageDecoder.java   |  43 +-
 .../storm/messaging/netty/MessageEncoder.java   |  18 +-
 .../apache/storm/messaging/netty/Server.java    |  45 +-
 .../messaging/netty/StormClientHandler.java     |  43 +-
 .../netty/StormClientPipelineFactory.java       |  11 +-
 .../netty/StormServerPipelineFactory.java       |   6 +-
 .../org/apache/storm/policy/IWaitStrategy.java  |  61 ++
 .../apache/storm/policy/WaitStrategyPark.java   |  58 ++
 .../storm/policy/WaitStrategyProgressive.java   |  72 +++
 .../serialization/KryoTupleDeserializer.java    |   6 +-
 .../serialization/KryoValuesDeserializer.java   |   4 +-
 .../serialization/KryoValuesSerializer.java     |   5 +-
 .../serialization/SerializationFactory.java     |   2 +
 .../storm/spout/ISpoutOutputCollector.java      |   6 +
 .../storm/spout/SpoutOutputCollector.java       |   9 +-
 .../storm/state/DefaultStateSerializer.java     |   2 +-
 .../apache/storm/stats/BoltExecutorStats.java   |  55 +-
 .../jvm/org/apache/storm/stats/CommonStats.java |  64 +-
 .../apache/storm/stats/SpoutExecutorStats.java  |  32 +-
 .../storm/task/GeneralTopologyContext.java      |  11 +-
 .../org/apache/storm/task/IOutputCollector.java |   2 +
 .../org/apache/storm/task/OutputCollector.java  |   7 +-
 .../org/apache/storm/task/TopologyContext.java  |   5 +-
 .../org/apache/storm/testing/SpoutTracker.java  |   5 +
 .../trident/operation/TridentCollector.java     |   5 +
 .../operation/impl/CaptureCollector.java        |   5 +
 .../trident/operation/impl/GroupCollector.java  |   5 +
 .../storm/trident/planner/BridgeReceiver.java   |   6 +-
 .../storm/trident/planner/TupleReceiver.java    |   2 +-
 .../planner/processor/AggregateProcessor.java   |   7 +-
 .../planner/processor/AppendCollector.java      |   7 +
 .../planner/processor/EachProcessor.java        |   6 +
 .../planner/processor/FreshCollector.java       |   7 +
 .../trident/planner/processor/MapProcessor.java |   6 +
 .../processor/MultiReducerProcessor.java        |   6 +-
 .../processor/PartitionPersistProcessor.java    |   5 +
 .../planner/processor/ProjectedProcessor.java   |   7 +
 .../planner/processor/StateQueryProcessor.java  |   5 +
 .../trident/spout/RichSpoutBatchExecutor.java   |   7 +-
 .../trident/spout/RichSpoutBatchTriggerer.java  |   5 +
 .../trident/spout/TridentSpoutExecutor.java     |   5 +
 .../trident/topology/TridentBoltExecutor.java   |   9 +-
 .../windowing/AbstractTridentWindowManager.java |   5 +
 .../windowing/WindowTridentProcessor.java       |   5 +
 .../org/apache/storm/tuple/AddressedTuple.java  |  11 +-
 .../jvm/org/apache/storm/tuple/MessageId.java   |   7 +-
 .../jvm/org/apache/storm/tuple/TupleImpl.java   |  55 +-
 .../jvm/org/apache/storm/utils/ConfigUtils.java |  11 +-
 .../utils/DisruptorBackpressureCallback.java    |  27 -
 .../org/apache/storm/utils/DisruptorQueue.java  | 619 -------------------
 .../src/jvm/org/apache/storm/utils/JCQueue.java | 455 ++++++++++++++
 .../jvm/org/apache/storm/utils/MutableLong.java |   2 +-
 .../org/apache/storm/utils/ObjectReader.java    |   4 +
 .../jvm/org/apache/storm/utils/RotatingMap.java |   2 +-
 .../src/jvm/org/apache/storm/utils/Time.java    |   8 +-
 .../org/apache/storm/utils/TransferDrainer.java | 199 +++---
 .../src/jvm/org/apache/storm/utils/Utils.java   |  38 +-
 .../storm/utils/WorkerBackpressureCallback.java |  26 -
 .../storm/utils/WorkerBackpressureThread.java   |  80 ---
 .../storm/validation/ConfigValidation.java      |  33 +
 .../validation/ConfigValidationAnnotations.java |   9 +
 .../jvm/org/apache/storm/bolt/TestJoinBolt.java |  51 +-
 .../cluster/StormClusterStateImplTest.java      |  43 +-
 .../topology/WindowedBoltExecutorTest.java      |  16 +-
 .../utils/DisruptorQueueBackpressureTest.java   | 110 ----
 .../apache/storm/utils/DisruptorQueueTest.java  | 187 ------
 .../storm/utils/JCQueueBackpressureTest.java    |  95 +++
 .../jvm/org/apache/storm/utils/JCQueueTest.java | 209 +++++++
 .../utils/WorkerBackpressureThreadTest.java     |  50 --
 .../storm/clojure/IndifferentAccessMap.java     |  17 +-
 .../org/apache/storm/integration_test.clj       |  12 +-
 .../apache/storm/messaging/netty_unit_test.clj  |  42 +-
 .../test/clj/org/apache/storm/nimbus_test.clj   |  13 +-
 .../src/main/java/org/apache/storm/Testing.java |   8 +-
 .../org/apache/storm/daemon/nimbus/Nimbus.java  |   2 +-
 .../storm/daemon/supervisor/BasicContainer.java |  72 +--
 163 files changed, 5334 insertions(+), 3009 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 55b9d29..6269282 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -185,19 +185,20 @@ topology.worker.receiver.thread.count: 1
 task.heartbeat.frequency.secs: 3
 task.refresh.poll.secs: 10
 task.credentials.poll.secs: 30
-task.backpressure.poll.secs: 30
-
-# now should be null by default
-topology.backpressure.enable: false
-backpressure.disruptor.high.watermark: 0.9
-backpressure.disruptor.low.watermark: 0.4
-backpressure.znode.timeout.secs: 30
-backpressure.znode.update.freq.secs: 15
 
 # Used by workers to communicate
 storm.messaging.netty.server_worker_threads: 1
 storm.messaging.netty.client_worker_threads: 1
 storm.messaging.netty.buffer_size: 5242880 #5MB buffer
+
+# The netty write buffer high watermark in bytes.
+# If the number of bytes queued in netty's write buffer exceeds this value, the netty client will block
+# until the value falls below the low watermark.
+storm.messaging.netty.buffer.high.watermark: 16777216 # 16 MB
+# The netty write buffer low watermark in bytes.
+# Once the number of bytes queued in the write buffer has exceeded the high watermark and then
+# dropped below this value, any blocked clients will unblock and start processing further messages.
+storm.messaging.netty.buffer.low.watermark: 8388608 # 8 MB
 # Since nimbus.task.launch.secs and supervisor.worker.start.timeout.secs are 120, other workers should also wait at least that long before giving up on connecting to the other worker. The reconnection period also needs to be bigger than storm.zookeeper.session.timeout (default is 20s), so that we can abort the reconnection when the target worker is dead.
 storm.messaging.netty.max_retries: 300
 storm.messaging.netty.max_wait_ms: 1000
@@ -230,20 +231,42 @@ topology.multilang.serializer: "org.apache.storm.multilang.JsonSerializer"
 topology.shellbolt.max.pending: 100
 topology.skip.missing.kryo.registrations: false
 topology.max.task.parallelism: null
-topology.max.spout.pending: null
+topology.max.spout.pending: null    # ideally should be larger than topology.producer.batch.size (esp. if topology.batch.flush.interval.millis=0)
 topology.state.synchronization.timeout.secs: 60
 topology.stats.sample.rate: 0.05
 topology.builtin.metrics.bucket.size.secs: 60
 topology.fall.back.on.java.serialization: true
 topology.worker.childopts: null
 topology.worker.logwriter.childopts: "-Xmx64m"
-topology.executor.receive.buffer.size: 1024 #batched
-topology.executor.send.buffer.size: 1024 #individual messages
-topology.transfer.buffer.size: 1024 # batched
 topology.tick.tuple.freq.secs: null
 topology.worker.shared.thread.pool.size: 4
+
+# Spout Wait Strategy - employed when there is no data to produce
 topology.spout.wait.strategy: "org.apache.storm.spout.SleepSpoutWaitStrategy"
 topology.sleep.spout.wait.strategy.time.ms: 1
+
+# Bolt Wait Strategy - employed when there is no data in its receive buffer to process
+topology.bolt.wait.strategy: "org.apache.storm.policy.WaitStrategyProgressive"
+
+topology.bolt.wait.park.microsec: 100          # park time for org.apache.storm.policy.WaitStrategyPark. Busy spins if set to 0.
+
+topology.bolt.wait.progressive.level1.count: 1          # number of iterations to spend in level 1 [no sleep] of WaitStrategyProgressive, before progressing to level 2
+topology.bolt.wait.progressive.level2.count: 1000       # number of iterations to spend in level 2 [parkNanos(1)] of WaitStrategyProgressive, before progressing to level 3
+topology.bolt.wait.progressive.level3.sleep.millis: 1   # sleep duration for idling iterations in level 3 of WaitStrategyProgressive
+
+# BackPressure Wait Strategy - for any producer (spout/bolt/transfer thread) when the downstream Q is full
+topology.backpressure.wait.strategy: "org.apache.storm.policy.WaitStrategyProgressive"
+
+topology.backpressure.wait.park.microsec: 100          # park time for org.apache.storm.policy.WaitStrategyPark. Busy spins if set to 0.
+
+topology.backpressure.wait.progressive.level1.count: 1        # number of iterations to spend in level 1 [no sleep] of WaitStrategyProgressive, before progressing to level 2
+topology.backpressure.wait.progressive.level2.count: 1000     # number of iterations to spend in level 2 [parkNanos(1)] of WaitStrategyProgressive, before progressing to level 3
+topology.backpressure.wait.progressive.level3.sleep.millis: 1 # sleep duration for idling iterations in level 3 of WaitStrategyProgressive
+
+
+topology.backpressure.check.millis: 50   # how often to check whether backpressure has been relieved on executors under BP, for informing other workers to resume sending msgs to them. Must be > 0
+topology.executor.overflow.limit: 0    # max items in the overflowQ of any bolt/spout. When exceeded, the worker will drop incoming messages (from other workers) destined to the overflowing spout/bolt. Set to 0 to disable overflow limiting. Enabling this may degrade perf slightly.
+
 topology.error.throttle.interval.secs: 10
 topology.max.error.report.per.interval: 5
 topology.kryo.factory: "org.apache.storm.serialization.DefaultKryoFactory"
@@ -252,10 +275,16 @@ topology.trident.batch.emit.interval.millis: 500
 topology.testing.always.try.serialize: false
 topology.classpath: null
 topology.environment: null
-topology.bolts.outgoing.overflow.buffer.enable: false
-topology.disruptor.wait.timeout.millis: 1000
-topology.disruptor.batch.size: 100
-topology.disruptor.batch.timeout.millis: 1
+
+topology.transfer.buffer.size: 1000   # size of recv queue for the transfer worker thread
+topology.transfer.batch.size: 1       # can be no larger than half of `topology.transfer.buffer.size`
+
+topology.executor.receive.buffer.size: 32768  # size of recv queue for spouts & bolts. Will be internally rounded up to the next power of 2 (if not already a power of 2)
+topology.producer.batch.size: 1               # can be no larger than half of `topology.executor.receive.buffer.size`
+
+topology.batch.flush.interval.millis: 1  # Flush tuples are disabled if this is set to 0 or if (topology.producer.batch.size=1 and topology.transfer.batch.size=1)
+topology.spout.recvq.skips: 3  # Check recvQ once every N invocations of the spout's nextTuple() [when ACKs disabled]
+
 topology.disable.loadaware.messaging: false
 topology.state.checkpoint.interval.ms: 1000
 topology.localityaware.higher.bound: 0.8
@@ -326,6 +355,7 @@ storm.supervisor.hard.memory.limit.overage.mb: 2024
 storm.supervisor.low.memory.threshold.mb: 1024
 storm.supervisor.medium.memory.threshold.mb: 1536
 storm.supervisor.medium.memory.grace.period.ms: 30000
+
 storm.topology.classpath.beginning.enabled: false
 worker.metrics:
     "CGroupMemory": "org.apache.storm.metric.cgroup.CGroupMemoryUsage"

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/docs/Concepts.md
----------------------------------------------------------------------
diff --git a/docs/Concepts.md b/docs/Concepts.md
index d46033c..8516b9e 100644
--- a/docs/Concepts.md
+++ b/docs/Concepts.md
@@ -113,3 +113,8 @@ Topologies execute across one or more worker processes. Each worker process is a
 **Resources:**
 
 * [Config.TOPOLOGY_WORKERS](javadocs/org/apache/storm/Config.html#TOPOLOGY_WORKERS): this config sets the number of workers to allocate for executing the topology
+
+### Performance Tuning
+
+Refer to the [performance tuning guide](Performance.md).
+

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/docs/Local-mode.md
----------------------------------------------------------------------
diff --git a/docs/Local-mode.md b/docs/Local-mode.md
index a9e3a28..199d690 100644
--- a/docs/Local-mode.md
+++ b/docs/Local-mode.md
@@ -58,7 +58,7 @@ Storm also offers a clojure API for testing.
 
 ### Debugging your topology with an IDE
 
-One of the great use cases for local mode is to be able to walk through the code execution of your bolts and spouts using an IDE.  You can do this on the command line by adding the `--java-debug` option followed by the paramer you would pass to jdwp. This makes it simple to launch the local cluster with `-agentlib:jdwp=` turned on.
+One of the great use cases for local mode is to be able to walk through the code execution of your bolts and spouts using an IDE.  You can do this on the command line by adding the `--java-debug` option followed by the parameter you would pass to jdwp. This makes it simple to launch the local cluster with `-agentlib:jdwp=` turned on.
 
 When running from within an IDE itself you can modify your code to run within a call to `LocalCluster.withLocalModeOverride`
 

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/docs/Metrics.md
----------------------------------------------------------------------
diff --git a/docs/Metrics.md b/docs/Metrics.md
index d45d1ef..09ed2fc 100644
--- a/docs/Metrics.md
+++ b/docs/Metrics.md
@@ -198,9 +198,9 @@ This is also just for bolts.  It is the average amount of time between when `exe
 This metric records how much time a spout was idle because more tuples than `topology.max.spout.pending` were still outstanding.  This is the total time in milliseconds, not the average amount of time and is not sub-sampled.
 
 
-##### `__skipped-throttle-ms`
+##### `__skipped-backpressure-ms`
 
-This metric records how much time a spout was idle because back-pressure indicated that downstream queues in the topology were too full.  This is the total time in milliseconds, not the average amount of time and is not sub-sampled.
+This metric records how much time a spout was idle because back-pressure indicated that downstream queues in the topology were too full.  This is the total time in milliseconds, not the average amount of time and is not sub-sampled. This is similar to `__skipped-throttle-ms` in Storm 1.x.
 
 ##### `__skipped-inactive-ms`
 

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/docs/Performance.md
----------------------------------------------------------------------
diff --git a/docs/Performance.md b/docs/Performance.md
new file mode 100644
index 0000000..6d7d33e
--- /dev/null
+++ b/docs/Performance.md
@@ -0,0 +1,164 @@
+---
+title: Performance Tuning
+layout: documentation
+documentation: true
+---
+
+Latency, throughput and resource consumption are the three key dimensions involved in performance tuning.
+In the following sections we discuss the settings that can be used to tune along these dimensions and understand the trade-offs.
+
+It is important to understand that the optimal settings can vary depending on the topology, the type of hardware and the number of hosts used by the topology.
+
+## 1. Buffer Size
+Spouts and Bolts operate asynchronously using message passing. The message queues used for this purpose are of fixed but configurable size. Buffer size refers to the size of these queues. Every consumer has its own receive queue. Messages wait in the queue until the consumer is ready to process them. The queue will typically be almost empty or almost full, depending on whether the consumer is operating faster or slower than the rate at which producers are generating messages for it. Storm queues always have a single consumer and potentially multiple producers. There are two buffer size settings of interest:
+
+- `topology.executor.receive.buffer.size` : This is the size of the message queue employed for each spout and bolt executor.
+- `topology.transfer.buffer.size` : This is the size of the outbound message queue used for inter-worker messaging. This queue is referred to as the *Worker Transfer Queue*.
+
+**Note:** If the specified buffer size is not a power of 2, it is internally rounded up to the next power of 2.
+
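+As a minimal sketch (the `TopologyBuilder` named `builder` and the topology name `my-topo` are illustrative, and `Config.TOPOLOGY_TRANSFER_BUFFER_SIZE` is assumed to mirror the setting name above), both sizes can be overridden per topology before submission:
+
+```java
+Config conf = new Config();
+// Receive queue of every spout/bolt executor (rounded up to the next power of 2 internally).
+conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 32768);
+// Outbound Worker Transfer Queue used for inter-worker messaging.
+conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 1000);
+StormSubmitter.submitTopology("my-topo", conf, builder.createTopology());
+```
+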
+#### Guidance
+Very small message queues (size < 1024) are likely to hamper throughput by not providing enough isolation between the consumer and producer. This can undermine the asynchronous nature of the processing, as the producers are likely to find the downstream queue full.
+
+Very large message queues are also undesirable when dealing with slow consumers; it is better to employ more consumers (i.e. bolts) on additional CPU cores instead. If queues are large and often full, messages will wait longer in these queues at each step of the processing, leading to poor latency being reported on the Storm UI. Large queues also imply higher memory consumption, especially if the queues are typically full.
+
+
+## 2. Batch Size
+Producers can either write a batch of messages to the consumer's queue or write each message individually. This batch size can be configured.
+Inserting messages in batches into downstream queues helps reduce the number of synchronization operations required for the inserts, which helps achieve higher throughput. However, sometimes it may take a little while for the buffer to fill up before it is flushed into the downstream queue. This implies that the buffered messages will take longer to become visible to the downstream consumer that is waiting to process them. This can increase the average end-to-end latency of these messages. The latency can get very bad if the batch sizes are large and the topology is not experiencing high traffic.
+
+- `topology.producer.batch.size` : The batch size for writes into the receive queue of any spout/bolt is controlled via this setting. This setting impacts the communication within a worker process. Each upstream producer maintains a separate batch per destination receive queue. So if two spout instances are writing to the same downstream bolt instance, each of the spout instances will maintain a separate batch.
+
+- `topology.transfer.batch.size` : Messages that are destined for a spout/bolt running on a different worker process are sent to a queue called the **Worker Transfer Queue**. The Worker Transfer Thread is responsible for draining the messages in this queue and sending them to the appropriate worker process over the network. This setting controls the batch size for writes into the Worker Transfer Queue. This impacts the communication between worker processes.
+
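+As a minimal sketch (the values are illustrative, not recommendations, and `Config.TOPOLOGY_TRANSFER_BATCH_SIZE` is assumed to mirror the setting name above), both batch sizes can be overridden per topology:
+
+```java
+Config conf = new Config();
+// Batch writes into each spout/bolt receive queue (intra-worker messaging).
+conf.put(Config.TOPOLOGY_PRODUCER_BATCH_SIZE, 100);
+// Batch writes into the Worker Transfer Queue (inter-worker messaging).
+conf.put(Config.TOPOLOGY_TRANSFER_BATCH_SIZE, 10);
+```
+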
+#### Guidance
+
+**For Low latency:** Set batch size to 1. This basically disables batching. It is likely to reduce peak sustainable throughput under heavy traffic, but not likely to impact throughput much under low/medium traffic situations.
+
+**For High throughput:** Set batch size > 1. Try values like 10, 100, 1000 or even higher and see what yields the best throughput for the topology. Beyond a certain point the throughput is likely to get worse.
+
+**Varying throughput:** Topologies often experience fluctuating amounts of incoming traffic over the day. Other topologies may experience higher traffic on some paths and lower throughput on other paths simultaneously. If latency is not a concern, a small batch size (e.g. 10) in conjunction with the right flush frequency may provide a reasonable compromise for such scenarios. For meeting stricter latency SLAs, consider setting the batch size to 1.
+
+
+## 3. Flush Tuple Frequency
+In low/medium traffic situations, or when the batch size is too large, the batches may take too long to fill up, and consequently the messages could take an unacceptably long time to become visible to downstream components. In such cases, periodic flushing of batches is necessary to keep the messages moving and avoid compromising latencies when batching is enabled.
+
+When batching has been enabled, special messages called *flush tuples* are inserted periodically into the receive queues of all spout and bolt instances. This causes each spout/bolt instance to flush all its outstanding batches to their respective downstream components.
+
+`topology.batch.flush.interval.millis` : This setting controls how often the flush tuples are generated. Flush tuples are not generated if this configuration is set to 0 or if (`topology.producer.batch.size`=1 and `topology.transfer.batch.size`=1).
+
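+As a minimal sketch, the flush interval can be raised when optimizing for throughput (10 ms is illustrative; the string key matches the entry in `defaults.yaml`):
+
+```java
+Config conf = new Config();
+// Generate flush tuples every 10 ms instead of the 1 ms default.
+conf.put("topology.batch.flush.interval.millis", 10);
+```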
+
+#### Guidance
+The flushing interval can be used as a tool to retain the higher throughput benefits of batching while avoiding batched messages getting stuck for too long waiting for their batch to fill. Preferably this value should be larger than the average execute latencies of the bolts in the topology. Trying to flush the queues more frequently than the amount of time it takes to produce the messages may hurt performance. Understanding the average execute latencies of each bolt will help determine the average number of messages in the queues between two flushes.
+
+**For Low latency:** A smaller value helps achieve tighter latency SLAs.
+
+**For High throughput:** When trying to maximize throughput under high traffic situations, the batches are likely to get filled and flushed automatically. To optimize for such cases, this value can be set to a higher number.
+
+**Varying throughput:** If latency is not a concern, a larger value will optimize for high traffic situations. For meeting tighter SLAs, set this to a lower value.
+
+
+## 4. Wait Strategy
+Wait strategies are used to conserve CPU usage by trading off some latency and throughput. They are applied in the following situations:
+
+4.1 **Spout Wait:** In low/no traffic situations, a spout's nextTuple() may not produce any new emits. To prevent a tight spin on nextTuple() in such situations, this wait strategy is used between nextTuple() calls, allowing the spout's executor thread to idle and conserve CPU. Select a strategy using `topology.spout.wait.strategy`.
+
+4.2 **Bolt Wait:** When a bolt polls its receive queue for new messages to process, it is possible that the queue is empty. This typically happens in low/no traffic situations or when the upstream spout/bolt is inherently slower. This wait strategy is used in such cases. It avoids high CPU usage due to the bolt continuously checking a typically empty queue. Select a strategy using `topology.bolt.wait.strategy`. The chosen strategy can be further configured using the `topology.bolt.wait.*` settings.
+
+4.3 **Backpressure Wait:** Select a strategy using `topology.backpressure.wait.strategy`. When a spout/bolt tries to write to a downstream component's receive queue, there is a possibility that the queue is full. In such cases the write needs to be retried. This wait strategy is used to induce some idling in between retries to conserve CPU. The chosen strategy can be further configured using the `topology.backpressure.wait.*` settings. A configuration sketch follows the list of built-in strategies below.
+
+
+#### Built-in wait strategies:
+
+- **SleepSpoutWaitStrategy** : This is the only built-in strategy available for Spout Wait. It cannot be applied to the other wait situations. It is a simple static strategy that calls Thread.sleep() each time. Set `topology.spout.wait.strategy` to `org.apache.storm.spout.SleepSpoutWaitStrategy` to use it. `topology.sleep.spout.wait.strategy.time.ms` configures the sleep time.
+
+- **ProgressiveWaitStrategy** : This strategy can be used for Bolt Wait or Backpressure Wait situations. Set the strategy to `org.apache.storm.policy.WaitStrategyProgressive` to select this wait strategy. This is a dynamic wait strategy that enters progressively deeper states of CPU conservation if the Backpressure Wait or Bolt Wait situations persist. It has 3 levels of idling and allows configuring how long to stay at each level:
+
+  1. No Waiting - The first few times it will return immediately. This does not conserve any CPU. The number of times it remains in this state is configured using `topology.bolt.wait.progressive.level1.count` or `topology.backpressure.wait.progressive.level1.count`, depending on which situation it is being used in.
+
+  2. Park Nanos - In this state it disables the current thread for thread scheduling purposes for 1 nanosecond using LockSupport.parkNanos(). This puts the CPU in a minimal conservation state. It remains in this state for `topology.bolt.wait.progressive.level2.count` or `topology.backpressure.wait.progressive.level2.count` iterations.
+
+  3. Thread.sleep() - In this state it calls Thread.sleep() with the value specified in `topology.backpressure.wait.progressive.level3.sleep.millis` or in `topology.bolt.wait.progressive.level3.sleep.millis`, based on the wait situation it is used in. This is the most CPU-conserving level; it remains at this level for the remaining iterations.
+
+
+- **ParkWaitStrategy** : This strategy can be used for Bolt Wait or Backpressure Wait situations. Set the strategy to `org.apache.storm.policy.WaitStrategyPark` to use this. This strategy disables the current thread for thread scheduling purposes by calling LockSupport.parkNanos(). The amount of park time is configured using either `topology.bolt.wait.park.microsec` or `topology.backpressure.wait.park.microsec`, based on the wait situation it is used in. Setting the park time to 0 effectively disables the invocation of LockSupport.parkNanos(); this mode can be used to achieve busy polling (which, at the cost of high CPU utilization even when idle, may improve latency and/or throughput).
+
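+As a minimal sketch (the spout/bolt constant names appear in `Config`; the backpressure settings are shown via their string keys from `defaults.yaml`), a latency-critical topology could select busy-polling Park strategies:
+
+```java
+Config conf = new Config();
+// Keep the default sleep-based wait for the spout.
+conf.put(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY, "org.apache.storm.spout.SleepSpoutWaitStrategy");
+// Busy-poll the bolt's receive queue: a park time of 0 means busy spin.
+conf.put(Config.TOPOLOGY_BOLT_WAIT_STRATEGY, "org.apache.storm.policy.WaitStrategyPark");
+conf.put(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC, 0);
+// Same idea for producers blocked by backpressure.
+conf.put("topology.backpressure.wait.strategy", "org.apache.storm.policy.WaitStrategyPark");
+conf.put("topology.backpressure.wait.park.microsec", 0);
+```
+
+Busy polling trades idle CPU for lower latency, so it is best reserved for topologies running on dedicated cores.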
+
+## 5. Max.spout.pending
+The setting `topology.max.spout.pending` limits the number of un-ACKed tuples at the spout level. Once a spout reaches this limit, the spout's nextTuple() method will not be called until some ACKs are received for the outstanding emits. This setting does not have any effect if ACKing is disabled. It is a spout throttling mechanism which can impact throughput and latency. Setting it to null disables it for storm-core topologies. The impact on throughput is dependent on the topology and its concurrency (workers/executors), so experimentation is necessary to determine the optimal setting. Latency and memory consumption are typically expected to increase with higher values for this setting.
+
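+As a minimal sketch (1000 is an arbitrary starting point for experimentation, not a recommendation):
+
+```java
+Config conf = new Config();
+// Allow at most 1000 un-ACKed tuples per spout task (only effective when ACKing is enabled).
+conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1000);
+```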
+
+## 6. Load Aware messaging
+When load aware messaging is enabled (the default), shuffle grouping takes additional factors into consideration for message routing. The impact of this on performance is dependent on the topology and its deployment footprint (i.e. its distribution over processes and machines). Consequently it is useful to assess the impact of setting `topology.disable.loadaware.messaging` to `true` or `false` for your specific case.
+
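+As a minimal sketch, to disable it for a comparison run:
+
+```java
+Config conf = new Config();
+conf.put(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING, true);
+```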
+
+## 7. Sampling Rate
+The sampling rate controls how often certain metrics are computed on the spout and bolt executors. This is configured using `topology.stats.sample.rate`. Setting it to 1 means the stats are computed for every emitted message. As an example, to sample once every 1000 messages it can be set to 0.001. It may be possible to improve throughput and latency by reducing the sampling rate.
+
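+As a minimal sketch, to sample roughly one in every 2000 emits (the value used by the storm-perf example topologies in this commit):
+
+```java
+Config conf = new Config();
+conf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 0.0005);
+```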
+
+## 8. Budgeting CPU cores for Executors
+There are four main types of executors (i.e. threads) to take into account when budgeting CPU cores for them: Spout Executors, Bolt Executors, Worker Transfer (handles outbound messages) and NettyWorker (handles inbound messages). The first two are used to run spout, bolt and acker instances. The Worker Transfer thread is used to serialize and send messages to other workers (in multi-worker mode).
+
+Executors that are expected to remain busy, either because they are handling a lot of messages or because their processing is inherently CPU intensive, should be allocated 1 physical core each. Allocating logical cores (instead of physical) or less than 1 physical core for CPU-intensive executors increases CPU contention and performance can suffer. Executors that are not expected to be busy can be allocated a smaller fraction of a physical core (or even logical cores). It may not be economical to allocate a full physical core for executors that are not likely to saturate the CPU.
+
+The *system bolt* generally processes very few messages per second, and so requires very little CPU (typically less than 10% of a physical core).

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
 
b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
index 189813a..aa12ae1 100644
--- 
a/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
+++ 
b/examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
@@ -43,7 +43,7 @@ public class EsTestUtil {
                 return new Fields("source", "index", "type", "id");
             }
         };
-        return new TupleImpl(topologyContext, new Values(source, index, type, id), 1, "");
+        return new TupleImpl(topologyContext, new Values(source, index, type, id), source, 1, "");
     }
 
     public static EsTupleMapper generateDefaultTupleMapper() {

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-perf/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-perf/pom.xml b/examples/storm-perf/pom.xml
index 8fad512..af23137 100644
--- a/examples/storm-perf/pom.xml
+++ b/examples/storm-perf/pom.xml
@@ -81,7 +81,7 @@
                 <artifactId>maven-checkstyle-plugin</artifactId>
                 <!--Note - the version would be inherited-->
                 <configuration>
-                    <maxAllowedViolations>207</maxAllowedViolations>
+                    <maxAllowedViolations>100</maxAllowedViolations>
                 </configuration>
             </plugin>
         </plugins>

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-perf/src/main/java/org/apache/storm/perf/BackPressureTopo.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-perf/src/main/java/org/apache/storm/perf/BackPressureTopo.java 
b/examples/storm-perf/src/main/java/org/apache/storm/perf/BackPressureTopo.java
new file mode 100644
index 0000000..0443fd4
--- /dev/null
+++ 
b/examples/storm-perf/src/main/java/org/apache/storm/perf/BackPressureTopo.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf;
+
+import java.util.Map;
+import org.apache.storm.Config;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.perf.spout.ConstSpout;
+import org.apache.storm.perf.utils.Helper;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BoltDeclarer;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Utils;
+import org.slf4j.LoggerFactory;
+
+
+public class BackPressureTopo {
+
+    private static final String SPOUT_ID = "ConstSpout";
+    private static final String BOLT_ID = "ThrottledBolt";
+    private static final Integer SPOUT_COUNT = 1;
+    private static final Integer BOLT_COUNT = 1;
+    private static final String SLEEP_MS = "sleep";
+
+    static StormTopology getTopology(Map<String, Object> conf) {
+
+        Long sleepMs = ObjectReader.getLong(conf.get(SLEEP_MS));
+        // 1 -  Setup Spout   --------
+        ConstSpout spout = new ConstSpout("some data").withOutputFields("string");
+
+        // 2 -  Setup DevNull Bolt   --------
+        ThrottledBolt bolt = new ThrottledBolt(sleepMs);
+
+
+        // 3 - Setup Topology  --------
+        TopologyBuilder builder = new TopologyBuilder();
+
+        // SPOUT_COUNT and BOLT_COUNT are Integer constants, not conf keys, so use them directly
+        builder.setSpout(SPOUT_ID, spout, SPOUT_COUNT);
+        BoltDeclarer bd = builder.setBolt(BOLT_ID, bolt, BOLT_COUNT);
+
+        bd.localOrShuffleGrouping(SPOUT_ID);
+        return builder.createTopology();
+    }
+
+    public static void main(String[] args) throws Exception {
+        int runTime = -1;
+        Config topoConf = new Config();
+        topoConf.put(Config.TOPOLOGY_SPOUT_RECVQ_SKIPS, 1);
+        topoConf.putAll(Utils.readCommandLineOpts());
+        if (args.length > 0) {
+            long sleepMs = Integer.parseInt(args[0]);
+            topoConf.put(SLEEP_MS, sleepMs);
+        }
+        if (args.length > 1) {
+            runTime = Integer.parseInt(args[1]);
+        }
+        if (args.length > 2) {
+            System.err.println("args: boltSleepMs [runDurationSec] ");
+            return;
+        }
+        //  Submit topology to storm cluster
+        Helper.runOnClusterAndPrintMetrics(runTime, "BackPressureTopo", topoConf, getTopology(topoConf));
+    }
+
+    private static class ThrottledBolt extends BaseRichBolt {
+        private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(ThrottledBolt.class);
+        private OutputCollector collector;
+        private long sleepMs;
+
+        public ThrottledBolt(Long sleepMs) {
+            this.sleepMs = sleepMs;
+        }
+
+        @Override
+        public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
+            this.collector = collector;
+        }
+
+        @Override
+        public void execute(Tuple tuple) {
+            collector.ack(tuple);
+            LOG.debug("Sleeping");
+            try {
+                Thread.sleep(sleepMs);
+            } catch (InterruptedException e) {
+                //.. ignore
+            }
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java
 
b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java
index 69df3fb..dc3649e 100644
--- 
a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java
+++ 
b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java
@@ -18,6 +18,8 @@
 
 package org.apache.storm.perf;
 
+import java.util.Map;
+
 import org.apache.storm.Config;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.perf.bolt.DevNullBolt;
@@ -27,14 +29,12 @@ import org.apache.storm.perf.utils.Helper;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.utils.Utils;
 
-import java.util.Map;
-
 /**
  * ConstSpout -> IdBolt -> DevNullBolt
  * This topology measures speed of messaging between spouts->bolt and bolt->bolt
- *   ConstSpout : Continuously emits a constant string
- *   IdBolt : clones and emits input tuples
- *   DevNullBolt : discards incoming tuples
+ * ConstSpout : Continuously emits a constant string
+ * IdBolt : clones and emits input tuples
+ * DevNullBolt : discards incoming tuples
  */
 public class ConstSpoutIdBoltNullBoltTopo {
 
@@ -48,7 +48,7 @@ public class ConstSpoutIdBoltNullBoltTopo {
     public static final String BOLT2_COUNT = "bolt2.count";
     public static final String SPOUT_COUNT = "spout.count";
 
-    public static StormTopology getTopology(Map<String, Object> conf) {
+    static StormTopology getTopology(Map<String, Object> conf) {
 
         // 1 -  Setup Spout   --------
         ConstSpout spout = new ConstSpout("some data").withOutputFields("str");
@@ -61,14 +61,17 @@ public class ConstSpoutIdBoltNullBoltTopo {
         // 3 - Setup Topology  --------
         TopologyBuilder builder = new TopologyBuilder();
 
-        builder.setSpout(SPOUT_ID, spout,  Helper.getInt(conf, SPOUT_COUNT, 1) );
+        int numSpouts = Helper.getInt(conf, SPOUT_COUNT, 1);
+        builder.setSpout(SPOUT_ID, spout, numSpouts);
 
-        builder.setBolt(BOLT1_ID, bolt1, Helper.getInt(conf, BOLT1_COUNT, 1))
-                .localOrShuffleGrouping(SPOUT_ID);
-
-        builder.setBolt(BOLT2_ID, bolt2, Helper.getInt(conf, BOLT2_COUNT, 1))
-                .localOrShuffleGrouping(BOLT1_ID);
+        int numBolt1 = Helper.getInt(conf, BOLT1_COUNT, 1);
+        builder.setBolt(BOLT1_ID, bolt1, numBolt1)
+            .localOrShuffleGrouping(SPOUT_ID);
 
+        int numBolt2 = Helper.getInt(conf, BOLT2_COUNT, 1);
+        builder.setBolt(BOLT2_ID, bolt2, numBolt2)
+            .localOrShuffleGrouping(BOLT1_ID);
+        System.err.printf("====> Using : numSpouts = %d , numBolt1 = %d, numBolt2=%d\n", numSpouts, numBolt1, numBolt2);
         return builder.createTopology();
     }
 
@@ -76,12 +79,23 @@ public class ConstSpoutIdBoltNullBoltTopo {
     public static void main(String[] args) throws Exception {
         int runTime = -1;
         Config topoConf = new Config();
+        // Configure for achieving max throughput in single worker mode (empirically found).
+        //     -- Expect ~5.3 mill/sec (3.2 mill/sec with batchSz=1)
+        //     -- ~1 mill/sec, lat= ~20 microsec  with acker=1 & batchSz=1
+        topoConf.put(Config.TOPOLOGY_SPOUT_RECVQ_SKIPS, 8);
+        topoConf.put(Config.TOPOLOGY_PRODUCER_BATCH_SIZE, 500);
+        topoConf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 50_000);
+        topoConf.put(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING, true);
+        topoConf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 0.0005);
+
         if (args.length > 0) {
             runTime = Integer.parseInt(args[0]);
         }
         if (args.length > 1) {
             topoConf.putAll(Utils.findAndReadConfigFile(args[1]));
         }
+        topoConf.putAll(Utils.readCommandLineOpts());
+
         if (args.length > 2) {
             System.err.println("args: [runDurationSec]  [optionalConfFile]");
             return;

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java
 
b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java
index 298c73e..ee778fb 100755
--- 
a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java
+++ 
b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java
@@ -30,7 +30,7 @@ import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.utils.Utils;
 
 /***
- * This topo helps measure the messaging speed between a spout and a bolt.
+ *  This topo helps measure the peak messaging throughput between a spout and a bolt.
  *  Spout generates a stream of a fixed string.
  *  Bolt will simply ack and discard the tuple received
  */
@@ -50,7 +50,7 @@ public class ConstSpoutNullBoltTopo {
     public static final String SHUFFLE_GROUPING = "shuffle";
     public static final String DEFAULT_GROUPING = LOCAL_GROPING;
 
-    public static StormTopology getTopology(Map<String, Object> conf) {
+    static StormTopology getTopology(Map<String, Object> conf) {
 
         // 1 -  Setup Spout   --------
         ConstSpout spout = new ConstSpout("some data").withOutputFields("str");
@@ -62,14 +62,20 @@ public class ConstSpoutNullBoltTopo {
         // 3 - Setup Topology  --------
         TopologyBuilder builder = new TopologyBuilder();
 
-        builder.setSpout(SPOUT_ID, spout,  Helper.getInt(conf, SPOUT_COUNT, 1) );
-        BoltDeclarer bd = builder.setBolt(BOLT_ID, bolt, Helper.getInt(conf, BOLT_COUNT, 1));
+        int numSpouts = Helper.getInt(conf, SPOUT_COUNT, 1);
+        builder.setSpout(SPOUT_ID, spout, numSpouts);
+
+        int numBolts = Helper.getInt(conf, BOLT_COUNT, 1);
+        BoltDeclarer bd = builder.setBolt(BOLT_ID, bolt, numBolts);
+
+        System.err.printf("====> Using : numSpouts = %d , numBolts = %d\n", numSpouts, numBolts);
 
         String groupingType = Helper.getStr(conf, GROUPING);
-        if(groupingType==null || groupingType.equalsIgnoreCase(DEFAULT_GROUPING) )
+        if (groupingType == null || groupingType.equalsIgnoreCase(DEFAULT_GROUPING)) {
             bd.localOrShuffleGrouping(SPOUT_ID);
-        else if(groupingType.equalsIgnoreCase(SHUFFLE_GROUPING) )
+        } else if (groupingType.equalsIgnoreCase(SHUFFLE_GROUPING)) {
             bd.shuffleGrouping(SPOUT_ID);
+        }
         return builder.createTopology();
     }
 
@@ -79,12 +85,26 @@ public class ConstSpoutNullBoltTopo {
     public static void main(String[] args) throws Exception {
         int runTime = -1;
         Config topoConf = new Config();
+        // Configured for achieving max throughput in single worker mode (empirically found).
+        //  For reference : numbers taken on MacBook Pro mid 2015
+        //    -- ACKer=0:  ~8 mill/sec (batchSz=2k & recvQsize=50k).  6.7 mill/sec (batchSz=1 & recvQsize=1k)
+        //    -- ACKer=1:  ~1 mill/sec,   lat= ~1 microsec  (batchSz=1 & bolt.wait.strategy=Park bolt.wait.park.micros=0)
+        //    -- ACKer=1:  ~1.3 mill/sec, lat= ~11 micros   (batchSz=1 & receive.buffer.size=1k, bolt.wait & bp.wait = Progressive[defaults])
+        //    -- ACKer=1:  ~1.6 mill/sec, lat= ~300 micros  (batchSz=500 & bolt.wait.strategy=Park bolt.wait.park.micros=0)
+        topoConf.put(Config.TOPOLOGY_SPOUT_RECVQ_SKIPS, 8);
+        topoConf.put(Config.TOPOLOGY_PRODUCER_BATCH_SIZE, 500);
+        topoConf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 50_000);
+        topoConf.put(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING, true);
+        topoConf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 0.0005);
+
         if (args.length > 0) {
             runTime = Integer.parseInt(args[0]);
         }
         if (args.length > 1) {
             topoConf.putAll(Utils.findAndReadConfigFile(args[1]));
         }
+        topoConf.putAll(Utils.readCommandLineOpts());
+
         if (args.length > 2) {
             System.err.println("args: [runDurationSec]  [optionalConfFile]");
             return;

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java
 
b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java
index 94bd17f..5848cbc 100755
--- 
a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java
+++ 
b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java
@@ -37,7 +37,7 @@ public class ConstSpoutOnlyTopo {
     public static final String SPOUT_ID = "constSpout";
 
 
-    public static StormTopology getTopology() {
+    static StormTopology getTopology() {
 
         // 1 -  Setup Const Spout   --------
         ConstSpout spout = new ConstSpout("some data").withOutputFields("str");
@@ -60,6 +60,10 @@ public class ConstSpoutOnlyTopo {
         if (args.length > 1) {
             topoConf.putAll(Utils.findAndReadConfigFile(args[1]));
         }
+        topoConf.put(Config.TOPOLOGY_SPOUT_RECVQ_SKIPS, 8);
+        topoConf.put(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING, true);
+        topoConf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 0.0005);
+        topoConf.putAll(Utils.readCommandLineOpts());
         if (args.length > 2) {
             System.err.println("args: [runDurationSec]  [optionalConfFile]");
             return;

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java
 
b/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java
index e64dd36..7e9256e 100644
--- 
a/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java
+++ 
b/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java
@@ -15,8 +15,8 @@
 * See the License for the specific language governing permissions and
 * limitations under the License
 */
-package org.apache.storm.perf;
 
+package org.apache.storm.perf;
 
 import java.util.Map;
 
@@ -36,15 +36,15 @@ import org.apache.storm.utils.Utils;
  */
 
 public class FileReadWordCountTopo {
-    public static final String SPOUT_ID =   "spout";
-    public static final String COUNT_ID =   "counter";
-    public static final String SPLIT_ID =   "splitter";
+    public static final String SPOUT_ID = "spout";
+    public static final String COUNT_ID = "counter";
+    public static final String SPLIT_ID = "splitter";
     public static final String TOPOLOGY_NAME = "FileReadWordCountTopo";
 
     // Config settings
-    public static final String SPOUT_NUM =  "spout.count";
-    public static final String SPLIT_NUM =  "splitter.count";
-    public static final String COUNT_NUM =  "counter.count";
+    public static final String SPOUT_NUM = "spout.count";
+    public static final String SPLIT_NUM = "splitter.count";
+    public static final String COUNT_NUM = "counter.count";
     public static final String INPUT_FILE = "input.file";
 
     public static final int DEFAULT_SPOUT_NUM = 1;
@@ -52,7 +52,7 @@ public class FileReadWordCountTopo {
     public static final int DEFAULT_COUNT_BOLT_NUM = 2;
 
 
-    public static StormTopology getTopology(Map<String, Object> config) {
+    static StormTopology getTopology(Map<String, Object> config) {
 
         final int spoutNum = Helper.getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM);
         final int spBoltNum = Helper.getInt(config, SPLIT_NUM, DEFAULT_SPLIT_BOLT_NUM);
@@ -76,6 +76,13 @@ public class FileReadWordCountTopo {
         if (args.length > 1) {
             topoConf.putAll(Utils.findAndReadConfigFile(args[1]));
         }
+        topoConf.put(Config.TOPOLOGY_PRODUCER_BATCH_SIZE, 1000);
+        topoConf.put(Config.TOPOLOGY_BOLT_WAIT_STRATEGY, "org.apache.storm.policy.WaitStrategyPark");
+        topoConf.put(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC, 0);
+        topoConf.put(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING, true);
+        topoConf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 0.0005);
+
+        topoConf.putAll(Utils.readCommandLineOpts());
         if (args.length > 2) {
             System.err.println("args: [runDurationSec]  [optionalConfFile]");
             return;

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-perf/src/main/java/org/apache/storm/perf/HdfsSpoutNullBoltTopo.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-perf/src/main/java/org/apache/storm/perf/HdfsSpoutNullBoltTopo.java
 
b/examples/storm-perf/src/main/java/org/apache/storm/perf/HdfsSpoutNullBoltTopo.java
index 2917587..e288a5c 100644
--- 
a/examples/storm-perf/src/main/java/org/apache/storm/perf/HdfsSpoutNullBoltTopo.java
+++ 
b/examples/storm-perf/src/main/java/org/apache/storm/perf/HdfsSpoutNullBoltTopo.java
@@ -18,6 +18,9 @@
 
 package org.apache.storm.perf;
 
+import java.util.Map;
+
+import org.apache.storm.Config;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.hdfs.spout.HdfsSpout;
 import org.apache.storm.hdfs.spout.TextFileReader;
@@ -26,8 +29,6 @@ import org.apache.storm.perf.utils.Helper;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.utils.Utils;
 
-import java.util.Map;
-
 /***
  * This topo helps measure speed of reading from Hdfs.
  *  Spout Reads from Hdfs.
@@ -36,25 +37,21 @@ import java.util.Map;
 
 
 public class HdfsSpoutNullBoltTopo {
+    public static final int DEFAULT_SPOUT_NUM = 1;
+    public static final int DEFAULT_BOLT_NUM = 1;
     // names
     static final String TOPOLOGY_NAME = "HdfsSpoutNullBoltTopo";
     static final String SPOUT_ID = "hdfsSpout";
     static final String BOLT_ID = "devNullBolt";
-
     // configs
     static final String SPOUT_NUM = "spout.count";
     static final String BOLT_NUM = "bolt.count";
-
-    static final String HDFS_URI    = "hdfs.uri";
-    static final String SOURCE_DIR  = "hdfs.source.dir";
+    static final String HDFS_URI = "hdfs.uri";
+    static final String SOURCE_DIR = "hdfs.source.dir";
     static final String ARCHIVE_DIR = "hdfs.archive.dir";
-    static final String BAD_DIR     = "hdfs.bad.dir";
-
-    public static final int DEFAULT_SPOUT_NUM = 1;
-    public static final int DEFAULT_BOLT_NUM = 1;
-
+    static final String BAD_DIR = "hdfs.bad.dir";
 
-    public static StormTopology getTopology(Map<String, Object> config) {
+    static StormTopology getTopology(Map<String, Object> config) {
 
         final int spoutNum = Helper.getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM);
         final int boltNum = Helper.getInt(config, BOLT_NUM, DEFAULT_BOLT_NUM);
@@ -67,12 +64,12 @@ public class HdfsSpoutNullBoltTopo {
 
         // 1 -  Setup Hdfs Spout   --------
         HdfsSpout spout = new HdfsSpout()
-                .setReaderType(fileFormat)
-                .setHdfsUri(hdfsUri)
-                .setSourceDir(sourceDir)
-                .setArchiveDir(archiveDir)
-                .setBadFilesDir(badDir)
-                .withOutputFields(TextFileReader.defaultFields);
+            .setReaderType(fileFormat)
+            .setHdfsUri(hdfsUri)
+            .setSourceDir(sourceDir)
+            .setArchiveDir(archiveDir)
+            .setBadFilesDir(badDir)
+            .withOutputFields(TextFileReader.defaultFields);
 
         // 2 -   DevNull Bolt   --------
         DevNullBolt bolt = new DevNullBolt();
@@ -81,7 +78,7 @@ public class HdfsSpoutNullBoltTopo {
         TopologyBuilder builder = new TopologyBuilder();
         builder.setSpout(SPOUT_ID, spout, spoutNum);
         builder.setBolt(BOLT_ID, bolt, boltNum)
-                .localOrShuffleGrouping(SPOUT_ID);
+            .localOrShuffleGrouping(SPOUT_ID);
 
         return builder.createTopology();
     }
@@ -92,9 +89,16 @@ public class HdfsSpoutNullBoltTopo {
             return;
         }
 
-        Integer durationSec = Integer.parseInt(args[0]);
-        Map<String, Object> topoConf = Utils.findAndReadConfigFile(args[1]);
+        final Integer durationSec = Integer.parseInt(args[0]);
+        Config topoConf = new Config();
+        topoConf.putAll(Utils.findAndReadConfigFile(args[1]));
+        topoConf.put(Config.TOPOLOGY_PRODUCER_BATCH_SIZE, 1000);
+        topoConf.put(Config.TOPOLOGY_BOLT_WAIT_STRATEGY, "org.apache.storm.policy.WaitStrategyPark");
+        topoConf.put(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC, 0);
+        topoConf.put(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING, true);
+        topoConf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 0.0005);
 
+        topoConf.putAll(Utils.readCommandLineOpts());
         // Submit to Storm cluster
         Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-perf/src/main/java/org/apache/storm/perf/JCQueuePerfTest.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/JCQueuePerfTest.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/JCQueuePerfTest.java
new file mode 100644
index 0000000..17c676f
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/JCQueuePerfTest.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf;
+
+import java.util.concurrent.locks.LockSupport;
+
+import org.apache.storm.policy.WaitStrategyPark;
+import org.apache.storm.utils.JCQueue;
+import org.apache.storm.utils.MutableLong;
+
+public class JCQueuePerfTest {
+    // Usage: uncomment one measurement scenario below, let it run, then explicitly terminate the process.
+    // Metrics are printed when the application is terminated (via the shutdown hook).
+    public static void main(String[] args) throws Exception {
+//        oneProducer1Consumer(1000);  // -- measurement 1
+//        twoProducer1Consumer(1000);    // -- measurement 2
+//        threeProducer1Consumer(1);   // -- measurement 3
+
+//        oneProducer2Consumers(1000);  // -- measurement 4
+
+//        producerFwdConsumer(1000);    // -- measurement 5
+
+//        ackingProducerSimulation(); // -- measurement 6
+
+        while (true) {
+            Thread.sleep(1000);
+        }
+
+    }
+
+    private static void ackingProducerSimulation() {
+        WaitStrategyPark ws = new WaitStrategyPark(100);
+        JCQueue spoutQ = new JCQueue("spoutQ", 1024, 0, 100, ws);
+        JCQueue ackQ = new JCQueue("ackQ", 1024, 0, 100, ws);
+
+        final AckingProducer ackingProducer = new AckingProducer(spoutQ, ackQ);
+        final Acker acker = new Acker(ackQ, spoutQ);
+
+        runAllThds(ackingProducer, acker);
+    }
+
+    private static void producerFwdConsumer(int prodBatchSz) {
+        WaitStrategyPark ws = new WaitStrategyPark(100);
+        JCQueue q1 = new JCQueue("q1", 1024, 0, prodBatchSz, ws);
+        JCQueue q2 = new JCQueue("q2", 1024, 0, prodBatchSz, ws);
+
+        final Producer prod = new Producer(q1);
+        final Forwarder fwd = new Forwarder(q1, q2);
+        final Consumer cons = new Consumer(q2);
+
+        runAllThds(prod, fwd, cons);
+    }
+
+
+    private static void oneProducer1Consumer(int prodBatchSz) {
+        JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new 
WaitStrategyPark(100));
+
+        final Producer prod1 = new Producer(q1);
+        final Consumer cons1 = new Consumer(q1);
+
+        runAllThds(prod1, cons1);
+    }
+
+    private static void twoProducer1Consumer(int prodBatchSz) {
+        JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new 
WaitStrategyPark(100));
+
+        final Producer prod1 = new Producer(q1);
+        final Producer prod2 = new Producer(q1);
+        final Consumer cons1 = new Consumer(q1);
+
+        runAllThds(prod1, prod2, cons1);
+    }
+
+    private static void threeProducer1Consumer(int prodBatchSz) {
+        JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new 
WaitStrategyPark(100));
+
+        final Producer prod1 = new Producer(q1);
+        final Producer prod2 = new Producer(q1);
+        final Producer prod3 = new Producer(q1);
+        final Consumer cons1 = new Consumer(q1);
+
+        runAllThds(prod1, prod2, prod3, cons1);
+    }
+
+
+    private static void oneProducer2Consumers(int prodBatchSz) {
+        WaitStrategyPark ws = new WaitStrategyPark(100);
+        JCQueue q1 = new JCQueue("q1", 1024, 0, prodBatchSz, ws);
+        JCQueue q2 = new JCQueue("q2", 1024, 0, prodBatchSz, ws);
+
+        final Producer2 prod1 = new Producer2(q1, q2);
+        final Consumer cons1 = new Consumer(q1);
+        final Consumer cons2 = new Consumer(q2);
+
+        runAllThds(prod1, cons1, cons2);
+    }
+
+    public static void runAllThds(MyThread... threads) {
+        for (Thread thread : threads) {
+            thread.start();
+        }
+        addShutdownHooks(threads);
+    }
+
+    public static void addShutdownHooks(MyThread... threads) {
+
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            try {
+                System.err.println("Stopping");
+                for (Thread thread : threads) {
+                    thread.interrupt();
+                }
+
+                for (Thread thread : threads) {
+                    System.err.println("Waiting for " + thread.getName());
+                    thread.join();
+                }
+
+                for (MyThread thread : threads) {
+                    System.err.printf("%s : %d,  Throughput: %,d \n", thread.getName(), thread.count, thread.throughput());
+                }
+            } catch (InterruptedException e) {
+                return;
+            }
+        }));
+
+    }
+
+}
+
+
+abstract class MyThread extends Thread {
+    public long count = 0;
+    public long runTime = 0;
+
+    public MyThread(String thdName) {
+        super(thdName);
+    }
+
+    public long throughput() {
+        long secs = Math.max(1, runTime / 1000);   // avoid divide-by-zero on short or interrupted runs
+        return getCount() / secs;
+    }
+
+    public long getCount() {
+        return count;
+    }
+}
+
+class Producer extends MyThread {
+    private final JCQueue q;
+
+    public Producer(JCQueue q) {
+        super("Producer");
+        this.q = q;
+    }
+
+    @Override
+    public void run() {
+        try {
+            long start = System.currentTimeMillis();
+            while (!Thread.interrupted()) {
+                q.publish(++count);
+            }
+            runTime = System.currentTimeMillis() - start;
+        } catch (InterruptedException e) {
+            return;
+        }
+    }
+
+}
+
+// writes to two queues
+class Producer2 extends MyThread {
+    private final JCQueue q1;
+    private final JCQueue q2;
+
+    public Producer2(JCQueue q1, JCQueue q2) {
+        super("Producer2");
+        this.q1 = q1;
+        this.q2 = q2;
+    }
+
+    @Override
+    public void run() {
+        try {
+            long start = System.currentTimeMillis();
+            while (!Thread.interrupted()) {
+                q1.publish(++count);
+                q2.publish(count);
+            }
+            runTime = System.currentTimeMillis() - start;
+        } catch (InterruptedException e) {
+            return;
+        }
+
+    }
+}
+
+
+// writes to two queues
+class AckingProducer extends MyThread {
+    private final JCQueue ackerInQ;
+    private final JCQueue spoutInQ;
+
+    public AckingProducer(JCQueue ackerInQ, JCQueue spoutInQ) {
+        super("AckingProducer");
+        this.ackerInQ = ackerInQ;
+        this.spoutInQ = spoutInQ;
+    }
+
+    @Override
+    public void run() {
+        try {
+            Handler handler = new Handler();
+            long start = System.currentTimeMillis();
+            while (!Thread.interrupted()) {
+                int x = spoutInQ.consume(handler);
+                ackerInQ.publish(count);
+            }
+            runTime = System.currentTimeMillis() - start;
+        } catch (InterruptedException e) {
+            return;
+        }
+    }
+
+    private class Handler implements JCQueue.Consumer {
+        @Override
+        public void accept(Object event) {
+            // no-op
+        }
+
+        @Override
+        public void flush() {
+            // no-op
+        }
+    }
+}
+
+// reads from ackerInQ and writes to spout queue
+class Acker extends MyThread {
+    private final JCQueue ackerInQ;
+    private final JCQueue spoutInQ;
+
+    public Acker(JCQueue ackerInQ, JCQueue spoutInQ) {
+        super("Acker");
+        this.ackerInQ = ackerInQ;
+        this.spoutInQ = spoutInQ;
+    }
+
+
+    @Override
+    public void run() {
+        long start = System.currentTimeMillis();
+        Handler handler = new Handler();
+        while (!Thread.interrupted()) {
+            ackerInQ.consume(handler);
+        }
+        runTime = System.currentTimeMillis() - start;
+    }
+
+    private class Handler implements JCQueue.Consumer {
+        @Override
+        public void accept(Object event) {
+            try {
+                spoutInQ.publish(event);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public void flush() throws InterruptedException {
+            spoutInQ.flush();
+        }
+    }
+}
+
+class Consumer extends MyThread {
+    public final MutableLong counter = new MutableLong(0);
+    private final JCQueue q;
+
+    public Consumer(JCQueue q) {
+        super("Consumer");
+        this.q = q;
+    }
+
+    @Override
+    public void run() {
+        Handler handler = new Handler();
+        long start = System.currentTimeMillis();
+        while (!Thread.interrupted()) {
+            int x = q.consume(handler);
+            if (x == 0) {
+                LockSupport.parkNanos(1);
+            }
+        }
+        runTime = System.currentTimeMillis() - start;
+    }
+
+    @Override
+    public long getCount() {
+        return counter.get();
+    }
+
+    private class Handler implements JCQueue.Consumer {
+        @Override
+        public void accept(Object event) {
+            counter.increment();
+        }
+
+        @Override
+        public void flush() {
+            // no-op
+        }
+    }
+}
+
+
+class Forwarder extends MyThread {
+    public final MutableLong counter = new MutableLong(0);
+    private final JCQueue inq;
+    private final JCQueue outq;
+
+    public Forwarder(JCQueue inq, JCQueue outq) {
+        super("Forwarder");
+        this.inq = inq;
+        this.outq = outq;
+    }
+
+    @Override
+    public void run() {
+        Handler handler = new Handler();
+        long start = System.currentTimeMillis();
+        while (!Thread.interrupted()) {
+            int x = inq.consume(handler);
+            if (x == 0) {
+                LockSupport.parkNanos(1);
+            }
+        }
+        runTime = System.currentTimeMillis() - start;
+    }
+
+    @Override
+    public long getCount() {
+        return counter.get();
+    }
+
+    private class Handler implements JCQueue.Consumer {
+        @Override
+        public void accept(Object event) {
+            try {
+                outq.publish(event);
+                counter.increment();
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public void flush() {
+            // no-op
+        }
+    }
+}
\ No newline at end of file

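JCQueuePerfTest benchmarks the new JCQueue in isolation: each scenario wires producer/consumer/forwarder/acker threads around one or two queues, and the shutdown hook interrupts the threads, joins them, and prints per-thread counts and throughput. A minimal standalone sketch of the publish/consume handshake, reusing the constructor shape and Consumer interface from the test above (the meaning of the third constructor argument is an assumption carried over from the test; the values are the test's own, not recommendations):

    import org.apache.storm.policy.WaitStrategyPark;
    import org.apache.storm.utils.JCQueue;

    public class JCQueueSketch {
        public static void main(String[] args) throws Exception {
            // name, capacity, 0 (as in the test above), producer batch size, wait strategy
            JCQueue q = new JCQueue("q", 1024, 0, 1, new WaitStrategyPark(100));
            q.publish("hello");
            q.flush();   // make any locally batched publishes visible to the consumer
            q.consume(new JCQueue.Consumer() {
                @Override
                public void accept(Object event) {
                    System.out.println(event);
                }

                @Override
                public void flush() {
                    // no-op
                }
            });
        }
    }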
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-perf/src/main/java/org/apache/storm/perf/JCToolsPerfTest.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/JCToolsPerfTest.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/JCToolsPerfTest.java
new file mode 100644
index 0000000..98525ed
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/JCToolsPerfTest.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf;
+
+import java.util.concurrent.locks.LockSupport;
+
+import org.apache.storm.utils.MutableLong;
+import org.jctools.queues.MpscArrayQueue;
+
+public class JCToolsPerfTest {
+    public static void main(String[] args) throws Exception {
+//        oneProducer1Consumer();
+//        twoProducer1Consumer();
+//        threeProducer1Consumer();
+//        oneProducer2Consumers();
+//        producerFwdConsumer();
+
+//        JCQueue spoutQ = new JCQueue("spoutQ", 1024, 100, 0);
+//        JCQueue ackQ = new JCQueue("ackQ", 1024, 100, 0);
+//
+//        final AckingProducer ackingProducer = new AckingProducer(spoutQ, ackQ);
+//        final Acker acker = new Acker(ackQ, spoutQ);
+//
+//        runAllThds(ackingProducer, acker);
+
+        while (true) {
+            Thread.sleep(1000);
+        }
+
+    }
+
+    private static void oneProducer1Consumer() {
+        MpscArrayQueue<Object> q1 = new MpscArrayQueue<Object>(50_000);
+
+        final Prod prod1 = new Prod(q1);
+        final Cons cons1 = new Cons(q1);
+
+        runAllThds(prod1, cons1);
+    }
+
+    private static void twoProducer1Consumer() {
+        MpscArrayQueue<Object> q1 = new MpscArrayQueue<Object>(50_000);
+
+        final Prod prod1 = new Prod(q1);
+        final Prod prod2 = new Prod(q1);
+        final Cons cons1 = new Cons(q1);
+
+        runAllThds(prod1, cons1, prod2);
+    }
+
+    private static void threeProducer1Consumer() {
+        MpscArrayQueue<Object> q1 = new MpscArrayQueue<Object>(50_000);
+
+        final Prod prod1 = new Prod(q1);
+        final Prod prod2 = new Prod(q1);
+        final Prod prod3 = new Prod(q1);
+        final Cons cons1 = new Cons(q1);
+
+        runAllThds(prod1, prod2, prod3, cons1);
+    }
+
+
+    private static void oneProducer2Consumers() {
+        MpscArrayQueue<Object> q1 = new MpscArrayQueue<Object>(50_000);
+        MpscArrayQueue<Object> q2 = new MpscArrayQueue<Object>(50_000);
+
+        final Prod2 prod1 = new Prod2(q1, q2);
+        final Cons cons1 = new Cons(q1);
+        final Cons cons2 = new Cons(q2);
+
+        runAllThds(prod1, cons1, cons2);
+    }
+
+    public static void runAllThds(MyThd... threads) {
+        for (Thread thread : threads) {
+            thread.start();
+        }
+        addShutdownHooks(threads);
+    }
+
+    public static void addShutdownHooks(MyThd... threads) {
+
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            try {
+                System.err.println("Stopping");
+                for (MyThd thread : threads) {
+                    thread.halt = true;
+                }
+
+                for (Thread thread : threads) {
+                    System.err.println("Waiting for " + thread.getName());
+                    thread.join();
+                }
+
+                for (MyThd thread : threads) {
+                    System.err.printf("%s : %d,  Throughput: %,d \n", thread.getName(), thread.count, thread.throughput());
+                }
+            } catch (InterruptedException e) {
+                return;
+            }
+        }));
+
+    }
+
+}
+
+
+abstract class MyThd extends Thread {
+    public long count = 0;
+    public long runTime = 0;
+    public boolean halt = false;
+
+    public MyThd(String thdName) {
+        super(thdName);
+    }
+
+    public long throughput() {
+        long secs = Math.max(1, runTime / 1000);   // avoid divide-by-zero on short or interrupted runs
+        return getCount() / secs;
+    }
+
+    public long getCount() {
+        return count;
+    }
+}
+
+class Prod extends MyThd {
+    private final MpscArrayQueue<Object> q;
+
+    public Prod(MpscArrayQueue<Object> q) {
+        super("Producer");
+        this.q = q;
+    }
+
+    @Override
+    public void run() {
+        long start = System.currentTimeMillis();
+
+        while (!halt) {
+            ++count;
+            while (!q.offer(count)) {
+                if (Thread.interrupted()) {
+                    return;
+                }
+            }
+        }
+        runTime = System.currentTimeMillis() - start;
+    }
+
+}
+
+// writes to two queues
+class Prod2 extends MyThd {
+    private final MpscArrayQueue<Object> q1;
+    private final MpscArrayQueue<Object> q2;
+
+    public Prod2(MpscArrayQueue<Object> q1, MpscArrayQueue<Object> q2) {
+        super("Producer2");
+        this.q1 = q1;
+        this.q2 = q2;
+    }
+
+    @Override
+    public void run() {
+        long start = System.currentTimeMillis();
+
+        while (!halt) {
+            ++count;
+            // spin on a full queue instead of silently dropping, so the measured counts stay meaningful
+            while (!q1.offer(count) && !halt) { }
+            while (!q2.offer(count) && !halt) { }
+        }
+        runTime = System.currentTimeMillis() - start;
+    }
+}
+
+
+class Cons extends MyThd {
+    public final MutableLong counter = new MutableLong(0);
+    private final MpscArrayQueue<Object> q;
+
+    public Cons(MpscArrayQueue<Object> q) {
+        super("Consumer");
+        this.q = q;
+    }
+
+    @Override
+    public void run() {
+        Handler handler = new Handler();
+        long start = System.currentTimeMillis();
+
+        while (!halt) {
+            int x = q.drain(handler);
+            if (x == 0) {
+                LockSupport.parkNanos(1);
+            }   // elements are already counted in Handler.accept(); no extra increment per drain
+        }
+        runTime = System.currentTimeMillis() - start;
+    }
+
+    @Override
+    public long getCount() {
+        return counter.get();
+    }
+
+    private class Handler implements org.jctools.queues.MessagePassingQueue.Consumer<Object> {
+        @Override
+        public void accept(Object event) {
+            counter.increment();
+        }
+    }
+}

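JCToolsPerfTest runs the same scenarios against a raw JCTools MpscArrayQueue, giving a baseline without JCQueue's batching and wait-strategy layer, so JCQueue's overhead can be compared against the queue it builds on. A minimal sketch of the offer/drain pattern the baseline relies on (standard JCTools API; MessagePassingQueue.Consumer is a functional interface, so a lambda works):

    import org.jctools.queues.MpscArrayQueue;

    public class MpscSketch {
        public static void main(String[] args) {
            MpscArrayQueue<Object> q = new MpscArrayQueue<>(1024);
            boolean accepted = q.offer("hello");                // non-blocking; false when the queue is full
            int drained = q.drain(e -> System.out.println(e));  // returns the number of elements consumed
            System.out.println("accepted=" + accepted + " drained=" + drained);
        }
    }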
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaHdfsTopo.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaHdfsTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaHdfsTopo.java
index d2ed691..bdd35b6 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaHdfsTopo.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaHdfsTopo.java
@@ -18,6 +18,10 @@
 
 package org.apache.storm.perf;
 
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.storm.Config;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.hdfs.bolt.HdfsBolt;
 import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
@@ -35,11 +39,8 @@ import org.apache.storm.kafka.ZkHosts;
 import org.apache.storm.perf.utils.Helper;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.ObjectReader;
-
-import java.util.Map;
-import java.util.UUID;
+import org.apache.storm.utils.Utils;
 
 /***
  * This topo helps measure speed of reading from Kafka and writing to Hdfs.
@@ -49,84 +50,85 @@ import java.util.UUID;
 
 public class KafkaHdfsTopo {
 
-  // configs - topo parallelism
-  public static final String SPOUT_NUM = "spout.count";
-  public static final String BOLT_NUM = "bolt.count";
-  // configs - kafka spout
-  public static final String KAFKA_TOPIC = "kafka.topic";
-  public static final String ZOOKEEPER_URI = "zk.uri";
-  // configs - hdfs bolt
-  public static final String HDFS_URI = "hdfs.uri";
-  public static final String HDFS_PATH = "hdfs.dir";
-  public static final String HDFS_BATCH = "hdfs.batch";
-
+    // configs - topo parallelism
+    public static final String SPOUT_NUM = "spout.count";
+    public static final String BOLT_NUM = "bolt.count";
+    // configs - kafka spout
+    public static final String KAFKA_TOPIC = "kafka.topic";
+    public static final String ZOOKEEPER_URI = "zk.uri";
+    // configs - hdfs bolt
+    public static final String HDFS_URI = "hdfs.uri";
+    public static final String HDFS_PATH = "hdfs.dir";
+    public static final String HDFS_BATCH = "hdfs.batch";
 
-  public static final int DEFAULT_SPOUT_NUM = 1;
-  public static final int DEFAULT_BOLT_NUM = 1;
-  public static final int DEFAULT_HDFS_BATCH = 1000;
 
-  // names
-  public static final String TOPOLOGY_NAME = "KafkaHdfsTopo";
-  public static final String SPOUT_ID = "kafkaSpout";
-  public static final String BOLT_ID = "hdfsBolt";
+    public static final int DEFAULT_SPOUT_NUM = 1;
+    public static final int DEFAULT_BOLT_NUM = 1;
+    public static final int DEFAULT_HDFS_BATCH = 1000;
 
+    // names
+    public static final String TOPOLOGY_NAME = "KafkaHdfsTopo";
+    public static final String SPOUT_ID = "kafkaSpout";
+    public static final String BOLT_ID = "hdfsBolt";
 
 
-  public static StormTopology getTopology(Map<String, Object> config) {
+    static StormTopology getTopology(Map<String, Object> config) {
 
-    final int spoutNum = getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM);
-    final int boltNum = getInt(config, BOLT_NUM, DEFAULT_BOLT_NUM);
+        final int spoutNum = getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM);
+        final int boltNum = getInt(config, BOLT_NUM, DEFAULT_BOLT_NUM);
 
-    final int hdfsBatch = getInt(config, HDFS_BATCH, DEFAULT_HDFS_BATCH);
+        final int hdfsBatch = getInt(config, HDFS_BATCH, DEFAULT_HDFS_BATCH);
 
-    // 1 -  Setup Kafka Spout   --------
-    String zkConnString = getStr(config, ZOOKEEPER_URI);
-    String topicName = getStr(config, KAFKA_TOPIC);
+        // 1 -  Setup Kafka Spout   --------
+        String zkConnString = getStr(config, ZOOKEEPER_URI);
+        String topicName = getStr(config, KAFKA_TOPIC);
 
-    BrokerHosts brokerHosts = new ZkHosts(zkConnString);
-    SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topicName, "/" + topicName, UUID.randomUUID().toString());
-    spoutConfig.scheme = new StringMultiSchemeWithTopic();
-    spoutConfig.ignoreZkOffsets = true;
+        BrokerHosts brokerHosts = new ZkHosts(zkConnString);
+        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topicName, "/" + topicName, UUID.randomUUID().toString());
+        spoutConfig.scheme = new StringMultiSchemeWithTopic();
+        spoutConfig.ignoreZkOffsets = true;
 
-    KafkaSpout spout = new KafkaSpout(spoutConfig);
+        KafkaSpout spout = new KafkaSpout(spoutConfig);
 
-    // 2 -  Setup HFS Bolt   --------
-    String Hdfs_url = getStr(config, HDFS_URI);
-    RecordFormat format = new LineWriter("str");
-    SyncPolicy syncPolicy = new CountSyncPolicy(hdfsBatch);
-    FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, FileSizeRotationPolicy.Units.GB);
+        // 2 -  Setup HDFS Bolt   --------
+        String hdfsUrls = getStr(config, HDFS_URI);
+        RecordFormat format = new LineWriter("str");
+        SyncPolicy syncPolicy = new CountSyncPolicy(hdfsBatch);
+        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, FileSizeRotationPolicy.Units.GB);
 
-    FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath(getStr(config,HDFS_PATH) );
+        FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath(getStr(config, HDFS_PATH));
 
-    // Instantiate the HdfsBolt
-    HdfsBolt bolt = new HdfsBolt()
-            .withFsUrl(Hdfs_url)
+        // Instantiate the HdfsBolt
+        HdfsBolt bolt = new HdfsBolt()
+            .withFsUrl(hdfsUrls)
             .withFileNameFormat(fileNameFormat)
             .withRecordFormat(format)
             .withRotationPolicy(rotationPolicy)
             .withSyncPolicy(syncPolicy);
 
 
-    // 3 - Setup Topology  --------
-    TopologyBuilder builder = new TopologyBuilder();
-    builder.setSpout(SPOUT_ID, spout, spoutNum);
-    builder.setBolt(BOLT_ID, bolt, boltNum)
+        // 3 - Setup Topology  --------
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout(SPOUT_ID, spout, spoutNum);
+        builder.setBolt(BOLT_ID, bolt, boltNum)
             .localOrShuffleGrouping(SPOUT_ID);
 
-    return builder.createTopology();
-  }
+        return builder.createTopology();
+    }
 
 
-  public static int getInt(Map map, Object key, int def) {
-    return ObjectReader.getInt(Utils.get(map, key, def));
-  }
+    public static int getInt(Map map, Object key, int def) {
+        return ObjectReader.getInt(Utils.get(map, key, def));
+    }
 
-  public static String getStr(Map map, Object key) {
-    return (String) map.get(key);
-  }
+    public static String getStr(Map map, Object key) {
+        return (String) map.get(key);
+    }
 
 
-    /** Copies text file content from sourceDir to destinationDir. Moves source files into sourceDir after its done consuming */
+    /**
+     * Measures speed of reading from Kafka and writing to Hdfs. Args: runDurationSec confFile.
+     */
     public static void main(String[] args) throws Exception {
 
         if (args.length != 2) {
@@ -137,7 +139,13 @@ public class KafkaHdfsTopo {
         Integer durationSec = Integer.parseInt(args[0]);
         String confFile = args[1];
         Map<String, Object> topoConf = Utils.findAndReadConfigFile(confFile);
+        topoConf.put(Config.TOPOLOGY_PRODUCER_BATCH_SIZE, 1000);
+        topoConf.put(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING, true);
+        topoConf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 0.0005);
+        topoConf.put(Config.TOPOLOGY_BOLT_WAIT_STRATEGY, "org.apache.storm.policy.WaitStrategyPark");
+        topoConf.put(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC, 0);
 
+        topoConf.putAll(Utils.readCommandLineOpts());
         //  Submit topology to Storm cluster
         Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
     }
@@ -156,14 +164,14 @@ public class KafkaHdfsTopo {
          * @param delimiter
          * @return
          */
-        public LineWriter withLineDelimiter(String delimiter){
+        public LineWriter withLineDelimiter(String delimiter) {
             this.lineDelimiter = delimiter;
             return this;
         }
 
         @Override
         public byte[] format(Tuple tuple) {
-            return (tuple.getValueByField(fieldName).toString() +  this.lineDelimiter).getBytes();
+            return (tuple.getValueByField(fieldName).toString() + this.lineDelimiter).getBytes();
         }
     }
 }

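KafkaHdfsTopo's inner LineWriter is a RecordFormat that writes one tuple field per record, appending a configurable delimiter via withLineDelimiter(). For instance, an illustrative variant of the setup in getTopology() above, writing the "str" field with an explicit newline delimiter:

    // Write the "str" field of each tuple, one record per line.
    RecordFormat format = new LineWriter("str").withLineDelimiter("\n");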
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaSpoutNullBoltTopo.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaSpoutNullBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaSpoutNullBoltTopo.java
index 321ab78..d31bb4a 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaSpoutNullBoltTopo.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaSpoutNullBoltTopo.java
@@ -18,6 +18,10 @@
 
 package org.apache.storm.perf;
 
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.storm.Config;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.kafka.BrokerHosts;
 import org.apache.storm.kafka.KafkaSpout;
@@ -27,19 +31,14 @@ import org.apache.storm.kafka.ZkHosts;
 import org.apache.storm.perf.bolt.DevNullBolt;
 import org.apache.storm.perf.utils.Helper;
 import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.ObjectReader;
-
-import java.util.Map;
-import java.util.UUID;
-
+import org.apache.storm.utils.Utils;
 
 /***
  * This topo helps measure speed of reading from Kafka
  *   Spout Reads from Kafka.
  *   Bolt acks and discards tuples
  */
-
 public class KafkaSpoutNullBoltTopo {
 
     // configs - topo parallelism
@@ -60,7 +59,7 @@ public class KafkaSpoutNullBoltTopo {
     public static final String BOLT_ID = "devNullBolt";
 
 
-    public static StormTopology getTopology(Map<String, Object> config) {
+    static StormTopology getTopology(Map<String, Object> config) {
 
         final int spoutNum = getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM);
         final int boltNum = getInt(config, BOLT_NUM, DEFAULT_BOLT_NUM);
@@ -83,7 +82,7 @@ public class KafkaSpoutNullBoltTopo {
         TopologyBuilder builder = new TopologyBuilder();
         builder.setSpout(SPOUT_ID, spout, spoutNum);
         builder.setBolt(BOLT_ID, bolt, boltNum)
-                .localOrShuffleGrouping(SPOUT_ID);
+            .localOrShuffleGrouping(SPOUT_ID);
 
         return builder.createTopology();
     }
@@ -102,13 +101,19 @@ public class KafkaSpoutNullBoltTopo {
      * Copies text file content from sourceDir to destinationDir. Moves source files into sourceDir after it's done consuming
      */
     public static void main(String[] args) throws Exception {
-        if (args.length !=2) {
+        if (args.length != 2) {
             System.err.println("args: runDurationSec confFile");
             return;
         }
         Integer durationSec = Integer.parseInt(args[0]);
         Map<String, Object> topoConf = Utils.findAndReadConfigFile(args[1]);
+        topoConf.put(Config.TOPOLOGY_PRODUCER_BATCH_SIZE, 1000);
+        topoConf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 0.0005);
+        topoConf.put(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING, true);
+        topoConf.put(Config.TOPOLOGY_BOLT_WAIT_STRATEGY, "org.apache.storm.policy.WaitStrategyPark");
+        topoConf.put(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC, 0);
 
+        topoConf.putAll(Utils.readCommandLineOpts());
         //  Submit to Storm cluster
         Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
     }

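Because each main() above applies Utils.readCommandLineOpts() after the hard-coded perf settings, individual runs can override them from the storm command line without editing the conf file. A hypothetical invocation (jar name, duration, and conf file are placeholders; -c is the standard storm CLI option for per-run config, and the key strings assume the usual topology.* naming of the Config constants):

    storm jar storm-perf.jar org.apache.storm.perf.KafkaSpoutNullBoltTopo 300 my.yaml \
        -c topology.producer.batch.size=1 -c topology.stats.sample.rate=0.05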