Repository: apex-core
Updated Branches:
  refs/heads/master cf8141846 -> a54e0b7f8


APEXCORE-544 APEXCORE-379 #resolve fixed latency calculation when the upstream 
heartbeat is processed later than the downstream for a particular window


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/a54e0b7f
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/a54e0b7f
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/a54e0b7f

Branch: refs/heads/master
Commit: a54e0b7f84a4c5fe8e0f6fa3a6f6c3fe7b13b5b3
Parents: cf81418
Author: David Yan <[email protected]>
Authored: Fri Sep 30 16:54:45 2016 -0700
Committer: Thomas Weise <[email protected]>
Committed: Tue Nov 22 16:01:16 2016 -0800

----------------------------------------------------------------------
 .../stram/StreamingContainerManager.java        | 156 +++++++++++++------
 .../stram/engine/WindowGenerator.java           |   8 +-
 .../java/com/datatorrent/stram/LatencyTest.java | 133 ++++++++++++++++
 .../stram/engine/WindowGeneratorTest.java       |   2 +-
 4 files changed, 250 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/a54e0b7f/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java 
b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index d5e5475..4d193d5 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -276,7 +276,7 @@ public class StreamingContainerManager implements 
PlanContext
 
   private final long startTime = System.currentTimeMillis();
 
