GitHub user mmiklavc commented on a diff in the pull request:
https://github.com/apache/metron/pull/1174#discussion_r215785684
--- Diff:
metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
---
@@ -127,94 +127,53 @@
@Multiline
private static String kryoSerializers;
- /**
- * The Profiler can generate profiles based on processing time. With
processing time,
- * the Profiler builds profiles based on when the telemetry is processed.
- *
- * <p>Not defining a 'timestampField' within the Profiler configuration
tells the Profiler
- * to use processing time.
- *
- * <p>There are two mechanisms that will cause a profile to flush.
- *
- * (1) As new messages arrive, time is advanced. The splitter bolt
attaches a timestamp to each
- * message (which can be either event or system time.) This advances
time and leads to profile
- * measurements being flushed.
- *
- * (2) If no messages arrive to advance time, then the "time-to-live"
mechanism will flush a profile
- * after a period of time.
- *
- * <p>This test specifically tests the *first* mechanism where time is
advanced by incoming messages.
- */
@Test
public void testProcessingTime() throws Exception {
+ uploadConfigToZookeeper(TEST_RESOURCES +
"/config/zookeeper/processing-time-test");
- // upload the config to zookeeper
- uploadConfig(TEST_RESOURCES +
"/config/zookeeper/processing-time-test");
-
- // start the topology and write test messages to kafka
+ // start the topology and write 3 test messages to kafka
fluxComponent.submitTopology();
-
- // the messages that will be applied to the profile
kafkaComponent.writeMessages(inputTopic, message1);
kafkaComponent.writeMessages(inputTopic, message2);
kafkaComponent.writeMessages(inputTopic, message3);
// retrieve the profile measurement using PROFILE_GET
String profileGetExpression = "PROFILE_GET('processing-time-test',
'10.0.0.1', PROFILE_FIXED('5', 'MINUTES'))";
- List<Integer> actuals = execute(profileGetExpression, List.class);
+ List<Integer> measurements = execute(profileGetExpression, List.class);
- // storm needs at least one message to close its event window
+ // need to keep checking for measurements until the profiler has
flushed one out
int attempt = 0;
- while(actuals.size() == 0 && attempt++ < 10) {
+ while(measurements.size() == 0 && attempt++ < 10) {
// wait for the profiler to flush
long sleep = windowDurationMillis;
LOG.debug("Waiting {} millis for profiler to flush", sleep);
Thread.sleep(sleep);
- // write another message to advance time. this ensures that we are
testing the 'normal' flush mechanism.
+ // write another message to advance time. this ensures we are
testing the 'normal' flush mechanism.
// if we do not send additional messages to advance time, then it is
the profile TTL mechanism which
// will ultimately flush the profile
kafkaComponent.writeMessages(inputTopic, message2);
- // retrieve the profile measurement using PROFILE_GET
- actuals = execute(profileGetExpression, List.class);
+ // try again to retrieve the profile measurement using PROFILE_GET
+ measurements = execute(profileGetExpression, List.class);
}
- // the profile should count at least 3 messages
- assertTrue(actuals.size() > 0);
- assertTrue(actuals.get(0) >= 3);
+ // expect to see only 1 measurement, but could be more (one for each
period) depending on
--- End diff --
Ok, that makes sense. That's closer to what I meant when I said "period" in my
original inquiry: a measurement that is associated with the k-th period.
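
To make the "k-th period" idea concrete, here is a minimal sketch. It assumes a period is a fixed-width bucket of time and that its index k is the timestamp divided by the period duration. `PeriodSketch`, `periodIndex`, and `periodStart` are hypothetical names used only for illustration; they are not taken from the Profiler code under review.

```java
import java.util.concurrent.TimeUnit;

// Illustrative only: models a "period" as a fixed-width time bucket.
final class PeriodSketch {

  // Hypothetical helper: index k of the period containing the timestamp.
  static long periodIndex(long epochMillis, long periodDurationMillis) {
    if (periodDurationMillis <= 0) {
      throw new IllegalArgumentException("period duration must be positive");
    }
    return Math.floorDiv(epochMillis, periodDurationMillis);
  }

  // Hypothetical helper: start time (epoch millis) of the k-th period.
  static long periodStart(long k, long periodDurationMillis) {
    return k * periodDurationMillis;
  }

  public static void main(String[] args) {
    // Assumed period duration for the example, not read from any config.
    long duration = TimeUnit.MINUTES.toMillis(15);
    long now = System.currentTimeMillis();

    long k = periodIndex(now, duration);
    System.out.println("timestamp " + now + " falls in period " + k
        + ", which starts at " + periodStart(k, duration));

    // Two messages less than one period apart can still straddle a
    // boundary, which is why a test may see more than one measurement.
    long justBefore = periodStart(k + 1, duration) - 1;
    long justAfter = periodStart(k + 1, duration);
    System.out.println(periodIndex(justBefore, duration) + " vs "
        + periodIndex(justAfter, duration));
  }
}
```

Under this model, each measurement returned by PROFILE_GET would correspond to one such period index, so a test run that crosses a period boundary could legitimately return more than one measurement.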
---