Repository: apex-core Updated Branches: refs/heads/master cf8141846 -> a54e0b7f8
APEXCORE-544 APEXCORE-379 #resolve fixed latency calculation when the upstream heartbeat is processed later than the downstream for a particular window Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/a54e0b7f Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/a54e0b7f Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/a54e0b7f Branch: refs/heads/master Commit: a54e0b7f84a4c5fe8e0f6fa3a6f6c3fe7b13b5b3 Parents: cf81418 Author: David Yan <[email protected]> Authored: Fri Sep 30 16:54:45 2016 -0700 Committer: Thomas Weise <[email protected]> Committed: Tue Nov 22 16:01:16 2016 -0800 ---------------------------------------------------------------------- .../stram/StreamingContainerManager.java | 156 +++++++++++++------ .../stram/engine/WindowGenerator.java | 8 +- .../java/com/datatorrent/stram/LatencyTest.java | 133 ++++++++++++++++ .../stram/engine/WindowGeneratorTest.java | 2 +- 4 files changed, 250 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/a54e0b7f/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java index d5e5475..4d193d5 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java @@ -276,7 +276,7 @@ public class StreamingContainerManager implements PlanContext private final long startTime = System.currentTimeMillis(); - private static class EndWindowStats + static class EndWindowStats { long emitTimestamp = -1; HashMap<String, Long> dequeueTimestamps = new HashMap<>(); // input port name to end window dequeue time @@ -816,20 +816,27 @@ public class StreamingContainerManager implements PlanContext private void calculateEndWindowStats() { + Map<Integer, PTOperator> allOperators = plan.getAllOperators(); + + UpdateOperatorLatencyContext ctx = new UpdateOperatorLatencyContext(rpcLatencies, endWindowStatsOperatorMap); + + for (PTOperator operator : allOperators.values()) { + updateOperatorLatency(operator, ctx); + } + if (!endWindowStatsOperatorMap.isEmpty()) { - Set<Integer> allCurrentOperators = plan.getAllOperators().keySet(); if (endWindowStatsOperatorMap.size() > this.vars.maxWindowsBehindForStats) { LOG.warn("Some operators are behind for more than {} windows! Trimming the end window stats map", this.vars.maxWindowsBehindForStats); while (endWindowStatsOperatorMap.size() > this.vars.maxWindowsBehindForStats) { LOG.debug("Removing incomplete end window stats for window id {}. Collected operator set: {}. Complete set: {}", endWindowStatsOperatorMap.firstKey(), - endWindowStatsOperatorMap.get(endWindowStatsOperatorMap.firstKey()).keySet(), allCurrentOperators); + endWindowStatsOperatorMap.get(endWindowStatsOperatorMap.firstKey()).keySet(), allOperators.keySet()); endWindowStatsOperatorMap.remove(endWindowStatsOperatorMap.firstKey()); } } //logicalMetrics.clear(); - int numOperators = allCurrentOperators.size(); + int numOperators = allOperators.size(); Long windowId = endWindowStatsOperatorMap.firstKey(); while (windowId != null) { Map<Integer, EndWindowStats> endWindowStatsMap = endWindowStatsOperatorMap.get(windowId); @@ -838,7 +845,7 @@ public class StreamingContainerManager implements PlanContext aggregateMetrics(windowId, endWindowStatsMap); criticalPathInfo = findCriticalPath(); - if (allCurrentOperators.containsAll(endWindowStatsOperators)) { + if (allOperators.keySet().containsAll(endWindowStatsOperators)) { if (endWindowStatsMap.size() < numOperators) { if (windowId < completeEndWindowStatsWindowId) { LOG.debug("Disregarding stale end window stats for window {}", windowId); @@ -1704,45 +1711,6 @@ public class StreamingContainerManager implements PlanContext } endWindowStatsMap.put(shb.getNodeId(), endWindowStats); - if (!oper.getInputs().isEmpty()) { - long latency = Long.MAX_VALUE; - long adjustedEndWindowEmitTimestamp = endWindowStats.emitTimestamp; - MovingAverageLong rpcLatency = rpcLatencies.get(oper.getContainer().getExternalId()); - if (rpcLatency != null) { - adjustedEndWindowEmitTimestamp += rpcLatency.getAvg(); - } - PTOperator slowestUpstream = null; - for (PTInput input : oper.getInputs()) { - PTOperator upstreamOp = input.source.source; - if (upstreamOp.getOperatorMeta().getOperator() instanceof Operator.DelayOperator) { - continue; - } - EndWindowStats ews = endWindowStatsMap.get(upstreamOp.getId()); - long portLatency; - if (ews == null) { - // This is when the operator is likely to be behind too many windows. We need to give an estimate for - // latency at this point, by looking at the number of windows behind - int widthMillis = plan.getLogicalPlan().getValue(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS); - portLatency = (upstreamOp.stats.currentWindowId.get() - oper.stats.currentWindowId.get()) * widthMillis; - } else { - MovingAverageLong upstreamRPCLatency = rpcLatencies.get(upstreamOp.getContainer().getExternalId()); - portLatency = adjustedEndWindowEmitTimestamp - ews.emitTimestamp; - if (upstreamRPCLatency != null) { - portLatency -= upstreamRPCLatency.getAvg(); - } - } - if (portLatency < 0) { - portLatency = 0; - } - if (latency > portLatency) { - latency = portLatency; - slowestUpstream = upstreamOp; - } - } - status.latencyMA.add(latency); - slowestUpstreamOp.put(oper, slowestUpstream); - } - Set<Integer> allCurrentOperators = plan.getAllOperators().keySet(); int numOperators = plan.getAllOperators().size(); if (allCurrentOperators.containsAll(endWindowStatsMap.keySet()) && endWindowStatsMap.size() == numOperators) { @@ -1828,6 +1796,106 @@ public class StreamingContainerManager implements PlanContext return rsp; } + static class UpdateOperatorLatencyContext + { + Map<String, MovingAverageLong> rpcLatencies; + Map<Long, Map<Integer, EndWindowStats>> endWindowStatsOperatorMap; + + UpdateOperatorLatencyContext() + { + } + + UpdateOperatorLatencyContext(Map<String, MovingAverageLong> rpcLatencies, Map<Long, Map<Integer, EndWindowStats>> endWindowStatsOperatorMap) + { + this.rpcLatencies = rpcLatencies; + this.endWindowStatsOperatorMap = endWindowStatsOperatorMap; + } + + long getRPCLatency(PTOperator oper) + { + MovingAverageLong rpcLatency = rpcLatencies.get(oper.getContainer().getExternalId()); + return rpcLatency == null ? 0 : rpcLatency.getAvg(); + } + + boolean endWindowStatsExists(long windowId) + { + return endWindowStatsOperatorMap.containsKey(windowId); + } + + long getEndWindowEmitTimestamp(long windowId, PTOperator oper) + { + Map<Integer, EndWindowStats> endWindowStatsMap = endWindowStatsOperatorMap.get(windowId); + if (endWindowStatsMap == null) { + return -1; + } + EndWindowStats ews = endWindowStatsMap.get(oper.getId()); + if (ews == null) { + return -1; + } + return ews.emitTimestamp; + } + } + + public long updateOperatorLatency(PTOperator oper, UpdateOperatorLatencyContext ctx) + { + if (!oper.getInputs().isEmpty()) { + OperatorStatus status = oper.stats; + long latency = Long.MAX_VALUE; + PTOperator slowestUpstream = null; + int windowWidthMillis = plan.getLogicalPlan().getValue(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS); + int heartbeatTimeoutMillis = plan.getLogicalPlan().getValue(LogicalPlan.HEARTBEAT_TIMEOUT_MILLIS); + long currentWindowId = status.currentWindowId.get(); + if (!ctx.endWindowStatsExists(currentWindowId)) { + // the end window stats for the current window id is not available, estimate latency by looking at upstream window id + for (PTInput input : oper.getInputs()) { + PTOperator upstreamOp = input.source.source; + if (upstreamOp.getOperatorMeta().getOperator() instanceof Operator.DelayOperator) { + continue; + } + if (upstreamOp.stats.currentWindowId.get() > oper.stats.currentWindowId.get()) { + long portLatency = WindowGenerator + .compareWindowId(upstreamOp.stats.currentWindowId.get(), oper.stats.currentWindowId.get(), windowWidthMillis) * windowWidthMillis; + if (latency > portLatency) { + latency = portLatency; + slowestUpstream = upstreamOp; + } + } + } + } else { + long endWindowEmitTime = ctx.getEndWindowEmitTimestamp(currentWindowId, oper); + long adjustedEndWindowEmitTimestamp = endWindowEmitTime + ctx.getRPCLatency(oper); + for (PTInput input : oper.getInputs()) { + PTOperator upstreamOp = input.source.source; + if (upstreamOp.getOperatorMeta().getOperator() instanceof Operator.DelayOperator) { + continue; + } + long upstreamEndWindowEmitTime = ctx.getEndWindowEmitTimestamp(currentWindowId, upstreamOp); + if (upstreamEndWindowEmitTime < 0) { + continue; + } + long portLatency = adjustedEndWindowEmitTimestamp - (upstreamEndWindowEmitTime + ctx.getRPCLatency(upstreamOp)); + if (portLatency < 0) { + portLatency = 0; + } + long latencyFromWindowsBehind = WindowGenerator.compareWindowId(upstreamOp.stats.currentWindowId.get(), oper.stats.currentWindowId.get(), windowWidthMillis) * windowWidthMillis; + if (latencyFromWindowsBehind > portLatency && latencyFromWindowsBehind > heartbeatTimeoutMillis) { + portLatency = latencyFromWindowsBehind; + } + if (latency > portLatency) { + latency = portLatency; + slowestUpstream = upstreamOp; + } + } + } + if (slowestUpstream != null) { + status.latencyMA.add(latency); + slowestUpstreamOp.put(oper, slowestUpstream); + return latency; + } + } + return 0; + } + private ContainerHeartbeatResponse getHeartbeatResponse(StreamingContainerAgent sca) { ContainerHeartbeatResponse rsp = new ContainerHeartbeatResponse(); http://git-wip-us.apache.org/repos/asf/apex-core/blob/a54e0b7f/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java b/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java index 4793924..3a8438d 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java @@ -282,14 +282,14 @@ public class WindowGenerator extends MuxReservoir implements Stream, Runnable * * @param windowIdA * @param windowIdB - * @param firstWindowMillis * @param windowWidthMillis * @return the number of windows ahead, negative if windowIdA is behind windowIdB */ - public static long compareWindowId(long windowIdA, long windowIdB, long firstWindowMillis, long windowWidthMillis) + public static long compareWindowId(long windowIdA, long windowIdB, long windowWidthMillis) { - long millisA = getWindowMillis(windowIdA, firstWindowMillis, windowWidthMillis); - long millisB = getWindowMillis(windowIdB, firstWindowMillis, windowWidthMillis); + // firstWindowMillis here actually does not matter since they will be subtracted out. + long millisA = getWindowMillis(windowIdA, 0, windowWidthMillis); + long millisB = getWindowMillis(windowIdB, 0, windowWidthMillis); return (millisA - millisB) / windowWidthMillis; } http://git-wip-us.apache.org/repos/asf/apex-core/blob/a54e0b7f/engine/src/test/java/com/datatorrent/stram/LatencyTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/LatencyTest.java b/engine/src/test/java/com/datatorrent/stram/LatencyTest.java new file mode 100644 index 0000000..29f525a --- /dev/null +++ b/engine/src/test/java/com/datatorrent/stram/LatencyTest.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.stram; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.api.Context; +import com.datatorrent.stram.engine.GenericTestOperator; +import com.datatorrent.stram.plan.logical.LogicalPlan; +import com.datatorrent.stram.plan.physical.PTOperator; +import com.datatorrent.stram.plan.physical.PhysicalPlan; +import com.datatorrent.stram.support.StramTestSupport; + +public class LatencyTest +{ + private static final Logger LOG = LoggerFactory.getLogger(LatencyTest.class); + @Rule + public StramTestSupport.TestMeta testMeta = new StramTestSupport.TestMeta(); + + private LogicalPlan dag; + private StreamingContainerManager scm; + private PTOperator o1p1; + private PTOperator o2p1; + private PTOperator o3p1; + + private static final int windowWidthMillis = 600; + private static final int heartbeatTimeoutMillis = 30000; + + @Before + public void setup() + { + dag = StramTestSupport.createDAG(testMeta); + dag.setAttribute(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS, windowWidthMillis); + dag.setAttribute(Context.DAGContext.HEARTBEAT_TIMEOUT_MILLIS, heartbeatTimeoutMillis); + dag.setAttribute(com.datatorrent.api.Context.OperatorContext.STORAGE_AGENT, new StramTestSupport + .MemoryStorageAgent()); + + GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class); + GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); + GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class); + + dag.addStream("o1.output1", o1.outport1, o3.inport1); + dag.addStream("o2.output1", o2.outport1, o3.inport2); + scm = new StreamingContainerManager(dag); + PhysicalPlan plan = scm.getPhysicalPlan(); + o1p1 = plan.getOperators(dag.getMeta(o1)).get(0); + o2p1 = plan.getOperators(dag.getMeta(o2)).get(0); + o3p1 = plan.getOperators(dag.getMeta(o3)).get(0); + } + + private long getLatency(long windowId1, long windowId2, long windowId3, final boolean endWindowStatsExists, final long ewt1, final long ewt2, final long ewt3) + { + o1p1.stats.statsRevs.checkout(); + o1p1.stats.currentWindowId.set(windowId1); + o1p1.stats.statsRevs.commit(); + + o2p1.stats.statsRevs.checkout(); + o2p1.stats.currentWindowId.set(windowId2); + o2p1.stats.statsRevs.commit(); + + o3p1.stats.statsRevs.checkout(); + o3p1.stats.currentWindowId.set(windowId3); + o3p1.stats.statsRevs.commit(); + + return scm.updateOperatorLatency(o3p1, new StreamingContainerManager.UpdateOperatorLatencyContext() + { + @Override + long getRPCLatency(PTOperator oper) + { + return 0; + } + + @Override + boolean endWindowStatsExists(long windowId) + { + return endWindowStatsExists; + } + + @Override + long getEndWindowEmitTimestamp(long windowId, PTOperator oper) + { + if (oper == o1p1) { + return ewt1; + } else if (oper == o2p1) { + return ewt2; + } else if (oper == o3p1) { + return ewt3; + } else { + Assert.fail(); + return 0; + } + } + }); + } + + @Test + public void testLatency() + { + // When all end window stats are available and latency within heartbeatTimeout + Assert.assertEquals(100, getLatency(1000, 1000, 1000, true, 1000, 1500, 1600)); + + // When all end window stats are available and calculated latency is more than heartbeatTimeout + Assert.assertEquals((10000 - 100) * windowWidthMillis, getLatency(10000, 10000, 100, true, 1000, 1500, 1600)); + + // When end window stats are not available + Assert.assertEquals((1000 - 997) * windowWidthMillis, getLatency(1000, 1000, 997, false, 1000, 1500, 1600)); + + // When the current window is larger than upstream's current window + Assert.assertEquals(0, getLatency(1000, 1000, 1001, true, -1, -1, 1600)); + } + +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/a54e0b7f/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java b/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java index e302f98..0c95b75 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java @@ -383,7 +383,7 @@ public class WindowGeneratorTest for (int windowWidthMillis : new int[]{500, 123}) { long window1 = WindowGenerator.getWindowId(first, first, windowWidthMillis); long window2 = WindowGenerator.getAheadWindowId(window1, first, windowWidthMillis, ahead); - Assert.assertEquals(ahead, WindowGenerator.compareWindowId(window2, window1, first, windowWidthMillis)); + Assert.assertEquals(ahead, WindowGenerator.compareWindowId(window2, window1, windowWidthMillis)); } }
