Github user mmiklavc commented on a diff in the pull request:
https://github.com/apache/metron/pull/1174#discussion_r215450093
--- Diff:
metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
---
@@ -150,27 +159,98 @@ public void testProcessingTime() throws Exception {
kafkaComponent.writeMessages(inputTopic, message2);
kafkaComponent.writeMessages(inputTopic, message3);
+ // retrieve the profile measurement using PROFILE_GET
+ String profileGetExpression = "PROFILE_GET('processing-time-test',
'10.0.0.1', PROFILE_FIXED('5', 'MINUTES'))";
+ List<Integer> actuals = execute(profileGetExpression, List.class);
+
// storm needs at least one message to close its event window
int attempt = 0;
- while(profilerTable.getPutLog().size() == 0 && attempt++ < 10) {
+ while(actuals.size() == 0 && attempt++ < 10) {
- // sleep, at least beyond the current window
- Thread.sleep(windowDurationMillis + windowLagMillis);
+ // wait for the profiler to flush
+ long sleep = windowDurationMillis;
+ LOG.debug("Waiting {} millis for profiler to flush", sleep);
+ Thread.sleep(sleep);
- // send another message to help close the current event window
+ // write another message to advance time. this ensures that we are
testing the 'normal' flush mechanism.
+ // if we do not send additional messages to advance time, then it is
the profile TTL mechanism which
+ // will ultimately flush the profile
kafkaComponent.writeMessages(inputTopic, message2);
+
+ // retrieve the profile measurement using PROFILE_GET
+ actuals = execute(profileGetExpression, List.class);
}
- // validate what was flushed
- List<Integer> actuals = read(
- profilerTable.getPutLog(),
- columnFamily,
- columnBuilder.getColumnQualifier("value"),
- Integer.class);
- assertEquals(1, actuals.size());
+ // the profile should count at least 3 messages
+ assertTrue(actuals.size() > 0);
assertTrue(actuals.get(0) >= 3);
}
+ /**
+ * The Profiler can generate profiles based on processing time. With
processing time,
+ * the Profiler builds profiles based on when the telemetry is processed.
--- End diff --
Is processing time == system time or are these actually 2 different types
of timestamp in addition to event/source time?
---