nickwallen commented on a change in pull request #1197: METRON-1778 Out-of-order timestamps may delay flush in Storm Profiler
URL: https://github.com/apache/metron/pull/1197#discussion_r260451265
 
 

 ##########
 File path: metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/FixedFrequencyFlushSignal.java
 ##########
 @@ -74,31 +72,34 @@ public void reset() {
    */
   @Override
   public void update(long timestamp) {
+    if(LOG.isWarnEnabled()) {
+      checkIfOutOfOrder(timestamp);
+    }
 
-    if(timestamp > currentTime) {
-
-      // need to update current time
 -      LOG.debug("Updating current time; last={}, new={}", currentTime, timestamp);
-      currentTime = timestamp;
-
-    } else if ((currentTime - timestamp) > flushFrequency) {
-
-      // significantly out-of-order timestamps
 -      LOG.warn("Timestamps out-of-order by '{}' ms. This may indicate a problem in the data. last={}, current={}",
-              (currentTime - timestamp),
-              timestamp,
-              currentTime);
+    if(timestamp < minTime) {
+      minTime = timestamp;
     }
 
-    if(flushTime == 0) {
+    if(timestamp > maxTime) {
+      maxTime = timestamp;
+    }
+  }
 
-      // set the next time to flush
-      flushTime = currentTime + flushFrequency;
 -      LOG.debug("Setting flush time; '{}' ms until flush; flushTime={}, currentTime={}, flushFreq={}",
-              timeToNextFlush(),
-              flushTime,
-              currentTime,
-              flushFrequency);
+  /**
+   * Checks if the timestamp is significantly out-of-order.
+   *
+   * @param timestamp The last timestamp.
+   */
+  private void checkIfOutOfOrder(long timestamp) {
 +    // do not warn if this is the first timestamp we've seen, which will always be 'out-of-order'
+    if (maxTime > Long.MIN_VALUE) {
+
+      long outOfOrderBy = maxTime - timestamp;
+      if (Math.abs(outOfOrderBy) > flushFrequency) {
 
 Review comment:
   Thanks for asking the question. That helped various random thoughts in my 
brain congeal into something at least a little useful.
   
   > We might want to consider watermarking as a more robust enhancement for 
solving this problem down the road. My +1 stands, thanks for the detailed 
explanation.
   
   We actually do use watermarking today. That is what the Profiler's time lag 
setting is for. When you increase the time lag, you are being more 
accommodating of late data, but at the expense of increased latency. That's 
the watermark concept at work.
   
   Unfortunately, a watermark doesn't stop time from being incorrectly advanced 
too far into the future, as in my example. It is only a mechanism for dealing 
with late data.
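
   The two points above can be sketched as follows. This is a minimal, hypothetical illustration of the watermark/time-lag idea, not Metron's actual implementation; all class and method names here are made up for the example.

```java
// Sketch of a time-lag watermark. The watermark trails the greatest event
// time seen so far by a configurable lag; events older than the watermark
// are considered "too late". A larger lag tolerates later data, at the
// expense of delaying each flush. (Hypothetical names, for illustration.)
class WatermarkSketch {

    private long maxTime = Long.MIN_VALUE;  // greatest event time seen so far
    private final long timeLag;             // how long we wait for late data

    WatermarkSketch(long timeLag) {
        this.timeLag = timeLag;
    }

    void update(long timestamp) {
        if (timestamp > maxTime) {
            maxTime = timestamp;
        }
    }

    long watermark() {
        return maxTime - timeLag;
    }

    boolean isLate(long timestamp) {
        return timestamp < watermark();
    }
}
```

   Note that a single event with a far-future timestamp drives `maxTime`, and therefore the watermark, forward, which makes all subsequent in-order events look "late". That is the failure mode a watermark alone cannot prevent.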
   
   > 
https://databricks.com/blog/2017/05/08/event-time-aggregation-watermarking-apache-sparks-structured-streaming.html
   
   One difference between Spark's implementation and Storm's is that Spark has 
some limited ability to recalculate old windows. But all of that state has to 
be stored in memory, so it is not super useful.
   
   I vaguely remember playing with that in a POC implementation of the 
Streaming Profiler in Spark and it didn't work out too well.  But we should 
definitely investigate it further.
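
   The "recalculate old windows" idea amounts to keeping recent window aggregates in memory, keyed by window start, so a late event can still update its window until the watermark passes it. A rough sketch of that idea, with hypothetical names (this is not Spark's API, just the underlying mechanism):

```java
import java.util.Map;
import java.util.TreeMap;

// Keeps recent window aggregates (here, simple event counts) in memory so a
// late-arriving event can still update its window until the watermark moves
// past it. All retained state lives in memory, which is the cost noted above.
class LateWindowSketch {

    private final long windowSize;
    private final long timeLag;
    private long maxTime = Long.MIN_VALUE;

    // window start time -> event count
    private final TreeMap<Long, Long> windows = new TreeMap<>();

    LateWindowSketch(long windowSize, long timeLag) {
        this.windowSize = windowSize;
        this.timeLag = timeLag;
    }

    void update(long timestamp) {
        maxTime = Math.max(maxTime, timestamp);
        long windowStart = (timestamp / windowSize) * windowSize;
        long watermark = maxTime - timeLag;

        // accept the event only while its window can still change
        if (windowStart + windowSize > watermark) {
            windows.merge(windowStart, 1L, Long::sum);
        }

        // evict windows that are final; a real system would emit them here
        windows.headMap(watermark - windowSize + 1).clear();
    }

    Map<Long, Long> openWindows() {
        return windows;
    }
}
```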
   
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
