GitHub user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1174#discussion_r215760574
  
    --- Diff: metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java ---
    @@ -150,27 +159,98 @@ public void testProcessingTime() throws Exception {
         kafkaComponent.writeMessages(inputTopic, message2);
         kafkaComponent.writeMessages(inputTopic, message3);
     
    +    // retrieve the profile measurement using PROFILE_GET
    +    String profileGetExpression = "PROFILE_GET('processing-time-test', '10.0.0.1', PROFILE_FIXED('5', 'MINUTES'))";
    +    List<Integer> actuals = execute(profileGetExpression, List.class);
    +
         // storm needs at least one message to close its event window
         int attempt = 0;
    -    while(profilerTable.getPutLog().size() == 0 && attempt++ < 10) {
    +    while(actuals.size() == 0 && attempt++ < 10) {
     
    -      // sleep, at least beyond the current window
    -      Thread.sleep(windowDurationMillis + windowLagMillis);
    +      // wait for the profiler to flush
    +      long sleep = windowDurationMillis;
    +      LOG.debug("Waiting {} millis for profiler to flush", sleep);
    +      Thread.sleep(sleep);
     
    -      // send another message to help close the current event window
    +      // write another message to advance time.  this ensures that we are testing the 'normal' flush mechanism.
    +      // if we do not send additional messages to advance time, then it is the profile TTL mechanism which
    +      // will ultimately flush the profile
           kafkaComponent.writeMessages(inputTopic, message2);
    +
    +      // retrieve the profile measurement using PROFILE_GET
    +      actuals = execute(profileGetExpression, List.class);
         }
     
    -    // validate what was flushed
    -    List<Integer> actuals = read(
    -            profilerTable.getPutLog(),
    -            columnFamily,
    -            columnBuilder.getColumnQualifier("value"),
    -            Integer.class);
    -    assertEquals(1, actuals.size());
    +    // the profile should count at least 3 messages
    +    assertTrue(actuals.size() > 0);
         assertTrue(actuals.get(0) >= 3);
       }
     
    +  /**
    +   * The Profiler can generate profiles based on processing time.  With processing time,
    +   * the Profiler builds profiles based on when the telemetry is processed.
    +   *
    +   * <p>Not defining a 'timestampField' within the Profiler configuration tells the Profiler
    +   * to use processing time.
    +   *
    +   * <p>There are two mechanisms that will cause a profile to flush.
    +   *
    +   * (1) As new messages arrive, time is advanced. The splitter bolt attaches a timestamp to each
    +   * message (which can be either event or system time.)  This advances time and leads to profile
    +   * measurements being flushed.
    +   *
    +   * (2) If no messages arrive to advance time, then the "time to live" mechanism will flush a profile
    +   * after a period of time.
    +   *
    +   * <p>This test specifically tests the *second* mechanism when a profile is flushed by the
    +   * "time to live" mechanism.
    +   */
    +  @Test
    +  public void testProcessingTimeWithTimeToLiveFlush() throws Exception {
    --- End diff --
    
    I removed the javadocs from the integration test and put them in the bolt's javadoc.  They seemed to fit there nicely.
    
    Hopefully this addresses your valid point about DRY.  Let me know if there is anything else I can do.