-  private static class EndWindowStats
+  static class EndWindowStats
   {
     long emitTimestamp = -1;
     HashMap<String, Long> dequeueTimestamps = new HashMap<>(); // input port 
name to end window dequeue time
@@ -816,20 +816,27 @@ public class StreamingContainerManager implements 
PlanContext
 
   private void calculateEndWindowStats()
   {
+    Map<Integer, PTOperator> allOperators = plan.getAllOperators();
+
+    UpdateOperatorLatencyContext ctx = new 
UpdateOperatorLatencyContext(rpcLatencies, endWindowStatsOperatorMap);
+
+    for (PTOperator operator : allOperators.values()) {
+      updateOperatorLatency(operator, ctx);
+    }
+
     if (!endWindowStatsOperatorMap.isEmpty()) {
-      Set<Integer> allCurrentOperators = plan.getAllOperators().keySet();
 
       if (endWindowStatsOperatorMap.size() > 
this.vars.maxWindowsBehindForStats) {
         LOG.warn("Some operators are behind for more than {} windows! Trimming 
the end window stats map", this.vars.maxWindowsBehindForStats);
         while (endWindowStatsOperatorMap.size() > 
this.vars.maxWindowsBehindForStats) {
           LOG.debug("Removing incomplete end window stats for window id {}. 
Collected operator set: {}. Complete set: {}",
               endWindowStatsOperatorMap.firstKey(),
-              
endWindowStatsOperatorMap.get(endWindowStatsOperatorMap.firstKey()).keySet(), 
allCurrentOperators);
+              
endWindowStatsOperatorMap.get(endWindowStatsOperatorMap.firstKey()).keySet(), 
allOperators.keySet());
           
endWindowStatsOperatorMap.remove(endWindowStatsOperatorMap.firstKey());
         }
       }
       //logicalMetrics.clear();
-      int numOperators = allCurrentOperators.size();
+      int numOperators = allOperators.size();
       Long windowId = endWindowStatsOperatorMap.firstKey();
       while (windowId != null) {
         Map<Integer, EndWindowStats> endWindowStatsMap = 
endWindowStatsOperatorMap.get(windowId);
@@ -838,7 +845,7 @@ public class StreamingContainerManager implements 
PlanContext
         aggregateMetrics(windowId, endWindowStatsMap);
         criticalPathInfo = findCriticalPath();
 
-        if (allCurrentOperators.containsAll(endWindowStatsOperators)) {
+        if (allOperators.keySet().containsAll(endWindowStatsOperators)) {
           if (endWindowStatsMap.size() < numOperators) {
             if (windowId < completeEndWindowStatsWindowId) {
               LOG.debug("Disregarding stale end window stats for window {}", 
windowId);
@@ -1704,45 +1711,6 @@ public class StreamingContainerManager implements 
PlanContext
             }
             endWindowStatsMap.put(shb.getNodeId(), endWindowStats);
 
-            if (!oper.getInputs().isEmpty()) {
-              long latency = Long.MAX_VALUE;
-              long adjustedEndWindowEmitTimestamp = 
endWindowStats.emitTimestamp;
-              MovingAverageLong rpcLatency = 
rpcLatencies.get(oper.getContainer().getExternalId());
-              if (rpcLatency != null) {
-                adjustedEndWindowEmitTimestamp += rpcLatency.getAvg();
-              }
-              PTOperator slowestUpstream = null;
-              for (PTInput input : oper.getInputs()) {
-                PTOperator upstreamOp = input.source.source;
-                if (upstreamOp.getOperatorMeta().getOperator() instanceof 
Operator.DelayOperator) {
-                  continue;
-                }
-                EndWindowStats ews = endWindowStatsMap.get(upstreamOp.getId());
-                long portLatency;
-                if (ews == null) {
-                  // This is when the operator is likely to be behind too many 
windows. We need to give an estimate for
-                  // latency at this point, by looking at the number of 
windows behind
-                  int widthMillis = 
plan.getLogicalPlan().getValue(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS);
-                  portLatency = (upstreamOp.stats.currentWindowId.get() - 
oper.stats.currentWindowId.get()) * widthMillis;
-                } else {
-                  MovingAverageLong upstreamRPCLatency = 
rpcLatencies.get(upstreamOp.getContainer().getExternalId());
-                  portLatency = adjustedEndWindowEmitTimestamp - 
ews.emitTimestamp;
-                  if (upstreamRPCLatency != null) {
-                    portLatency -= upstreamRPCLatency.getAvg();
-                  }
-                }
-                if (portLatency < 0) {
-                  portLatency = 0;
-                }
-                if (latency > portLatency) {
-                  latency = portLatency;
-                  slowestUpstream = upstreamOp;
-                }
-              }
-              status.latencyMA.add(latency);
-              slowestUpstreamOp.put(oper, slowestUpstream);
-            }
-
             Set<Integer> allCurrentOperators = plan.getAllOperators().keySet();
             int numOperators = plan.getAllOperators().size();
             if (allCurrentOperators.containsAll(endWindowStatsMap.keySet()) && 
endWindowStatsMap.size() == numOperators) {
@@ -1828,6 +1796,106 @@ public class StreamingContainerManager implements 
PlanContext
     return rsp;
   }
 
+  /**
+   * Supplies the inputs needed by {@code updateOperatorLatency}: the average RPC latency of an operator's
+   * container and the end window emit timestamps collected per window id and operator id.
+   * <p>
+   * The lookups are plain instance methods so that a subclass can provide the values directly.
+   */
+  static class UpdateOperatorLatencyContext
+  {
+    // moving average RPC latency, keyed by container external id
+    Map<String, MovingAverageLong> rpcLatencies;
+    // window id -> (operator id -> end window stats of that operator for that window)
+    Map<Long, Map<Integer, EndWindowStats>> endWindowStatsOperatorMap;
+
+    /**
+     * Creates a context without backing maps. Both fields stay {@code null}, so the lookup methods
+     * must be overridden before use.
+     */
+    UpdateOperatorLatencyContext()
+    {
+    }
+
+    /**
+     * Creates a context backed by the given maps (the maps are referenced, not copied).
+     */
+    UpdateOperatorLatencyContext(Map<String, MovingAverageLong> rpcLatencies, 
Map<Long, Map<Integer, EndWindowStats>> endWindowStatsOperatorMap)
+    {
+      this.rpcLatencies = rpcLatencies;
+      this.endWindowStatsOperatorMap = endWindowStatsOperatorMap;
+    }
+
+    /**
+     * @return the average RPC latency of the container hosting {@code oper}, or 0 if none is recorded
+     */
+    long getRPCLatency(PTOperator oper)
+    {
+      MovingAverageLong rpcLatency = 
rpcLatencies.get(oper.getContainer().getExternalId());
+      return rpcLatency == null ? 0 : rpcLatency.getAvg();
+    }
+
+    /**
+     * @return whether end window stats of at least one operator have been collected for {@code windowId}
+     */
+    boolean endWindowStatsExists(long windowId)
+    {
+      return endWindowStatsOperatorMap.containsKey(windowId);
+    }
+
+    /**
+     * @return the end window emit timestamp of {@code oper} for {@code windowId}, or -1 when no stats are
+     *         recorded for that window or operator (-1 is also the default of {@code emitTimestamp})
+     */
+    long getEndWindowEmitTimestamp(long windowId, PTOperator oper)
+    {
+      Map<Integer, EndWindowStats> endWindowStatsMap = 
endWindowStatsOperatorMap.get(windowId);
+      if (endWindowStatsMap == null) {
+        return -1;
+      }
+      EndWindowStats ews = endWindowStatsMap.get(oper.getId());
+      if (ews == null) {
+        return -1;
+      }
+      return ews.emitTimestamp;
+    }
+  }
+
+  /**
+   * Records a latency sample for {@code oper} by comparing its end window emit time with those of its
+   * upstream operators for the operator's current window. It also remembers, in {@code slowestUpstreamOp},
+   * which upstream operator determined the sample.
+   * <p>
+   * The latency is estimated from how many windows the operator is behind its upstream operators when
+   * either of these holds:
+   * <ul>
+   *   <li>no end window stats exist for the current window;</li>
+   *   <li>the operator's own emit timestamp for that window is unknown.</li>
+   * </ul>
+   * When timestamps are available, the window lag still replaces the measured latency if the lag is
+   * larger than both the measured latency and the heartbeat timeout.
+   * Upstream {@code Operator.DelayOperator}s are ignored.
+   *
+   * @param oper the operator to update; operators without inputs are left untouched
+   * @param ctx source of RPC latencies and end window emit timestamps
+   * @return the recorded latency (milliseconds), or 0 if no sample was recorded
+   */
+  public long updateOperatorLatency(PTOperator oper, UpdateOperatorLatencyContext ctx)
+  {
+    if (oper.getInputs().isEmpty()) {
+      return 0;
+    }
+    OperatorStatus status = oper.stats;
+    int windowWidthMillis = plan.getLogicalPlan().getValue(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS);
+    int heartbeatTimeoutMillis = plan.getLogicalPlan().getValue(LogicalPlan.HEARTBEAT_TIMEOUT_MILLIS);
+    // Read the window id once so that every lookup and comparison below refers to the same window.
+    long currentWindowId = status.currentWindowId.get();
+    // A negative value means this operator's own emit time for the current window is unknown. In that case,
+    // estimate from the window lag rather than subtracting from the -1 sentinel; that subtraction would be
+    // clamped to a bogus latency of 0.
+    long endWindowEmitTime = ctx.endWindowStatsExists(currentWindowId)
+        ? ctx.getEndWindowEmitTimestamp(currentWindowId, oper) : -1;
+    boolean estimateFromWindowLag = endWindowEmitTime < 0;
+    long adjustedEndWindowEmitTimestamp = estimateFromWindowLag
+        ? -1 : endWindowEmitTime + ctx.getRPCLatency(oper);
+
+    long latency = Long.MAX_VALUE;
+    PTOperator slowestUpstream = null;
+    for (PTInput input : oper.getInputs()) {
+      PTOperator upstreamOp = input.source.source;
+      if (upstreamOp.getOperatorMeta().getOperator() instanceof Operator.DelayOperator) {
+        continue;
+      }
+      long upstreamWindowId = upstreamOp.stats.currentWindowId.get();
+      long latencyFromWindowsBehind =
+          WindowGenerator.compareWindowId(upstreamWindowId, currentWindowId, windowWidthMillis) * windowWidthMillis;
+      long portLatency;
+      if (estimateFromWindowLag) {
+        // only an upstream operator that is ahead of this one yields an estimate
+        if (upstreamWindowId <= currentWindowId) {
+          continue;
+        }
+        portLatency = latencyFromWindowsBehind;
+      } else {
+        long upstreamEndWindowEmitTime = ctx.getEndWindowEmitTimestamp(currentWindowId, upstreamOp);
+        if (upstreamEndWindowEmitTime < 0) {
+          continue;
+        }
+        portLatency = Math.max(0,
+            adjustedEndWindowEmitTimestamp - (upstreamEndWindowEmitTime + ctx.getRPCLatency(upstreamOp)));
+        // if the upstream operator is ahead by more than the heartbeat timeout, use the window lag instead
+        if (latencyFromWindowsBehind > portLatency && latencyFromWindowsBehind > heartbeatTimeoutMillis) {
+          portLatency = latencyFromWindowsBehind;
+        }
+      }
+      // keep the smallest port latency; its upstream operator is recorded as the slowest upstream
+      if (portLatency < latency) {
+        latency = portLatency;
+        slowestUpstream = upstreamOp;
+      }
+    }
+    if (slowestUpstream == null) {
+      return 0;
+    }
+    status.latencyMA.add(latency);
+    slowestUpstreamOp.put(oper, slowestUpstream);
+    return latency;
+  }
+
   private ContainerHeartbeatResponse 
getHeartbeatResponse(StreamingContainerAgent sca)
   {
     ContainerHeartbeatResponse rsp = new ContainerHeartbeatResponse();

http://git-wip-us.apache.org/repos/asf/apex-core/blob/a54e0b7f/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java 
b/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
index 4793924..3a8438d 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
@@ -282,14 +282,14 @@ public class WindowGenerator extends MuxReservoir 
implements Stream, Runnable
    *
    * @param windowIdA
    * @param windowIdB
-   * @param firstWindowMillis
    * @param windowWidthMillis
    * @return the number of windows ahead, negative if windowIdA is behind 
windowIdB
    */
-  public static long compareWindowId(long windowIdA, long windowIdB, long 
firstWindowMillis, long windowWidthMillis)
+  public static long compareWindowId(long windowIdA, long windowIdB, long 
windowWidthMillis)
   {
-    long millisA = getWindowMillis(windowIdA, firstWindowMillis, 
windowWidthMillis);
-    long millisB = getWindowMillis(windowIdB, firstWindowMillis, 
windowWidthMillis);
+    // firstWindowMillis here actually does not matter since they will be 
subtracted out.
+    long millisA = getWindowMillis(windowIdA, 0, windowWidthMillis);
+    long millisB = getWindowMillis(windowIdB, 0, windowWidthMillis);
     return (millisA - millisB) / windowWidthMillis;
   }
 

http://git-wip-us.apache.org/repos/asf/apex-core/blob/a54e0b7f/engine/src/test/java/com/datatorrent/stram/LatencyTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/LatencyTest.java 
b/engine/src/test/java/com/datatorrent/stram/LatencyTest.java
new file mode 100644
index 0000000..29f525a
--- /dev/null
+++ b/engine/src/test/java/com/datatorrent/stram/LatencyTest.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.stram;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.stram.engine.GenericTestOperator;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.plan.physical.PTOperator;
+import com.datatorrent.stram.plan.physical.PhysicalPlan;
+import com.datatorrent.stram.support.StramTestSupport;
+
+/**
+ * Tests {@code StreamingContainerManager#updateOperatorLatency} on a DAG in which operators o1 and o2 both
+ * feed operator o3. A stubbed {@code UpdateOperatorLatencyContext} is used instead of real heartbeats.
+ */
+public class LatencyTest
+{
+  // NOTE(review): LOG is not used by the tests below.
+  private static final Logger LOG = LoggerFactory.getLogger(LatencyTest.class);
+  @Rule
+  public StramTestSupport.TestMeta testMeta = new StramTestSupport.TestMeta();
+
+  private LogicalPlan dag;
+  private StreamingContainerManager scm;
+  // first physical partition of o1, o2 and o3 respectively
+  private PTOperator o1p1;
+  private PTOperator o2p1;
+  private PTOperator o3p1;
+
+  private static final int windowWidthMillis = 600;
+  private static final int heartbeatTimeoutMillis = 30000;
+
+  /**
+   * Builds the DAG (o1 -> o3 <- o2) with a 600 ms streaming window and a 30000 ms heartbeat timeout,
+   * creates the StreamingContainerManager, and looks up the physical operator of each logical operator.
+   */
+  @Before
+  public void setup()
+  {
+    dag = StramTestSupport.createDAG(testMeta);
+    dag.setAttribute(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS, 
windowWidthMillis);
+    dag.setAttribute(Context.DAGContext.HEARTBEAT_TIMEOUT_MILLIS, 
heartbeatTimeoutMillis);
+    
dag.setAttribute(com.datatorrent.api.Context.OperatorContext.STORAGE_AGENT, new 
StramTestSupport
+        .MemoryStorageAgent());
+
+    GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
+    GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
+    GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
+
+    dag.addStream("o1.output1", o1.outport1, o3.inport1);
+    dag.addStream("o2.output1", o2.outport1, o3.inport2);
+    scm = new StreamingContainerManager(dag);
+    PhysicalPlan plan = scm.getPhysicalPlan();
+    o1p1 = plan.getOperators(dag.getMeta(o1)).get(0);
+    o2p1 = plan.getOperators(dag.getMeta(o2)).get(0);
+    o3p1 = plan.getOperators(dag.getMeta(o3)).get(0);
+  }
+
+  /**
+   * Sets the current window ids of o1p1, o2p1 and o3p1, each inside a stats revision checkout/commit.
+   * Then runs the latency update for o3p1 against a context stub that reports:
+   * - zero RPC latency;
+   * - the given stats-existence flag;
+   * - the given end window emit timestamps (ewt1, ewt2, ewt3 for o1p1, o2p1, o3p1).
+   *
+   * @return the value returned by {@code updateOperatorLatency} for o3p1
+   */
+  private long getLatency(long windowId1, long windowId2, long windowId3, 
final boolean endWindowStatsExists, final long ewt1, final long ewt2, final 
long ewt3)
+  {
+    o1p1.stats.statsRevs.checkout();
+    o1p1.stats.currentWindowId.set(windowId1);
+    o1p1.stats.statsRevs.commit();
+
+    o2p1.stats.statsRevs.checkout();
+    o2p1.stats.currentWindowId.set(windowId2);
+    o2p1.stats.statsRevs.commit();
+
+    o3p1.stats.statsRevs.checkout();
+    o3p1.stats.currentWindowId.set(windowId3);
+    o3p1.stats.statsRevs.commit();
+
+    // stub context: no map lookups, values come straight from the method arguments
+    return scm.updateOperatorLatency(o3p1, new 
StreamingContainerManager.UpdateOperatorLatencyContext()
+    {
+      @Override
+      long getRPCLatency(PTOperator oper)
+      {
+        return 0;
+      }
+
+      @Override
+      boolean endWindowStatsExists(long windowId)
+      {
+        return endWindowStatsExists;
+      }
+
+      @Override
+      long getEndWindowEmitTimestamp(long windowId, PTOperator oper)
+      {
+        if (oper == o1p1) {
+          return ewt1;
+        } else if (oper == o2p1) {
+          return ewt2;
+        } else if (oper == o3p1) {
+          return ewt3;
+        } else {
+          // only the three operators of this DAG are expected to be queried
+          Assert.fail();
+          return 0;
+        }
+      }
+    });
+  }
+
+  /**
+   * Checks the latency reported for o3 in four scenarios.
+   */
+  @Test
+  public void testLatency()
+  {
+    // When all end window stats are available and latency within 
heartbeatTimeout
+    // o3 emits 600 ms after o1 and 100 ms after o2; the smaller port latency (100) is reported
+    Assert.assertEquals(100, getLatency(1000, 1000, 1000, true, 1000, 1500, 
1600));
+
+    // When all end window stats are available and calculated latency is more 
than heartbeatTimeout
+    // o3 is 9900 windows (9900 * 600 ms) behind, more than the 30000 ms timeout, so the window lag is used
+    Assert.assertEquals((10000 - 100) * windowWidthMillis, getLatency(10000, 
10000, 100, true, 1000, 1500, 1600));
+
+    // When end window stats are not available
+    // the latency is estimated from o3 being 3 windows behind both upstream operators
+    Assert.assertEquals((1000 - 997) * windowWidthMillis, getLatency(1000, 
1000, 997, false, 1000, 1500, 1600));
+
+    // When the current window is larger than upstream's current window
+    // no upstream emit timestamp is available (-1), so no sample is recorded and 0 is returned
+    Assert.assertEquals(0, getLatency(1000, 1000, 1001, true, -1, -1, 1600));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/a54e0b7f/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
----------------------------------------------------------------------
diff --git 
a/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java 
b/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
index e302f98..0c95b75 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
@@ -383,7 +383,7 @@ public class WindowGeneratorTest
     for (int windowWidthMillis : new int[]{500, 123}) {
       long window1 = WindowGenerator.getWindowId(first, first, 
windowWidthMillis);
       long window2 = WindowGenerator.getAheadWindowId(window1, first, 
windowWidthMillis, ahead);
-      Assert.assertEquals(ahead, WindowGenerator.compareWindowId(window2, 
window1, first, windowWidthMillis));
+      Assert.assertEquals(ahead, WindowGenerator.compareWindowId(window2, 
window1, windowWidthMillis));
     }
   }
 

Reply via email to