[ 
https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14710090#comment-14710090
 ] 

ASF GitHub Bot commented on STORM-855:
--------------------------------------

Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/694#discussion_r37807094
  
    --- Diff: storm-core/src/clj/backtype/storm/daemon/executor.clj ---
    @@ -658,40 +688,42 @@
                               
                               ;;(log-debug "Received tuple " tuple " at task " task-id)
                               ;; need to do it this way to avoid reflection
    -                          (let [stream-id (.getSourceStreamId tuple)]
    -                            (condp = stream-id
    -                              Constants/CREDENTIALS_CHANGED_STREAM_ID 
    -                                (let [task-data (get task-datas task-id)
    -                                      bolt-obj (:object task-data)]
    -                                  (when (instance? ICredentialsListener bolt-obj)
    -                                    (.setCredentials bolt-obj (.getValue tuple 0))))
    -                              Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple)
    -                              (let [task-data (get task-datas task-id)
    -                                    ^IBolt bolt-obj (:object task-data)
    -                                    user-context (:user-context task-data)
    -                                    sampler? (sampler)
    -                                    execute-sampler? (execute-sampler)
    -                                    now (if (or sampler? execute-sampler?) (System/currentTimeMillis))]
    -                                (when sampler?
    -                                  (.setProcessSampleStartTime tuple now))
    -                                (when execute-sampler?
    -                                  (.setExecuteSampleStartTime tuple now))
    -                                (.execute bolt-obj tuple)
    -                                (let [delta (tuple-execute-time-delta! tuple)]
    -                                  (when (= true (storm-conf TOPOLOGY-DEBUG))
    -                                    (log-message "Execute done TUPLE " tuple " TASK: " task-id " DELTA: " delta))
    - 
    -                                  (task/apply-hooks user-context .boltExecute (BoltExecuteInfo. tuple task-id delta))
    -                                  (when delta
    -                                    (builtin-metrics/bolt-execute-tuple! (:builtin-metrics task-data)
    -                                                                         executor-stats
    -                                                                         (.getSourceComponent tuple)
    -                                                                         (.getSourceStreamId tuple)
    -                                                                         delta)
    -                                    (stats/bolt-execute-tuple! executor-stats
    -                                                               (.getSourceComponent tuple)
    -                                                               (.getSourceStreamId tuple)
    -                                                               delta)))))))
    +                          (let [tuples (if (instance? TupleImpl tupleOrBatch) (list tupleOrBatch) (.buffer tupleOrBatch))]
    +                            (doseq [tuple tuples]
    +                              (let [stream-id (.getSourceStreamId tuple)]
    --- End diff --
    
    This line, for instance, is eating 9% of the CPU time.
    <img width="1296" alt="screen shot 2015-08-24 at 4 26 07 pm" src="https://cloud.githubusercontent.com/assets/1819836/9452988/143be13e-4a7d-11e5-812f-6c2eefd2b96e.png">



> Add tuple batching
> ------------------
>
>                 Key: STORM-855
>                 URL: https://issues.apache.org/jira/browse/STORM-855
>             Project: Apache Storm
>          Issue Type: New Feature
>            Reporter: Matthias J. Sax
>            Assignee: Matthias J. Sax
>            Priority: Minor
>
> To increase Storm's throughput, multiple tuples can be grouped 
> together into a batch of tuples (i.e., a fat tuple) and transferred from 
> producer to consumer at once.
> The initial idea is taken from https://github.com/mjsax/aeolus. However, we 
> aim to integrate this feature deep into the system (in contrast to building 
> it on top), which has multiple advantages:
>   - batching can be even more transparent to the user (e.g., no extra 
> direct streams are needed to mimic Storm's data distribution patterns)
>   - fault tolerance (anchoring/acking) can be done at tuple granularity 
> (not at batch granularity, which leads to many more replayed tuples -- and 
> result duplicates -- in case of failure)
> The aim is to extend the TopologyBuilder interface with an additional 
> parameter 'batch_size' to expose this feature to the user. By default, 
> batching will be disabled.
> This batching feature serves purely as a tuple transport mechanism, i.e., 
> tuple-by-tuple processing semantics are preserved. An output batch is 
> assembled at the producer and completely disassembled at the consumer. The 
> consumer's output can be batched again, independently of whether its input 
> was batched. Thus, batches can be of a different size for each 
> producer-consumer pair. Furthermore, consumers can receive batches of 
> different sizes from different producers (including regular non-batched 
> input).
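
The assemble-at-producer / disassemble-at-consumer behaviour described in the issue can be sketched in plain Java. This is only an illustration under stated assumptions, not the code from the pull request: `TupleBatcher`, `BatchUnpacker`, and the `transport` callback are hypothetical names, a `java.util.List` stands in for whatever batch type Storm would actually ship, and `batchSize` plays the role of the proposed 'batch_size' parameter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical producer-side buffer; not part of Storm's API. */
final class TupleBatcher<T> {
    private final int batchSize;
    private final Consumer<List<T>> transport; // assumed hand-off to the consumer
    private List<T> buffer = new ArrayList<>();

    TupleBatcher(int batchSize, Consumer<List<T>> transport) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        this.batchSize = batchSize;
        this.transport = transport;
    }

    /** Buffers one tuple and ships the batch as soon as it is full. */
    void emit(T tuple) {
        buffer.add(tuple);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Ships a partially filled batch; does nothing if the buffer is empty. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> batch = buffer;
        buffer = new ArrayList<>(); // start a fresh batch before handing off
        transport.accept(batch);
    }
}

/** Hypothetical consumer side: unpacks a batch and processes tuple by tuple. */
final class BatchUnpacker<T> {
    private final Consumer<T> execute; // stands in for the bolt's execute call

    BatchUnpacker(Consumer<T> execute) {
        this.execute = execute;
    }

    /** The batch is only a transport unit; each tuple is still executed on its own. */
    void receive(List<T> batch) {
        for (T tuple : batch) {
            execute.accept(tuple);
        }
    }
}
```

In this sketch a `batchSize` of 1 ships every tuple immediately, which corresponds to batching being disabled. Because the consumer executes tuples one at a time, acking or failing could still be tracked per tuple rather than per batch, which is the property the issue description asks for.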



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
