[ https://issues.apache.org/jira/browse/METRON-322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15939218#comment-15939218 ]
ASF GitHub Bot commented on METRON-322:
---------------------------------------
GitHub user mattf-horton commented on a diff in the pull request:
https://github.com/apache/incubator-metron/pull/481#discussion_r107781590
--- Diff: metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java ---
@@ -92,22 +177,51 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll
configurationTransformation = x -> x;
}
try {
- bulkMessageWriter.init(stormConf
- , configurationTransformation.apply(new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()))
- );
+ WriterConfiguration writerconf = configurationTransformation.apply(
+ new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()));
+ if (defaultBatchTimeout == 0) {
+ //This means getComponentConfiguration was never called to initialize defaultBatchTimeout,
+ //probably because we are in a unit test scenario. So calculate it here.
+ BatchTimeoutHelper timeoutHelper = new BatchTimeoutHelper(writerconf::getAllConfiguredTimeouts, batchTimeoutDivisor);
+ defaultBatchTimeout = timeoutHelper.getDefaultBatchTimeout();
+ }
+ writerComponent.setDefaultBatchTimeout(defaultBatchTimeout);
+ bulkMessageWriter.init(stormConf, writerconf);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
+ /**
+ * Used only for unit testing.
+ */
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector, Clock clock) {
+ prepare(stormConf, context, collector);
+ writerComponent.withClock(clock);
+ }
+
@SuppressWarnings("unchecked")
@Override
public void execute(Tuple tuple) {
- JSONObject message = (JSONObject) messageGetStrategy.get(tuple);
- String sensorType = MessageUtils.getSensorType(message);
try
{
- WriterConfiguration writerConfiguration = configurationTransformation.apply(new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()));
+ if (isTick(tuple)) {
+ if (!(bulkMessageWriter instanceof WriterToBulkWriter)) {
+ //WriterToBulkWriter doesn't allow batching, so no need to flush on Tick.
+ LOG.debug("Flushing message queues older than their batchTimeouts");
+ writerComponent.flushTimeouts(bulkMessageWriter, configurationTransformation.apply(
+ new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()))
+ , messageGetStrategy);
+ }
+ collector.ack(tuple);
--- End diff --
Good idea; I will change this to use separate "try" wrappers around the
tick-tuple and message-tuple processing.
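A minimal sketch of the split described in the reply, assuming hypothetical stand-in types (`SimpleTuple`, `SimpleCollector`, `BatchingWriter`, `SplitTryBolt`) rather than the real Storm and Metron classes. The tick branch and the message branch each get their own `try`, so a failed flush is reported while the tick is still acked, and a failed write only affects the tuple that caused it. Failing the tuple on error is this sketch's choice; the bolt's actual error handling may differ.

```java
// Hypothetical stand-ins for the Storm/Metron types; not the real APIs.
interface SimpleTuple {
  boolean isTick();
  String payload();
}

interface SimpleCollector {
  void ack(SimpleTuple tuple);
  void fail(SimpleTuple tuple);
  void reportError(Throwable error);
}

interface BatchingWriter {
  // Flush every queue whose oldest message is older than its batchTimeout.
  void flushTimedOut() throws Exception;
  // Enqueue a message; a full batch is written (and its tuples acked) later.
  void write(String message) throws Exception;
}

class SplitTryBolt {
  private final BatchingWriter writer;
  private final SimpleCollector collector;

  SplitTryBolt(BatchingWriter writer, SimpleCollector collector) {
    this.writer = writer;
    this.collector = collector;
  }

  void execute(SimpleTuple tuple) {
    if (tuple.isTick()) {
      handleTick(tuple);
    } else {
      handleMessage(tuple);
    }
  }

  // Tick path: its own try, so a failed flush never leaves the tick un-acked.
  private void handleTick(SimpleTuple tick) {
    try {
      writer.flushTimedOut();
    } catch (Exception e) {
      collector.reportError(e);
    } finally {
      collector.ack(tick);
    }
  }

  // Message path: its own try, so an error here only affects this tuple.
  private void handleMessage(SimpleTuple message) {
    try {
      writer.write(message.payload());
      // No ack here: in a batching writer the tuple is acked when its batch is flushed.
    } catch (Exception e) {
      collector.reportError(e);
      collector.fail(message);
    }
  }
}
```

With a writer whose flush and write both throw, the tick is still acked after the error is reported, and only the message tuple is failed.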
> Global Batching and Flushing
> ----------------------------
>
> Key: METRON-322
> URL: https://issues.apache.org/jira/browse/METRON-322
> Project: Metron
> Issue Type: Improvement
> Reporter: Ajay Yadav
> Assignee: Matt Foley
>
> All Writers and other bolts that maintain an internal "batch" queue need to
> have a timeout flush, to prevent messages from low-volume telemetries from
> sitting in their queues indefinitely. Storm has a timeout value
> (topology.message.timeout.secs) that limits how long it waits for a tuple to
> be fully processed. If the Writer does not flush its queue before this
> timeout expires, Storm fails the pending tuples, and the spout then replays
> them through the topology. This has multiple undesirable consequences,
> including data duplication and wasted compute resources. We would like to be
> able to specify an interval after which the queues are flushed, even if the
> batch size has not been reached.
> We will utilize the Storm Tick Tuple to trigger timeout flushing, following
> the recommendations of the article at
> http://hortonworks.com/blog/apache-storm-design-pattern-micro-batching/#CONCLUSION
> Since every Writer processes its queue somewhat differently, every bolt that
> has a "batchSize" parameter will also be given a "batchTimeout" parameter. It
> will default to 1/2 the value of "topology.message.timeout.secs", as
> recommended, and settings larger than the default will be ignored, because
> they could cause a failure to flush in time. In the Enrichment topology,
> where two Writers may be placed one after the other (enrichment and threat
> intel), the default timeout interval will be 1/4 the value of
> "topology.message.timeout.secs". The default value of
> "topology.message.timeout.secs" in Storm is 30 seconds, which gives default
> batch timeouts of 15 and 7.5 seconds respectively.
> In addition, Storm limits the number of pending messages, i.e. messages that
> have been emitted by a spout but not yet acked or failed. Once a spout task
> has "topology.max.spout.pending" messages pending, Storm stops calling that
> task's nextTuple() until some of the pending messages complete, so a batch
> larger than the number of messages allowed in flight may never fill, leaving
> its messages to wait until they time out. However, the default value of
> "topology.max.spout.pending" is null (no limit), and if it is set to a
> non-null value, the user can manage the consequences by setting batchSize
> limits appropriately. Having the timeout flush will also ameliorate this
> issue, so we do not need to address "topology.max.spout.pending" directly in
> this task.
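For reference, here is a sketch of the tick-tuple mechanics the issue relies on, written without a Storm dependency so it compiles on its own. The string constants mirror the values Storm uses for `Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS`, `Constants.SYSTEM_COMPONENT_ID` and `Constants.SYSTEM_TICK_STREAM_ID`. In a real bolt, `componentConfiguration` corresponds to overriding `getComponentConfiguration()`, and `isTick` would be given `tuple.getSourceComponent()` and `tuple.getSourceStreamId()`. The class `TickTuples` itself is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; the string values mirror Storm's tick-tuple constants.
final class TickTuples {
  static final String TICK_FREQ_KEY = "topology.tick.tuple.freq.secs";
  static final String SYSTEM_COMPONENT_ID = "__system";
  static final String SYSTEM_TICK_STREAM_ID = "__tick";

  private TickTuples() {}

  // What a bolt's getComponentConfiguration() would return to receive a tick every freqSecs seconds.
  static Map<String, Object> componentConfiguration(int freqSecs) {
    if (freqSecs <= 0) {
      throw new IllegalArgumentException("tick frequency must be positive");
    }
    Map<String, Object> conf = new HashMap<>();
    conf.put(TICK_FREQ_KEY, freqSecs);
    return conf;
  }

  // A tuple is a tick if it comes from the system component on the tick stream.
  static boolean isTick(String sourceComponent, String sourceStreamId) {
    return SYSTEM_COMPONENT_ID.equals(sourceComponent)
        && SYSTEM_TICK_STREAM_ID.equals(sourceStreamId);
  }
}
```

Because queues are only inspected when a tick arrives, a message can wait for up to roughly its batchTimeout plus one tick interval, so the tick frequency should be well below the smallest batchTimeout.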
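The timeout flush described in the issue can be sketched as a per-queue check. A queue is flushed when it reaches batchSize or, on a tick, when its oldest message has waited at least batchTimeout. `TimedBatchQueue` is a hypothetical class, not Metron's writer component, and it uses milliseconds for convenience. The injectable `LongSupplier` clock plays the same role as the `Clock` passed to the test-only `prepare(...)` overload in the diff.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical per-queue batching with a timeout flush; not Metron's implementation.
final class TimedBatchQueue<T> {
  private final int batchSize;
  private final long batchTimeoutMillis;
  private final LongSupplier clock;   // injectable time source, e.g. a fake clock in unit tests
  private final List<T> queue = new ArrayList<>();
  private long oldestEnqueuedAt;      // only meaningful while the queue is non-empty

  TimedBatchQueue(int batchSize, long batchTimeoutMillis, LongSupplier clock) {
    if (batchSize <= 0 || batchTimeoutMillis <= 0) {
      throw new IllegalArgumentException("batchSize and batchTimeout must be positive");
    }
    this.batchSize = batchSize;
    this.batchTimeoutMillis = batchTimeoutMillis;
    this.clock = clock;
  }

  // Enqueue a message; returns the full batch when batchSize is reached, else an empty list.
  List<T> add(T message) {
    if (queue.isEmpty()) {
      oldestEnqueuedAt = clock.getAsLong();
    }
    queue.add(message);
    return queue.size() >= batchSize ? drain() : Collections.<T>emptyList();
  }

  // Called on each tick: flush everything if the oldest message has waited batchTimeout or longer.
  List<T> flushIfTimedOut() {
    if (!queue.isEmpty() && clock.getAsLong() - oldestEnqueuedAt >= batchTimeoutMillis) {
      return drain();
    }
    return Collections.emptyList();
  }

  private List<T> drain() {
    List<T> batch = new ArrayList<>(queue);
    queue.clear();
    return batch;
  }
}
```

A low-volume sensor that enqueues a single message therefore gets it flushed on the first tick at or after batchTimeout, instead of waiting for a batch that may never fill.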
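The batchTimeout defaults described in the issue reduce to a little arithmetic. This sketch assumes timeouts are whole seconds, so 30 / 4 = 7.5 rounds down to 7, which errs on the side of flushing early. It also assumes that an unset batchTimeout is represented as 0. `BatchTimeoutDefaults` is hypothetical, though `divisor` appears to play the role of the diff's `batchTimeoutDivisor`.

```java
// Hypothetical helper illustrating the batchTimeout defaults described in METRON-322.
final class BatchTimeoutDefaults {
  // Storm's default for topology.message.timeout.secs.
  static final int STORM_DEFAULT_MESSAGE_TIMEOUT_SECS = 30;

  private BatchTimeoutDefaults() {}

  // divisor is 2 for most topologies and 4 for Enrichment (two Writers in series).
  static int defaultBatchTimeout(int messageTimeoutSecs, int divisor) {
    if (messageTimeoutSecs <= 0 || divisor <= 0) {
      throw new IllegalArgumentException("timeout and divisor must be positive");
    }
    // Integer division rounds down; never go below 1 second.
    return Math.max(1, messageTimeoutSecs / divisor);
  }

  // Unset (<= 0) or larger-than-default settings fall back to the default.
  static int effectiveBatchTimeout(int configuredSecs, int messageTimeoutSecs, int divisor) {
    int defaultSecs = defaultBatchTimeout(messageTimeoutSecs, divisor);
    if (configuredSecs <= 0 || configuredSecs > defaultSecs) {
      return defaultSecs;
    }
    return configuredSecs;
  }
}
```

For example, with Storm's default of 30 seconds and a divisor of 2, a configured batchTimeout of 60 is ignored in favour of 15, while a configured 5 is kept.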
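The interplay between batchSize and `topology.max.spout.pending` can be checked with simple arithmetic. This rough sketch assumes each spout tuple yields at most one message for this writer. It also assumes the writer's queued tuples remain pending until their batch is flushed. Under those assumptions, at most maxSpoutPending times the number of spout tasks can be in flight, and a batchSize above that bound can only ever be flushed by the timeout. `PendingLimitCheck` is hypothetical.

```java
// Hypothetical sanity check relating batchSize to topology.max.spout.pending.
final class PendingLimitCheck {
  private PendingLimitCheck() {}

  // Upper bound on tuples in flight; a null maxSpoutPending (Storm's default) means no limit.
  static long maxInFlight(Integer maxSpoutPending, int spoutTasks) {
    if (maxSpoutPending == null) {
      return Long.MAX_VALUE;
    }
    return (long) maxSpoutPending * spoutTasks;
  }

  // False means the batch can never fill by size alone; only the timeout flush will empty it.
  static boolean batchCanFill(int batchSize, Integer maxSpoutPending, int spoutTasks) {
    return batchSize <= maxInFlight(maxSpoutPending, spoutTasks);
  }
}
```

For instance, with max.spout.pending set to 1000 and 2 spout tasks, at most 2000 tuples can be in flight, so a batchSize of 5000 would never fill.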
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)