GitHub user nickwallen commented on a diff in the pull request:
https://github.com/apache/metron/pull/1174#discussion_r215760574
--- Diff:
metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
---
@@ -150,27 +159,98 @@ public void testProcessingTime() throws Exception {
kafkaComponent.writeMessages(inputTopic, message2);
kafkaComponent.writeMessages(inputTopic, message3);
+ // retrieve the profile measurement using PROFILE_GET
+ String profileGetExpression = "PROFILE_GET('processing-time-test', '10.0.0.1', PROFILE_FIXED('5', 'MINUTES'))";
+ List<Integer> actuals = execute(profileGetExpression, List.class);
+
// storm needs at least one message to close its event window
int attempt = 0;
- while(profilerTable.getPutLog().size() == 0 && attempt++ < 10) {
+ while(actuals.size() == 0 && attempt++ < 10) {
- // sleep, at least beyond the current window
- Thread.sleep(windowDurationMillis + windowLagMillis);
+ // wait for the profiler to flush
+ long sleep = windowDurationMillis;
+ LOG.debug("Waiting {} millis for profiler to flush", sleep);
+ Thread.sleep(sleep);
- // send another message to help close the current event window
+ // write another message to advance time. this ensures that we are testing the 'normal' flush mechanism.
+ // if we do not send additional messages to advance time, then it is the profile TTL mechanism which
+ // will ultimately flush the profile
kafkaComponent.writeMessages(inputTopic, message2);
+
+ // retrieve the profile measurement using PROFILE_GET
+ actuals = execute(profileGetExpression, List.class);
}
- // validate what was flushed
- List<Integer> actuals = read(
- profilerTable.getPutLog(),
- columnFamily,
- columnBuilder.getColumnQualifier("value"),
- Integer.class);
- assertEquals(1, actuals.size());
+ // the profile should count at least 3 messages
+ assertTrue(actuals.size() > 0);
assertTrue(actuals.get(0) >= 3);
}
+ /**
+ * The Profiler can generate profiles based on processing time. With processing time,
+ * the Profiler builds profiles based on when the telemetry is processed.
+ *
+ * <p>Not defining a 'timestampField' within the Profiler configuration tells the Profiler
+ * to use processing time.
+ *
+ * <p>There are two mechanisms that will cause a profile to flush.
+ *
+ * (1) As new messages arrive, time is advanced. The splitter bolt attaches a timestamp to each
+ * message (which can be either event or system time.) This advances time and leads to profile
+ * measurements being flushed.
+ *
+ * (2) If no messages arrive to advance time, then the "time to live" mechanism will flush a profile
+ * after a period of time.
+ *
+ * <p>This test specifically tests the *second* mechanism when a profile is flushed by the
+ * "time to live" mechanism.
+ */
+ @Test
+ public void testProcessingTimeWithTimeToLiveFlush() throws Exception {
--- End diff --
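The retry loop in `testProcessingTime` follows a bounded-polling pattern: query, and while the result is not ready, sleep, nudge the system forward, and query again, giving up after a fixed number of attempts. A minimal, self-contained sketch of that pattern as a reusable helper is below. The `BoundedPoll` class and its `pollUntil` method are hypothetical and are not part of Metron's test utilities; the `advance` callback stands in for the extra `kafkaComponent.writeMessages(...)` call the test uses to move time forward.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;

public class BoundedPoll {

  /**
   * Hypothetical helper: evaluates {@code query} until {@code done} accepts the result or
   * {@code maxAttempts} retries have been used. Between attempts it sleeps for
   * {@code sleepMillis} and then runs {@code advance} (for example, writing another message).
   */
  public static <T> T pollUntil(Supplier<T> query,
                                Predicate<T> done,
                                Runnable advance,
                                int maxAttempts,
                                long sleepMillis) throws InterruptedException {
    T result = query.get();
    int attempt = 0;
    while (!done.test(result) && attempt++ < maxAttempts) {
      // give the system time to make progress before checking again
      Thread.sleep(sleepMillis);
      // nudge the system forward, mirroring the extra Kafka message in the test
      advance.run();
      result = query.get();
    }
    return result;
  }

  public static void main(String[] args) throws InterruptedException {
    // Simulated query: returns an empty list until the third call.
    int[] calls = {0};
    Supplier<List<Integer>> query = () -> ++calls[0] < 3
        ? Collections.<Integer>emptyList()
        : Collections.singletonList(3);

    List<Integer> actuals = pollUntil(query, l -> !l.isEmpty(), () -> { }, 10, 10L);
    System.out.println(actuals + " after " + calls[0] + " calls");
  }
}
```

As in the test, the caller still asserts on the returned value afterwards, because the helper returns the last result even when it gives up.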
I moved the Javadoc from the integration test into the bolt's Javadoc, where it seemed to fit nicely.
Hopefully this addresses your valid point about DRY, but let me know if there is anything else I can do.
---
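The two flush mechanisms described in the Javadoc (time advanced by incoming message timestamps, and the "time to live" flush when no messages arrive) can be illustrated with a small, self-contained model. This is a hypothetical sketch, not the Profiler bolt's actual code: the `FlushModel` class, its methods, the single message-count profile, and the choice to align periods to multiples of the window length are all assumptions made for illustration. The message timestamp and the wall-clock arrival time are kept separate because the timestamp may be either event or system time.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of the two flush mechanisms; all names are illustrative. */
public class FlushModel {

  private final long windowMillis;   // length of one profile period
  private final long ttlMillis;      // how long a profile may sit idle before a forced flush
  private long windowStart = -1;     // start of the current period, -1 when no period is open
  private long lastSeenWallClock;    // wall-clock time at which the last message arrived
  private int count;                 // the "profile": a simple message count
  private final List<Integer> flushed = new ArrayList<>();

  public FlushModel(long windowMillis, long ttlMillis) {
    this.windowMillis = windowMillis;
    this.ttlMillis = ttlMillis;
  }

  /** Mechanism (1): a message's timestamp advances time and may close the current period. */
  public void onMessage(long timestamp, long wallClock) {
    if (windowStart < 0) {
      // open a period aligned to a multiple of windowMillis (an assumption of this sketch)
      windowStart = timestamp - (timestamp % windowMillis);
    } else if (timestamp >= windowStart + windowMillis) {
      // time has moved past the end of the current period, so flush it
      flush();
      windowStart = timestamp - (timestamp % windowMillis);
    }
    count++;
    lastSeenWallClock = wallClock;
  }

  /** Mechanism (2): with no new messages, flush once the profile has been idle for the TTL. */
  public void onTick(long wallClock) {
    if (count > 0 && wallClock - lastSeenWallClock >= ttlMillis) {
      flush();
      windowStart = -1;
    }
  }

  private void flush() {
    flushed.add(count);
    count = 0;
  }

  public List<Integer> flushed() {
    return flushed;
  }

  public static void main(String[] args) {
    // (1) a later message advances time: three messages in the first period, one in the next
    FlushModel byTime = new FlushModel(1_000, 5_000);
    byTime.onMessage(100, 100);
    byTime.onMessage(200, 200);
    byTime.onMessage(300, 300);
    byTime.onMessage(1_100, 1_100);
    System.out.println("flushed by time advance: " + byTime.flushed());

    // (2) no further messages arrive, so only the TTL can flush the profile
    FlushModel byTtl = new FlushModel(1_000, 5_000);
    byTtl.onMessage(100, 100);
    byTtl.onMessage(200, 200);
    byTtl.onMessage(300, 300);
    byTtl.onTick(1_000);   // idle for 700 ms: below the TTL, nothing is flushed
    byTtl.onTick(5_300);   // idle for 5,000 ms: TTL reached, the profile is flushed
    System.out.println("flushed by TTL: " + byTtl.flushed());
  }
}
```

The second scenario corresponds to what `testProcessingTimeWithTimeToLiveFlush` exercises: without later messages to advance time, the period never closes on its own, and the TTL check is what eventually emits the measurement.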