Repository: apex-core
Updated Branches:
  refs/heads/master b5ce0497e -> 93f790aa7


APEXCORE-585 #resolve do not try to calculate latency if the first window has 
not completed


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/5301fbf8
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/5301fbf8
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/5301fbf8

Branch: refs/heads/master
Commit: 5301fbf87b9a2c0950c2615964eb1a88a1a04e97
Parents: c97dd7c
Author: David Yan <[email protected]>
Authored: Fri Dec 2 15:55:04 2016 -0800
Committer: David Yan <[email protected]>
Committed: Tue Dec 6 18:37:23 2016 -0800

----------------------------------------------------------------------
 .../stram/StreamingContainerManager.java        |  6 ++--
 .../java/com/datatorrent/stram/LatencyTest.java |  9 +++++-
 .../stram/StreamingContainerManagerTest.java    | 32 +++-----------------
 3 files changed, 15 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/5301fbf8/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java 
b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 4d193d5..8da1ed8 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -1838,7 +1838,7 @@ public class StreamingContainerManager implements 
PlanContext
 
   public long updateOperatorLatency(PTOperator oper, 
UpdateOperatorLatencyContext ctx)
   {
-    if (!oper.getInputs().isEmpty()) {
+    if (!oper.getInputs().isEmpty() && oper.stats.currentWindowId.get() > 0) {
       OperatorStatus status = oper.stats;
       long latency = Long.MAX_VALUE;
       PTOperator slowestUpstream = null;
@@ -1852,7 +1852,7 @@ public class StreamingContainerManager implements 
PlanContext
           if (upstreamOp.getOperatorMeta().getOperator() instanceof 
Operator.DelayOperator) {
             continue;
           }
-          if (upstreamOp.stats.currentWindowId.get() > 
oper.stats.currentWindowId.get()) {
+          if (upstreamOp.stats.currentWindowId.get() >= 
oper.stats.currentWindowId.get()) {
             long portLatency = WindowGenerator
                 .compareWindowId(upstreamOp.stats.currentWindowId.get(), 
oper.stats.currentWindowId.get(), windowWidthMillis) * windowWidthMillis;
             if (latency > portLatency) {
@@ -1893,7 +1893,7 @@ public class StreamingContainerManager implements 
PlanContext
         return latency;
       }
     }
-    return 0;
+    return -1;
   }
 
   private ContainerHeartbeatResponse 
getHeartbeatResponse(StreamingContainerAgent sca)

http://git-wip-us.apache.org/repos/asf/apex-core/blob/5301fbf8/engine/src/test/java/com/datatorrent/stram/LatencyTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/LatencyTest.java 
b/engine/src/test/java/com/datatorrent/stram/LatencyTest.java
index 29f525a..0e2c230 100644
--- a/engine/src/test/java/com/datatorrent/stram/LatencyTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/LatencyTest.java
@@ -127,7 +127,14 @@ public class LatencyTest
     Assert.assertEquals((1000 - 997) * windowWidthMillis, getLatency(1000, 
1000, 997, false, 1000, 1500, 1600));
 
     // When the current window is larger than upstream's current window
-    Assert.assertEquals(0, getLatency(1000, 1000, 1001, true, -1, -1, 1600));
+    Assert.assertEquals(-1, getLatency(1000, 1000, 1001, true, -1, -1, 1600));
+
+    // When the current window of an operator is not available yet
+    Assert.assertEquals(-1, getLatency(1000, 1000, 0, false, -1, -1, -1));
+
+    // When the current window of an operator is the same as upstream and no 
end window stats are available
+    Assert.assertEquals(0, getLatency(1000, 90, 1000, false, -1, -1, -1));
+
   }
 
 }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/5301fbf8/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
----------------------------------------------------------------------
diff --git 
a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java 
b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
index 39bfbf2..2d86618 100644
--- 
a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
+++ 
b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
@@ -81,7 +81,6 @@ import com.datatorrent.stram.engine.TestAppDataQueryOperator;
 import com.datatorrent.stram.engine.TestAppDataResultOperator;
 import com.datatorrent.stram.engine.TestAppDataSourceOperator;
 import com.datatorrent.stram.engine.TestGeneratorInputOperator;
-import com.datatorrent.stram.engine.WindowGenerator;
 import com.datatorrent.stram.plan.TestPlanContext;
 import com.datatorrent.stram.plan.logical.LogicalPlan;
 import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
@@ -1034,38 +1033,15 @@ public class StreamingContainerManagerTest
 
   public static class HighLatencyTestOperator extends GenericTestOperator
   {
-    private long firstWindowMillis;
-    private long windowWidthMillis;
-    private long currentWindowId;
     private long latency;
 
     @Override
-    public void setup(OperatorContext context)
-    {
-      firstWindowMillis = System.currentTimeMillis();
-      // this is an approximation because there is no way to get to the actual 
value in the DAG
-
-      windowWidthMillis = 
context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
-    }
-
-    @Override
-    public void beginWindow(long windowId)
-    {
-      currentWindowId = windowId;
-    }
-
-    @Override
     public void endWindow()
     {
-      long sleepMillis = latency -
-          (System.currentTimeMillis() - 
WindowGenerator.getWindowMillis(currentWindowId, firstWindowMillis, 
windowWidthMillis));
-
-      if (sleepMillis > 0) {
-        try {
-          Thread.sleep(sleepMillis);
-        } catch (InterruptedException ex) {
-          // move on
-        }
+      try {
+        Thread.sleep(latency);
+      } catch (InterruptedException ex) {
+        // move on
       }
     }
 

Reply via email to