[
https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14710451#comment-14710451
]
ASF GitHub Bot commented on STORM-855:
--------------------------------------
Github user mjsax commented on the pull request:
https://github.com/apache/storm/pull/694#issuecomment-134450197
I just pushed some changes (as a new commit, so you can better see
what I changed):
- added type hints
- split tuple and batch serialization into separate classes (see the sketch below this list)
- assembled different "emit functions" in Clojure for the single-tuple and the batch
case (to allow for more type hints)
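For illustration, the serialization split might look roughly like this (a minimal sketch with hypothetical names, not the actual classes from the PR):
```java
import java.util.List;

// Hypothetical sketch: separate classes for the single-tuple and the batch
// (fat-tuple) path, so neither needs a per-call "is this a batch?" check.
interface TupleSerializer {
    byte[] serialize(List<Object> tupleValues);
}

final class BatchSerializer {
    private final TupleSerializer single;

    BatchSerializer(TupleSerializer single) {
        this.single = single;
    }

    // Serializes each tuple of the batch with the single-tuple serializer;
    // a real implementation would write into one shared output buffer.
    byte[][] serialize(List<List<Object>> batch) {
        byte[][] out = new byte[batch.size()][];
        for (int i = 0; i < batch.size(); i++) {
            out[i] = single.serialize(batch.get(i));
        }
        return out;
    }
}
```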
I get the following results running on a 4-node cluster with these parameters:
```
storm jar storm_perf_test-1.0.0-SNAPSHOT-jar-with-dependencies.jar \
  com.yahoo.storm.perftest.Main --bolt 3 --name test -l 1 -n 1 \
  --messageSize 4 --workers 4 --spout 1 --testTimeSec 300
```
Master Branch:
```
status   topologies  totalSlots  slotsUsed  totalExecutors  executorsWithMetrics  time           time-diff ms  transferred  throughput (MB/s)
WAITING  1           48          0          4               0                     1440466170638  0             0            0.0
WAITING  1           48          4          4               4                     1440466200638  30000         6122840      0.7785593668619791
WAITING  1           48          4          4               4                     1440466230638  30000         11565400     1.4706166585286458
RUNNING  1           48          4          4               4                     1440466260638  30000         11394040     1.4488271077473958
RUNNING  1           48          4          4               4                     1440466290638  30000         11718240     1.49005126953125
RUNNING  1           48          4          4               4                     1440466320638  30000         11615920     1.4770406087239583
RUNNING  1           48          4          4               4                     1440466350638  30000         11557380     1.4695968627929688
RUNNING  1           48          4          4               4                     1440466380638  30000         11581080     1.4726104736328125
RUNNING  1           48          4          4               4                     1440466410638  30000         11492600     1.4613596598307292
RUNNING  1           48          4          4               4                     1440466440638  30000         11413760     1.4513346354166667
RUNNING  1           48          4          4               4                     1440466470638  30000         11300580     1.4369430541992188
RUNNING  1           48          4          4               4                     1440466500638  30000         11368760     1.4456125895182292
RUNNING  1           48          4          4               4                     1440466530638  30000         11509820     1.463549296061198
```
Batching branch with batching disabled:
```
status   topologies  totalSlots  slotsUsed  totalExecutors  executorsWithMetrics  time           time-diff ms  transferred  throughput (MB/s)
WAITING  1           48          0          4               0                     1440467016767  0             0            0.0
WAITING  1           48          4          4               4                     1440467046767  30000         7095940      0.9022954305013021
WAITING  1           48          4          4               4                     1440467076767  30000         11136640     1.4160970052083333
RUNNING  1           48          4          4               4                     1440467106767  30000         11159220     1.4189682006835938
RUNNING  1           48          4          4               4                     1440467136767  30000         7757660      0.9864374796549479
RUNNING  1           48          4          4               4                     1440467166767  30000         11375580     1.4464797973632812
RUNNING  1           48          4          4               4                     1440467196767  30000         11669980     1.4839146931966145
RUNNING  1           48          4          4               4                     1440467226767  30000         11344380     1.4425125122070312
RUNNING  1           48          4          4               4                     1440467256767  30000         11521460     1.4650293986002605
RUNNING  1           48          4          4               4                     1440467286767  30000         11401040     1.4497172037760417
RUNNING  1           48          4          4               4                     1440467316767  30000         11493700     1.461499532063802
RUNNING  1           48          4          4               4                     1440467346767  30000         11452680     1.4562835693359375
RUNNING  1           48          4          4               4                     1440467376767  30000         11148300     1.4175796508789062
```
Batching branch with batch size of 100 tuples:
```
status   topologies  totalSlots  slotsUsed  totalExecutors  executorsWithMetrics  time           time-diff ms  transferred  throughput (MB/s)
WAITING  1           48          1          4               0                     1440467461710  0             0            0.0
WAITING  1           48          4          4               4                     1440467491710  30000         11686000     1.4859517415364583
WAITING  1           48          4          4               4                     1440467521710  30000         18026640     2.292205810546875
RUNNING  1           48          4          4               4                     1440467551710  30000         17936300     2.2807184855143228
RUNNING  1           48          4          4               4                     1440467581710  30000         18969300     2.4120712280273438
RUNNING  1           48          4          4               4                     1440467611710  30000         18581620     2.3627751668294272
RUNNING  1           48          4          4               4                     1440467641711  30001         18963120     2.4112050268897285
RUNNING  1           48          4          4               4                     1440467671710  29999         18607200     2.3661067022546587
RUNNING  1           48          4          4               4                     1440467701710  30000         19333620     2.4583969116210938
RUNNING  1           48          4          4               4                     1440467731710  30000         18629100     2.3688125610351562
RUNNING  1           48          4          4               4                     1440467761711  30001         18847820     2.3965443624209923
RUNNING  1           48          4          4               4                     1440467791710  29999         18021400     2.291615897287722
RUNNING  1           48          4          4               4                     1440467821710  30000         18143360     2.3070475260416665
```
The negative impact is gone, and batching increases the output rate by about
50%. I still need to run more tests, investigate the performance impact of
input de-batching, and test with acking enabled.
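As a sanity check, the throughput column is consistent with the transferred message count times the 4-byte message size over each 30 s window, assuming "MB" here means MiB. For example, reproducing the third master-branch row:
```java
public class ThroughputCheck {
    public static void main(String[] args) {
        long transferred = 11565400L;   // messages in one 30 s window
        int messageSizeBytes = 4;       // from --messageSize 4
        long timeDiffMs = 30000L;
        double mbPerSec = transferred * (double) messageSizeBytes
                / (timeDiffMs / 1000.0) / (1024.0 * 1024.0);
        System.out.println(mbPerSec);   // ~1.4706166585286458, as in the table
    }
}
```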
Some more questions:
- What about `assert-can-serialize`? Is it performance-critical? I did not
test it, but it seems that a generic approach covering both tuples and batches
should be good enough (see the sketch after this list).
- I also do not understand the following code (lines 648-622 in
`executor.clj` on the batching branch). I used batching but did not modify this
code; nevertheless, it works (I guess this part is not executed?). Can you
explain?
```
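;; spout-side ack-init: registers a new tuple tree with an acker task
;; (presumably not reached in this test, since acking was disabled)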
(task/send-unanchored task-data
                      ACKER-INIT-STREAM-ID
                      [root-id (bit-xor-vals out-ids) task-id]
                      overflow-buffer)
```
- What about batching `acks`? Would it make sense? I do not understand the
acking code path well enough right now to judge, but since acking is quite
expensive, it might be a good idea.
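The generic approach from the first question could be as simple as treating a single tuple as a batch of size one; a minimal sketch with hypothetical names (not Storm's actual serializer API):
```java
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of a generic serializability assertion that covers
// both cases: a single tuple is just a batch of size one.
final class AssertCanSerialize {
    interface TupleSerializer {
        // Assumed to throw if any value cannot be serialized.
        byte[] serialize(List<Object> tupleValues);
    }

    static void assertBatch(TupleSerializer ser, List<List<Object>> batch) {
        for (List<Object> tuple : batch) {
            ser.serialize(tuple);
        }
    }

    static void assertTuple(TupleSerializer ser, List<Object> tuple) {
        assertBatch(ser, Collections.singletonList(tuple));
    }
}
```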
> Add tuple batching
> ------------------
>
> Key: STORM-855
> URL: https://issues.apache.org/jira/browse/STORM-855
> Project: Apache Storm
> Issue Type: New Feature
> Reporter: Matthias J. Sax
> Assignee: Matthias J. Sax
> Priority: Minor
>
> In order to increase Storm's throughput, multiple tuples can be grouped
> together into a batch of tuples (i.e., a fat-tuple) and transferred from
> producer to consumer at once.
> The initial idea is taken from https://github.com/mjsax/aeolus. However, we
> aim to integrate this feature deep into the system (in contrast to building
> it on top), which has multiple advantages:
> - batching can be even more transparent to the user (e.g., no extra
> direct-streams are needed to mimic Storm's data distribution patterns)
> - fault tolerance (anchoring/acking) can be done at tuple granularity
> rather than batch granularity, which would lead to many more replayed
> tuples (and result duplicates) in case of failure
> The aim is to extend the TopologyBuilder interface with an additional
> parameter 'batch_size' to expose this feature to the user. By default,
> batching will be disabled.
> This batching feature serves a pure tuple-transport purpose, i.e.,
> tuple-by-tuple processing semantics are preserved. An output batch is
> assembled at the producer and completely disassembled at the consumer. The
> consumer's output can be batched again, independently of whether its input
> was batched. Thus, batches can be of a different size for each
> producer-consumer pair. Furthermore, consumers can receive batches of
> different sizes from different producers (including regular, non-batched
> input).
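For intuition, the producer-side batch assembly described above might look like this minimal sketch (all names are hypothetical; this is not the PR's code):
```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Minimal sketch of the described semantics: the producer assembles
// batchSize tuples into one fat-tuple; the consumer disassembles it and
// processes tuple by tuple. Names are illustrative, not Storm API.
final class BatchEmitter {
    private final int batchSize;                         // <= 1 means batching disabled
    private final Consumer<List<List<Object>>> transfer; // sends one fat-tuple downstream
    private List<List<Object>> buffer = new ArrayList<>();

    BatchEmitter(int batchSize, Consumer<List<List<Object>>> transfer) {
        this.batchSize = batchSize;
        this.transfer = transfer;
    }

    void emit(List<Object> tuple) {
        buffer.add(tuple);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    void flush() {
        if (!buffer.isEmpty()) {
            transfer.accept(buffer);                     // one fat-tuple over the wire
            buffer = new ArrayList<>();
        }
    }
}
```
On the consumer side, the receiving executor would simply iterate over the fat-tuple and feed each contained tuple to the bolt, which is what preserves tuple-by-tuple processing semantics.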
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)