GitHub user mmiklavc commented on a diff in the pull request:
https://github.com/apache/metron/pull/1174#discussion_r215452935
--- Diff:
metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
---
@@ -150,27 +159,98 @@ public void testProcessingTime() throws Exception {
kafkaComponent.writeMessages(inputTopic, message2);
kafkaComponent.writeMessages(inputTopic, message3);
+ // retrieve the profile measurement using PROFILE_GET
+ String profileGetExpression = "PROFILE_GET('processing-time-test', '10.0.0.1', PROFILE_FIXED('5', 'MINUTES'))";
+ List<Integer> actuals = execute(profileGetExpression, List.class);
+
// storm needs at least one message to close its event window
int attempt = 0;
- while(profilerTable.getPutLog().size() == 0 && attempt++ < 10) {
+ while(actuals.size() == 0 && attempt++ < 10) {
- // sleep, at least beyond the current window
- Thread.sleep(windowDurationMillis + windowLagMillis);
+ // wait for the profiler to flush
+ long sleep = windowDurationMillis;
+ LOG.debug("Waiting {} millis for profiler to flush", sleep);
+ Thread.sleep(sleep);
- // send another message to help close the current event window
+ // write another message to advance time. this ensures that we are testing the 'normal' flush mechanism.
+ // if we do not send additional messages to advance time, then it is the profile TTL mechanism which
+ // will ultimately flush the profile
kafkaComponent.writeMessages(inputTopic, message2);
+
+ // retrieve the profile measurement using PROFILE_GET
+ actuals = execute(profileGetExpression, List.class);
}
- // validate what was flushed
- List<Integer> actuals = read(
- profilerTable.getPutLog(),
- columnFamily,
- columnBuilder.getColumnQualifier("value"),
- Integer.class);
- assertEquals(1, actuals.size());
+ // the profile should count at least 3 messages
+ assertTrue(actuals.size() > 0);
assertTrue(actuals.get(0) >= 3);
}
+ /**
+ * The Profiler can generate profiles based on processing time. With processing time,
+ * the Profiler builds profiles based on when the telemetry is processed.
+ *
+ * <p>Not defining a 'timestampField' within the Profiler configuration tells the Profiler
+ * to use processing time.
+ *
+ * <p>There are two mechanisms that will cause a profile to flush.
+ *
+ * (1) As new messages arrive, time is advanced. The splitter bolt attaches a timestamp to each
+ * message (which can be either event or system time.) This advances time and leads to profile
+ * measurements being flushed.
+ *
+ * (2) If no messages arrive to advance time, then the "time to live" mechanism will flush a profile
+ * after a period of time.
+ *
+ * <p>This test specifically tests the *second* mechanism when a profile is flushed by the
+ * "time to live" mechanism.
+ * "time to live" mechanism.
+ */
+ @Test
+ public void testProcessingTimeWithTimeToLiveFlush() throws Exception {
+
+ // upload the config to zookeeper
--- End diff --
Can we remove the redundant comment? Your code speaks for itself here. I'd
probably just name that method precisely "uploadConfigToZookeeper". Very
clear, and one less non-executable line.
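A minimal sketch of the suggested refactoring, for illustration only. The diff ends at the comment, so the statements that actually perform the upload are not shown; here a plain in-memory `Map` stands in for ZooKeeper. `ProfilerTestSketch`, `testBefore`, `testAfter`, and `storedConfig` are hypothetical names, and only `uploadConfigToZookeeper` comes from the suggestion above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration: replace a one-line comment with a method whose
// name states the intent. NOT Metron's actual config-upload code.
class ProfilerTestSketch {

  // Hypothetical stand-in for the ZooKeeper-backed configuration store.
  private final Map<String, String> zookeeper = new HashMap<>();

  // Before: a comment explains what the next statement does.
  void testBefore(String profilerConfig) {
    // upload the config to zookeeper
    zookeeper.put("profiler", profilerConfig);
  }

  // After: the method name carries the intent, so the comment is unnecessary.
  void testAfter(String profilerConfig) {
    uploadConfigToZookeeper(profilerConfig);
  }

  private void uploadConfigToZookeeper(String profilerConfig) {
    zookeeper.put("profiler", profilerConfig);
  }

  // Exposes the stored value so both variants can be compared.
  String storedConfig() {
    return zookeeper.get("profiler");
  }

  public static void main(String[] args) {
    ProfilerTestSketch sketch = new ProfilerTestSketch();
    sketch.testAfter("{\"profiles\": []}");
    System.out.println(sketch.storedConfig()); // prints {"profiles": []}
  }
}
```

Both variants leave the same state behind; the difference is that the intent now lives in an identifier rather than in a comment that can silently drift from the code it describes.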
---