Repository: apex-core Updated Branches: refs/heads/master b5ce0497e -> 93f790aa7
APEXCORE-585 #resolve do not try to calculate latency if the first window has not completed Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/5301fbf8 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/5301fbf8 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/5301fbf8 Branch: refs/heads/master Commit: 5301fbf87b9a2c0950c2615964eb1a88a1a04e97 Parents: c97dd7c Author: David Yan <[email protected]> Authored: Fri Dec 2 15:55:04 2016 -0800 Committer: David Yan <[email protected]> Committed: Tue Dec 6 18:37:23 2016 -0800 ---------------------------------------------------------------------- .../stram/StreamingContainerManager.java | 6 ++-- .../java/com/datatorrent/stram/LatencyTest.java | 9 +++++- .../stram/StreamingContainerManagerTest.java | 32 +++----------------- 3 files changed, 15 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/5301fbf8/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java index 4d193d5..8da1ed8 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java @@ -1838,7 +1838,7 @@ public class StreamingContainerManager implements PlanContext public long updateOperatorLatency(PTOperator oper, UpdateOperatorLatencyContext ctx) { - if (!oper.getInputs().isEmpty()) { + if (!oper.getInputs().isEmpty() && oper.stats.currentWindowId.get() > 0) { OperatorStatus status = oper.stats; long latency = Long.MAX_VALUE; PTOperator slowestUpstream = null; @@ -1852,7 +1852,7 @@ public class StreamingContainerManager 
implements PlanContext if (upstreamOp.getOperatorMeta().getOperator() instanceof Operator.DelayOperator) { continue; } - if (upstreamOp.stats.currentWindowId.get() > oper.stats.currentWindowId.get()) { + if (upstreamOp.stats.currentWindowId.get() >= oper.stats.currentWindowId.get()) { long portLatency = WindowGenerator .compareWindowId(upstreamOp.stats.currentWindowId.get(), oper.stats.currentWindowId.get(), windowWidthMillis) * windowWidthMillis; if (latency > portLatency) { @@ -1893,7 +1893,7 @@ public class StreamingContainerManager implements PlanContext return latency; } } - return 0; + return -1; } private ContainerHeartbeatResponse getHeartbeatResponse(StreamingContainerAgent sca) http://git-wip-us.apache.org/repos/asf/apex-core/blob/5301fbf8/engine/src/test/java/com/datatorrent/stram/LatencyTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/LatencyTest.java b/engine/src/test/java/com/datatorrent/stram/LatencyTest.java index 29f525a..0e2c230 100644 --- a/engine/src/test/java/com/datatorrent/stram/LatencyTest.java +++ b/engine/src/test/java/com/datatorrent/stram/LatencyTest.java @@ -127,7 +127,14 @@ public class LatencyTest Assert.assertEquals((1000 - 997) * windowWidthMillis, getLatency(1000, 1000, 997, false, 1000, 1500, 1600)); // When the current window is larger than upstream's current window - Assert.assertEquals(0, getLatency(1000, 1000, 1001, true, -1, -1, 1600)); + Assert.assertEquals(-1, getLatency(1000, 1000, 1001, true, -1, -1, 1600)); + + // When the current window of an operator is not available yet + Assert.assertEquals(-1, getLatency(1000, 1000, 0, false, -1, -1, -1)); + + // When the current window of an operator is the same as upstream and no end window stats are available + Assert.assertEquals(0, getLatency(1000, 90, 1000, false, -1, -1, -1)); + } } 
http://git-wip-us.apache.org/repos/asf/apex-core/blob/5301fbf8/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java index 39bfbf2..2d86618 100644 --- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java @@ -81,7 +81,6 @@ import com.datatorrent.stram.engine.TestAppDataQueryOperator; import com.datatorrent.stram.engine.TestAppDataResultOperator; import com.datatorrent.stram.engine.TestAppDataSourceOperator; import com.datatorrent.stram.engine.TestGeneratorInputOperator; -import com.datatorrent.stram.engine.WindowGenerator; import com.datatorrent.stram.plan.TestPlanContext; import com.datatorrent.stram.plan.logical.LogicalPlan; import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta; @@ -1034,38 +1033,15 @@ public class StreamingContainerManagerTest public static class HighLatencyTestOperator extends GenericTestOperator { - private long firstWindowMillis; - private long windowWidthMillis; - private long currentWindowId; private long latency; @Override - public void setup(OperatorContext context) - { - firstWindowMillis = System.currentTimeMillis(); - // this is an approximation because there is no way to get to the actual value in the DAG - - windowWidthMillis = context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS); - } - - @Override - public void beginWindow(long windowId) - { - currentWindowId = windowId; - } - - @Override public void endWindow() { - long sleepMillis = latency - - (System.currentTimeMillis() - WindowGenerator.getWindowMillis(currentWindowId, firstWindowMillis, windowWidthMillis)); - - if (sleepMillis > 0) { - try { - Thread.sleep(sleepMillis); - } catch 
(InterruptedException ex) { - // move on - } + try { + Thread.sleep(latency); + } catch (InterruptedException ex) { + // move on } }
