This is an automated email from the ASF dual-hosted git repository.
nickallen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron.git
The following commit(s) were added to refs/heads/master by this push:
new fec36c7 METRON-1778 Out-of-order timestamps may delay flush in Storm
Profiler (nickwallen) closes apache/metron#1197
fec36c7 is described below
commit fec36c7ed0b2d29f4dc48574adf02c7fcb13e5fb
Author: nickwallen <[email protected]>
AuthorDate: Tue Feb 26 14:34:39 2019 -0500
METRON-1778 Out-of-order timestamps may delay flush in Storm Profiler
(nickwallen) closes apache/metron#1197
---
.../profiler/storm/FixedFrequencyFlushSignal.java | 82 ++++++-----
.../storm/FixedFrequencyFlushSignalTest.java | 151 +++++++++++++++++++--
2 files changed, 179 insertions(+), 54 deletions(-)
diff --git
a/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/FixedFrequencyFlushSignal.java
b/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/FixedFrequencyFlushSignal.java
index 02503c2..db4f99b 100644
---
a/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/FixedFrequencyFlushSignal.java
+++
b/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/FixedFrequencyFlushSignal.java
@@ -32,14 +32,14 @@ public class FixedFrequencyFlushSignal implements
FlushSignal {
protected static final Logger LOG =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
/**
- * The latest known timestamp.
+ * Tracks the min timestamp.
*/
- private long currentTime;
+ private long minTime;
/**
- * The time when the next flush should occur.
+ * Tracks the max timestamp.
*/
- private long flushTime;
+ private long maxTime;
/**
* The amount of time between flushes in milliseconds.
@@ -47,7 +47,6 @@ public class FixedFrequencyFlushSignal implements FlushSignal
{
private long flushFrequency;
public FixedFrequencyFlushSignal(long flushFrequencyMillis) {
-
if(flushFrequencyMillis < 0) {
throw new IllegalArgumentException("flush frequency must be >= 0");
}
@@ -61,9 +60,8 @@ public class FixedFrequencyFlushSignal implements FlushSignal
{
*/
@Override
public void reset() {
- flushTime = 0;
- currentTime = 0;
-
+ minTime = Long.MAX_VALUE;
+ maxTime = Long.MIN_VALUE;
LOG.debug("Flush counters reset");
}
@@ -74,31 +72,34 @@ public class FixedFrequencyFlushSignal implements
FlushSignal {
*/
@Override
public void update(long timestamp) {
+ if(LOG.isWarnEnabled()) {
+ checkIfOutOfOrder(timestamp);
+ }
- if(timestamp > currentTime) {
-
- // need to update current time
- LOG.debug("Updating current time; last={}, new={}", currentTime,
timestamp);
- currentTime = timestamp;
-
- } else if ((currentTime - timestamp) > flushFrequency) {
-
- // significantly out-of-order timestamps
- LOG.warn("Timestamps out-of-order by '{}' ms. This may indicate a
problem in the data. last={}, current={}",
- (currentTime - timestamp),
- timestamp,
- currentTime);
+ if(timestamp < minTime) {
+ minTime = timestamp;
}
- if(flushTime == 0) {
+ if(timestamp > maxTime) {
+ maxTime = timestamp;
+ }
+ }
- // set the next time to flush
- flushTime = currentTime + flushFrequency;
- LOG.debug("Setting flush time; '{}' ms until flush; flushTime={},
currentTime={}, flushFreq={}",
- timeToNextFlush(),
- flushTime,
- currentTime,
- flushFrequency);
+ /**
+ * Checks if the timestamp is significantly out-of-order.
+ *
+ * @param timestamp The last timestamp.
+ */
+ private void checkIfOutOfOrder(long timestamp) {
+ // do not warn if this is the first timestamp we've seen, which will
always be 'out-of-order'
+ if (maxTime > Long.MIN_VALUE) {
+
+ long outOfOrderBy = maxTime - timestamp;
+ if (Math.abs(outOfOrderBy) > flushFrequency) {
+ LOG.warn("Timestamp out-of-order by {} ms. This may indicate a problem
in the data. " +
+ "timestamp={}, maxKnown={}, flushFreq={} ms",
+ outOfOrderBy, timestamp, maxTime, flushFrequency);
+ }
}
}
@@ -109,27 +110,20 @@ public class FixedFrequencyFlushSignal implements
FlushSignal {
*/
@Override
public boolean isTimeToFlush() {
+ boolean flush = false;
- boolean flush = currentTime > flushTime;
- LOG.debug("Flush={}, '{}' ms until flush; currentTime={}, flushTime={}",
- flush,
- timeToNextFlush(),
- currentTime,
- flushTime);
+ long flushTime = minTime + flushFrequency;
+ if(maxTime >= flushTime) {
+ flush = true;
+ }
+ LOG.debug("'{}' ms until flush; flush?={}, minTime={}, maxTime={},
flushTime={}",
+ Math.max(0, flushTime - maxTime), flush, minTime, maxTime,
flushTime);
return flush;
}
@Override
public long currentTimeMillis() {
- return currentTime;
- }
-
- /**
- * Returns the number of milliseconds to the next flush.
- * @return The time left until the next flush.
- */
- private long timeToNextFlush() {
- return Math.max(0, flushTime - currentTime);
+ return maxTime;
}
}
diff --git
a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/FixedFrequencyFlushSignalTest.java
b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/FixedFrequencyFlushSignalTest.java
index 8b8813b..a753dc4 100644
---
a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/FixedFrequencyFlushSignalTest.java
+++
b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/FixedFrequencyFlushSignalTest.java
@@ -31,41 +31,172 @@ public class FixedFrequencyFlushSignalTest {
@Test
public void testSignalFlush() {
+ int flushFreq = 1000;
+ FixedFrequencyFlushSignal signal = new
FixedFrequencyFlushSignal(flushFreq);
- FixedFrequencyFlushSignal signal = new FixedFrequencyFlushSignal(1000);
+ // not ready to flush; we have not seen any messages yet
+ assertFalse(signal.isTimeToFlush());
- // not time to flush yet
+ // not ready to flush; flushTime = min + flushFreq = 5000 + 1000 = 6000;
flush if anything >= flushTime
+ signal.update(5000);
assertFalse(signal.isTimeToFlush());
- // advance time
+ // ready to flush; flushTime = min + flushFreq = 5000 + 1000 = 6000;
max(5000,7000) >= 6000
+ signal.update(7000);
+ assertTrue(signal.isTimeToFlush());
+ }
+
+ @Test
+ public void testOutOfOrderTimestamps() {
+ int flushFreq = 5000;
+ FixedFrequencyFlushSignal signal = new
FixedFrequencyFlushSignal(flushFreq);
+
+ // not ready to flush; flushTime = min + flushFreq = 5000 + 5000 = 10000;
flush if anything >= flushTime
signal.update(5000);
+ assertFalse(signal.isTimeToFlush());
- // not time to flush yet
+ // not ready to flush; flushTime = min + flushFreq = 1000 + 5000 = 6000
+ signal.update(1000);
assertFalse(signal.isTimeToFlush());
- // advance time
+ // ready to flush; flushTime = min + flushFreq = 1000 + 5000 = 6000;
max(5000,1000,7000) >= 6000
signal.update(7000);
+ assertTrue(signal.isTimeToFlush());
+
+ // ready to flush, still
+ signal.update(3000);
+ assertTrue(signal.isTimeToFlush());
- // time to flush
+ // ready to flush, still
assertTrue(signal.isTimeToFlush());
}
@Test
- public void testOutOfOrderTimestamps() {
- FixedFrequencyFlushSignal signal = new FixedFrequencyFlushSignal(1000);
+ public void testOutOfOrderTimestampsNoFlush() {
+ int flushFreq = 7000;
+ FixedFrequencyFlushSignal signal = new
FixedFrequencyFlushSignal(flushFreq);
- // advance time, out-of-order
+ // not ready to flush; flushTime = min + flushFreq = 5000 + 7000 = 12000;
flush if anything >= flushTime
signal.update(5000);
+ assertFalse(signal.isTimeToFlush());
+
+ // not ready to flush; flushTime = min + flushFreq = 1000 + 7000 = 8000
signal.update(1000);
+ assertFalse(signal.isTimeToFlush());
+
+ // not ready to flush; flushTime = min + flushFreq = 1000 + 7000 = 8000
signal.update(7000);
+ assertFalse(signal.isTimeToFlush());
+
+ // ready to flush; flushTime = min + flushFreq = 1000 + 7000 = 8000;
max(5000,1000,7000,3000) >= 8000
+ signal.update(3000);
+ assertFalse(signal.isTimeToFlush());
+ }
+
+ @Test
+ public void testTimestampsDescending() {
+ int flushFreq = 3000;
+ FixedFrequencyFlushSignal signal = new
FixedFrequencyFlushSignal(flushFreq);
+
+ // not ready to flush; flushTime = min + flushFreq = 4100 + 3000 = 7100;
flush if anything >= flushTime
+ signal.update(4100);
+ assertFalse(signal.isTimeToFlush());
+
+ // not ready to flush; flushTime = min + flushFreq = 3000 + 3000 = 6000
signal.update(3000);
+ assertFalse(signal.isTimeToFlush());
- // need to flush @ 5000 + 1000 = 6000. if anything > 6000 (even
out-of-order), then it should signal a flush
+ // not ready to flush; flushTime = min + flushFreq = 2000 + 3000 = 5000
+ signal.update(2000);
+ assertFalse(signal.isTimeToFlush());
+
+ // ready to flush; flushTime = min + flushFreq = 1000 + 3000 = 4000;
max(4100,3000,2000,1000) >= 4000
+ signal.update(1000);
assertTrue(signal.isTimeToFlush());
}
+ @Test
+ public void testTimestampsDescendingNoFlush() {
+ int flushFreq = 4000;
+ FixedFrequencyFlushSignal signal = new
FixedFrequencyFlushSignal(flushFreq);
+
+ // not ready to flush; flushTime = min + flushFreq = 4000 + 4000 = 8000;
flush if anything >= flushTime
+ signal.update(4000);
+ assertFalse(signal.isTimeToFlush());
+
+ // not ready to flush; flushTime = min + flushFreq = 3000 + 4000 = 7000
+ signal.update(3000);
+ assertFalse(signal.isTimeToFlush());
+
+ // not ready to flush; flushTime = min + flushFreq = 2000 + 4000 = 6000
+ signal.update(2000);
+ assertFalse(signal.isTimeToFlush());
+
+ // not ready to flush; flushTime = min + flushFreq = 1000 + 4000 = 5000
+ signal.update(1000);
+ assertFalse(signal.isTimeToFlush());
+ }
+
+ @Test
+ public void testTimestampsAscending() {
+ int flushFreq = 3000;
+ FixedFrequencyFlushSignal signal = new
FixedFrequencyFlushSignal(flushFreq);
+
+ // not ready to flush; flushTime = min + flushFreq = 1000 + 3000 = 4000;
flush if anything >= flushTime
+ signal.update(1000);
+ assertFalse(signal.isTimeToFlush());
+
+ // not ready to flush; flushTime = min + flushFreq = 1000 + 3000 = 4000
+ signal.update(2000);
+ assertFalse(signal.isTimeToFlush());
+
+ // not ready to flush; flushTime = min + flushFreq = 1000 + 3000 = 4000
+ signal.update(3000);
+ assertFalse(signal.isTimeToFlush());
+
+ // ready to flush; flushTime = min + flushFreq = 1000 + 3000 = 4000;
max(1000,2000,3000,4000) >= 4000
+ signal.update(4000);
+ }
+
+ @Test
+ public void testTimestampsAscendingNoFlush() {
+ int flushFreq = 4000;
+ FixedFrequencyFlushSignal signal = new
FixedFrequencyFlushSignal(flushFreq);
+
+ // not ready to flush; flushTime = min + flushFreq = 1000 + 4000 = 5000;
flush if anything >= flushTime
+ signal.update(1000);
+ assertFalse(signal.isTimeToFlush());
+
+ // not ready to flush; flushTime = min + flushFreq = 1000 + 4000 = 5000
+ signal.update(2000);
+ assertFalse(signal.isTimeToFlush());
+
+ // not ready to flush; flushTime = min + flushFreq = 1000 + 4000 = 5000
+ signal.update(3000);
+ assertFalse(signal.isTimeToFlush());
+
+ // not ready to flush; flushTime = min + flushFreq = 1000 + 4000 = 5000
+ signal.update(4000);
+ assertFalse(signal.isTimeToFlush());
+ }
+
@Test(expected = IllegalArgumentException.class)
public void testNegativeFrequency() {
+ // a negative flush frequency makes no sense
new FixedFrequencyFlushSignal(-1000);
}
+
+ @Test
+ public void testReset() {
+ FixedFrequencyFlushSignal signal = new FixedFrequencyFlushSignal(4000);
+ signal.update(1000);
+ signal.update(6000);
+
+ // ready to flush; flushTime = min + flushFreq = 1000 + 4000 = 5000;
max(1000,6000) >= 5000
+ assertTrue(signal.isTimeToFlush());
+
+ // reset should turn off the flush signal
+ signal.reset();
+ assertFalse(signal.isTimeToFlush());
+ }
}