[ https://issues.apache.org/jira/browse/METRON-322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15939118#comment-15939118 ]

ASF GitHub Bot commented on METRON-322:
---------------------------------------

Github user cestella commented on a diff in the pull request:

    https://github.com/apache/incubator-metron/pull/481#discussion_r107765415
  
    --- Diff: metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java ---
    @@ -92,22 +177,51 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll
           configurationTransformation = x -> x;
         }
         try {
    -      bulkMessageWriter.init(stormConf
    -                            , configurationTransformation.apply(new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()))
    -                            );
    +      WriterConfiguration writerconf = configurationTransformation.apply(
    +              new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()));
    +      if (defaultBatchTimeout == 0) {
    +        //This means getComponentConfiguration was never called to initialize defaultBatchTimeout,
    +        //probably because we are in a unit test scenario.  So calculate it here.
    +        BatchTimeoutHelper timeoutHelper = new BatchTimeoutHelper(writerconf::getAllConfiguredTimeouts, batchTimeoutDivisor);
    +        defaultBatchTimeout = timeoutHelper.getDefaultBatchTimeout();
    +      }
    +      writerComponent.setDefaultBatchTimeout(defaultBatchTimeout);
    +      bulkMessageWriter.init(stormConf, writerconf);
         } catch (Exception e) {
           throw new RuntimeException(e);
         }
       }
     
    +  /**
    +   * Used only for unit testing.
    +   */
    +  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector, Clock clock) {
    +    prepare(stormConf, context, collector);
    +    writerComponent.withClock(clock);
    +  }
    +
       @SuppressWarnings("unchecked")
       @Override
       public void execute(Tuple tuple) {
    -    JSONObject message = (JSONObject) messageGetStrategy.get(tuple);
    -    String sensorType = MessageUtils.getSensorType(message);
         try
         {
    -      WriterConfiguration writerConfiguration = configurationTransformation.apply(new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()));
    +      if (isTick(tuple)) {
    +        if (!(bulkMessageWriter instanceof WriterToBulkWriter)) {
    +          //WriterToBulkWriter doesn't allow batching, so no need to flush on Tick.
    +          LOG.debug("Flushing message queues older than their batchTimeouts");
    +          writerComponent.flushTimeouts(bulkMessageWriter, configurationTransformation.apply(
    +                  new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()))
    +                  , messageGetStrategy);
    +        }
    +        collector.ack(tuple);
    +        }
    +        collector.ack(tuple);
    --- End diff --
    
    Should this be in a finally?
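
    For illustration only, a minimal sketch of what moving the ack into a finally could look like. It does not use the Storm or Metron APIs: `Collector`, `Flusher`, and `onTick` are hypothetical stand-ins for `OutputCollector`, `writerComponent.flushTimeouts(...)`, and the tick branch of `execute`, so the shape of the control flow is the only thing it shows.

```java
/** Hypothetical sketch; not the actual BulkMessageWriterBolt code. */
final class AckInFinallySketch {

  /** Stand-in for the part of OutputCollector used here (assumption). */
  interface Collector {
    void ack(String tupleId);
  }

  /** Stand-in for the timeout flush, which may fail (assumption). */
  interface Flusher {
    void flushTimeouts() throws Exception;
  }

  /**
   * Handles a tick tuple: flushes timed-out batches, then acks the tick.
   * The ack sits in finally, so it runs whether or not the flush throws.
   */
  static void onTick(String tickTupleId, Flusher flusher, Collector collector) {
    try {
      flusher.flushTimeouts();
    } catch (Exception e) {
      // Mirrors the diff's habit of rethrowing as an unchecked exception.
      throw new RuntimeException(e);
    } finally {
      collector.ack(tickTupleId);
    }
  }
}
```

    With this shape the tick tuple is acked even when the flush fails; whether that is the desired behavior is the open question.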


> Global Batching and Flushing
> ----------------------------
>
>                 Key: METRON-322
>                 URL: https://issues.apache.org/jira/browse/METRON-322
>             Project: Metron
>          Issue Type: Improvement
>            Reporter: Ajay Yadav
>            Assignee: Matt Foley
>
> All Writers and other bolts that maintain an internal "batch" queue need to 
> have a timeout flush, to prevent messages from low-volume telemetries from 
> sitting in their queues indefinitely.  Storm has a timeout value 
> (topology.message.timeout.secs) that prevents it from waiting for too long. 
> If the Writer does not process the queue before the timeout, then Storm 
> recycles the tuples through the topology. This has multiple undesirable 
> consequences, including data duplication and waste of compute resources. We 
> would like to be able to specify an interval after which the queues would 
> flush, even if the batch size is not met.
> We will utilize the Storm Tick Tuple to trigger timeout flushing, following 
> the recommendations of the article at 
> http://hortonworks.com/blog/apache-storm-design-pattern-micro-batching/#CONCLUSION
> Since every Writer processes its queue somewhat differently, every bolt that 
> has a "batchSize" parameter will be given a "batchTimeout" parameter too.  It 
> will default to 1/2 the value of "topology.message.timeout.secs", as 
> recommended, and will ignore settings larger than the default, which could 
> cause failure to flush in time.  In the Enrichment topology, where two 
> Writers may be placed one after the other (enrichment and threat intel), the 
> default timeout interval will be 1/4 the value of 
> "topology.message.timeout.secs".  The default value of 
> "topology.message.timeout.secs" in Storm is 30 seconds.
> In addition, Storm provides a limit on the number of pending messages that 
> have not been acked. If more than "topology.max.spout.pending" messages are 
> waiting in a topology, then Storm will recycle them through the topology. 
> However, the default value of "topology.max.spout.pending" is null, and if 
> set to non-null value, the user can manage the consequences by setting 
> batchSize limits appropriately.  Having the timeout flush will also 
> ameliorate this issue.  So we do not need to address 
> "topology.max.spout.pending" directly in this task.
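
The default-timeout rule in the description above can be sketched roughly as follows. This is not Metron's `BatchTimeoutHelper`: the class and method names are hypothetical, the `divisor` parameter is an assumed way to model the enrichment case (1 for a single Writer, 2 for two Writers in sequence), and rounding down to whole seconds is also an assumption.

```java
/** Hypothetical sketch of the batchTimeout defaulting rule; not Metron code. */
final class BatchTimeoutSketch {

  private BatchTimeoutSketch() {}

  /**
   * Default timeout: half of topology.message.timeout.secs, divided again
   * by the number of Writers in sequence (assumed "divisor").
   * Integer division rounds down to whole seconds (assumption).
   */
  static int defaultBatchTimeoutSecs(int messageTimeoutSecs, int divisor) {
    if (messageTimeoutSecs <= 0 || divisor <= 0) {
      throw new IllegalArgumentException("timeout and divisor must be positive");
    }
    return messageTimeoutSecs / 2 / divisor;
  }

  /**
   * Effective timeout: a configured value is honored only if it is positive
   * and no larger than the default; otherwise the default is used.
   */
  static int effectiveBatchTimeoutSecs(int configuredSecs, int defaultSecs) {
    boolean usable = configuredSecs > 0 && configuredSecs <= defaultSecs;
    return usable ? configuredSecs : defaultSecs;
  }
}
```

With Storm's default message timeout of 30 seconds, this sketch gives a default of 15 seconds for a single Writer and 7 seconds (7.5 rounded down) for the two-Writer enrichment case; a configured batchTimeout of 20 seconds would be ignored in favor of the 15-second default.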



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
