Github user nickwallen commented on a diff in the pull request:
https://github.com/apache/metron/pull/1174#discussion_r215673075
--- Diff:
metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
---
@@ -186,21 +266,40 @@ public void testEventTime() throws Exception {
// start the topology and write test messages to kafka
fluxComponent.submitTopology();
- kafkaComponent.writeMessages(inputTopic, message1);
- kafkaComponent.writeMessages(inputTopic, message2);
- kafkaComponent.writeMessages(inputTopic, message3);
-
- // wait until the profile is flushed
- waitOrTimeout(() -> profilerTable.getPutLog().size() > 0,
timeout(seconds(90)));
-
- List<Put> puts = profilerTable.getPutLog();
- assertEquals(1, puts.size());
-
- // inspect the row key to ensure the profiler used event time
correctly. the timestamp
- // embedded in the row key should match those in the source telemetry
- byte[] expectedRowKey = generateExpectedRowKey("event-time-test",
entity, startAt);
- byte[] actualRowKey = puts.get(0).getRow();
- assertArrayEquals(failMessage(expectedRowKey, actualRowKey),
expectedRowKey, actualRowKey);
+ List<String> messages = FileUtils.readLines(new
File("src/test/resources/telemetry.json"));
+ kafkaComponent.writeMessages(inputTopic, messages);
+
+ long timestamp = System.currentTimeMillis();
+ LOG.debug("Attempting to close window period by sending message with
timestamp = {}", timestamp);
+ kafkaComponent.writeMessages(inputTopic, getMessage("192.168.66.1",
timestamp));
+ kafkaComponent.writeMessages(inputTopic, getMessage("192.168.138.158",
timestamp));
+
+ // create the 'window' that looks up to 5 hours before the last
timestamp contained in the telemetry
+ assign("lastTimestamp", "1530978728982L");
--- End diff --
What I really mean is that it's the latest (most recent) timestamp, not the
timestamp of the last record. The records are not expected to be in order.
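
To illustrate the distinction, here is a minimal sketch of picking the latest timestamp from unordered records. It is not code from this PR: the class `LatestTimestamp`, the helper `latest`, and all sample values except `1530978728982L` (taken from the diff) are hypothetical, and it assumes the timestamps have already been extracted from the telemetry as epoch milliseconds.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;

// Hypothetical helper, not part of the Metron code base.
class LatestTimestamp {

    // Returns the maximum timestamp, regardless of the order in which
    // the records arrived. Empty input yields an empty OptionalLong.
    static OptionalLong latest(List<Long> timestamps) {
        return timestamps.stream()
                .mapToLong(Long::longValue)
                .max();
    }

    public static void main(String[] args) {
        // Out-of-order sample data: the most recent value is in the
        // middle, so "last record" and "latest timestamp" differ.
        List<Long> timestamps = Arrays.asList(
                1530978700000L,   // hypothetical
                1530978728982L,   // value used in the diff
                1530978710000L);  // hypothetical

        long lastRecord = timestamps.get(timestamps.size() - 1);
        long latestSeen = latest(timestamps).getAsLong();

        System.out.println("last record: " + lastRecord);
        System.out.println("latest:      " + latestSeen);
    }
}
```

Taking the maximum rather than the final element keeps the result independent of arrival order, which is why "latest" is the safer word for the comment in the test.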
---