[
https://issues.apache.org/jira/browse/METRON-322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15853589#comment-15853589
]
ASF GitHub Bot commented on METRON-322:
---------------------------------------
GitHub user mattf-horton opened a pull request:
https://github.com/apache/incubator-metron/pull/442
METRON-322 Global Batching and Flushing
DO NOT INTEGRATE YET. This is a preliminary review request.
This Jira Ticket is to add timeout flushing to all Writers that do batch
writes. Currently most of them will wait indefinitely for a batch to fill,
resulting in message recycling due to `topology.message.timeout.secs`. This
changeset so far implements timeout flushing for the BulkMessageWriterBolt.
I am now starting to work on unit tests, so while this code compiles and
passes existing unit tests, the new functionality has not yet been tested.
However, I would be grateful for a quick review of the basic approach, focusing
on changes in:
* BulkMessageWriterBolt
* BulkWriterComponent
* BatchTimeoutHelper (new), and
* IndexingConfigurations.
All of the other changes are basically boilerplate, just laying
"getBatchTimeout()" facade methods alongside existing "getBatchSize()" facade
methods. (There are a lot of layers!)
The flush-on-timeout logic is fairly straightforward. It was implemented
by a refactoring of BulkWriterComponent and basic Tick Tuple reception in
BulkMessageWriterBolt.
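To make the flush-on-timeout idea concrete for reviewers, here is a minimal sketch in plain Java. It is not the code in this pull request: `TimedBatch`, `onTick()`, and the injected clock are hypothetical names chosen for illustration, and the sketch assumes the bolt would call `onTick()` whenever it receives a Tick Tuple and `add()` for every ordinary message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

/** Hypothetical stand-in for the batching logic; not the actual BulkWriterComponent. */
class TimedBatch<T> {
    private final int batchSize;
    private final long batchTimeoutMs;
    private final LongSupplier clock;        // injectable so the logic can be exercised without sleeping
    private final Consumer<List<T>> writer;  // receives each flushed batch
    private final List<T> queue = new ArrayList<>();
    private long oldestMessageMs;            // arrival time of the first message in the current batch

    TimedBatch(int batchSize, long batchTimeoutMs, LongSupplier clock, Consumer<List<T>> writer) {
        this.batchSize = batchSize;
        this.batchTimeoutMs = batchTimeoutMs;
        this.clock = clock;
        this.writer = writer;
    }

    /** Called for each ordinary message; flushes as soon as the batch is full. */
    void add(T message) {
        if (queue.isEmpty()) {
            oldestMessageMs = clock.getAsLong();
        }
        queue.add(message);
        if (queue.size() >= batchSize) {
            flush();
        }
    }

    /** Called on each tick; flushes a partial batch whose oldest message has waited too long. */
    void onTick() {
        if (!queue.isEmpty() && clock.getAsLong() - oldestMessageMs >= batchTimeoutMs) {
            flush();
        }
    }

    private void flush() {
        writer.accept(new ArrayList<>(queue)); // hand over a copy so the queue can be reused
        queue.clear();
    }
}
```

In this sketch the timeout is measured from the arrival of the oldest queued message, so a tick that arrives before the timeout has elapsed leaves the partial batch in place.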
The tricky part was figuring out the appropriate setting for
'topology.tick.tuple.freq.secs' if the administrator configures non-default
batchTimeouts. It is necessary to enumerate the batchTimeout settings for all
configured sensorNames, which is implemented in
IndexingConfigurations$getAllConfiguredTimeouts(). Then multiple other factors
must be taken into account to determine the allowed and recommended settings,
which is implemented in BatchTimeoutHelper. If there are better ways to
accomplish these things, please share your ideas.
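As a starting point for discussion, the calculation could be sketched roughly as follows. This is an illustrative sketch only, not the BatchTimeoutHelper in this changeset: the class and method names are hypothetical, the "0 or less means unset" convention is an assumption, and it applies only the rule from the ticket description (default and maximum batchTimeout of 1/2 the message timeout, or 1/4 when two Writers run in series), leaving out the other factors mentioned above.

```java
import java.util.List;

/** Hypothetical helper; names and rules here are assumptions, not BatchTimeoutHelper itself. */
final class TickIntervalSketch {
    private TickIntervalSketch() {}

    /** Default (and maximum) batchTimeout: the message timeout divided by 2 per Writer in series. */
    static int defaultBatchTimeoutSecs(int messageTimeoutSecs, int writersInSeries) {
        return Math.max(1, messageTimeoutSecs / (2 * writersInSeries));
    }

    /** Assumes 0 or less means "not configured"; values above the default are ignored. */
    static int effectiveBatchTimeoutSecs(int configuredSecs, int defaultSecs) {
        return (configuredSecs <= 0 || configuredSecs > defaultSecs) ? defaultSecs : configuredSecs;
    }

    /** Tick at least as often as the shortest effective timeout across all sensors. */
    static int recommendedTickIntervalSecs(List<Integer> configuredSecs, int defaultSecs) {
        int interval = defaultSecs;
        for (int configured : configuredSecs) {
            interval = Math.min(interval, effectiveBatchTimeoutSecs(configured, defaultSecs));
        }
        return interval;
    }
}
```

With Storm's default message timeout of 30 seconds and a single Writer, the default batchTimeout in this sketch is 15 seconds. Because a flush only happens when a tick arrives, a partial batch can wait up to roughly its timeout plus one tick interval, which is one reason the tick interval is taken from the shortest timeout rather than the longest.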
After feedback is incorporated, I need to make similar changes to the
ParserWriter, and possibly other places in the code where batching is used and
timeouts are not yet implemented. That is separable work, which I'm aware also
needs to be done. Thanks for taking time to look at this.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/mattf-horton/incubator-metron METRON-322
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/incubator-metron/pull/442.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #442
----
commit 6fdf67c962a4bcfeb43da9c1aa1318ace6b194c3
Author: mattf-horton <[email protected]>
Date: 2017-02-02T00:54:17Z
METRON-322 The first easy changes: add batchTimeout most places batchSize
is currently used.
commit b3eeb3dbbc0834c4c9f3d2fc24d099f2f8e5a3ea
Author: mattf-horton <[email protected]>
Date: 2017-02-02T08:57:41Z
mods for flush and timeout in BulkWriterComponent. Some work in
BulkMessageWriterBolt, but no getComponentConfiguration() yet.
commit 666f14a0ffe0eefc7de12e226d6953fd3e32d2d5
Author: mattf-horton <[email protected]>
Date: 2017-02-06T06:06:07Z
full implementation of getComponentConfiguration() method in
BulkMessageWriterBolt, with implementation details in IndexingConfigurations
and new BatchTimeoutHelper class.
----
> Global Batching and Flushing
> ----------------------------
>
> Key: METRON-322
> URL: https://issues.apache.org/jira/browse/METRON-322
> Project: Metron
> Issue Type: Improvement
> Reporter: Ajay Yadav
> Assignee: Matt Foley
>
> All Writers and other bolts that maintain an internal "batch" queue need to
> have a timeout flush, to prevent messages from low-volume telemetries from
> sitting in their queues indefinitely. Storm has a timeout value
> (topology.message.timeout.secs) that prevents it from waiting for too long.
> If the Writer does not process the queue before the timeout, then Storm
> recycles the tuples through the topology. This has multiple undesirable
> consequences, including data duplication and waste of compute resources. We
> would like to be able to specify an interval after which the queues would
> flush, even if the batch size is not met.
> We will utilize the Storm Tick Tuple to trigger timeout flushing, following
> the recommendations of the article at
> http://hortonworks.com/blog/apache-storm-design-pattern-micro-batching/#CONCLUSION
> Since every Writer processes its queue somewhat differently, every bolt that
> has a "batchSize" parameter will be given a "batchTimeout" parameter too. It
> will default to 1/2 the value of "topology.message.timeout.secs", as
> recommended, and will ignore settings larger than the default, which could
> cause failure to flush in time. In the Enrichment topology, where two
> Writers may be placed one after the other (enrichment and threat intel), the
> default timeout interval will be 1/4 the value of
> "topology.message.timeout.secs". The default value of
> "topology.message.timeout.secs" in Storm is 30 seconds.
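> In addition, Storm provides a limit on the number of pending messages that
> have not been acked. Once "topology.max.spout.pending" messages are pending
> for a spout task, the spout stops emitting until some are acked or failed,
> so a batch that cannot fill under that limit will wait until its messages
> time out and are recycled through the topology.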
> However, the default value of "topology.max.spout.pending" is null, and if
> set to a non-null value, the user can manage the consequences by setting
> batchSize limits appropriately. Having the timeout flush will also
> ameliorate this issue. So we do not need to address
> "topology.max.spout.pending" directly in this task.