APEXCORE-306 Update checkpoints for strongly connected operators as group.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/b3402be5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/b3402be5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/b3402be5 Branch: refs/heads/master Commit: b3402be5a45728515f4a8328fec5a76ddede0350 Parents: 4d5828c Author: Thomas Weise <[email protected]> Authored: Thu Jan 21 16:39:55 2016 -0800 Committer: David Yan <[email protected]> Committed: Fri Jan 22 19:24:31 2016 -0800 ---------------------------------------------------------------------- .../stram/StreamingContainerManager.java | 161 +++++++++++++------ .../com/datatorrent/stram/api/Checkpoint.java | 11 ++ .../stram/plan/logical/LogicalPlan.java | 91 +++++++---- .../com/datatorrent/stram/CheckpointTest.java | 3 +- .../stram/plan/logical/DelayOperatorTest.java | 88 +++++++++- .../stram/plan/logical/LogicalPlanTest.java | 131 ++++++++++----- 6 files changed, 358 insertions(+), 127 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b3402be5/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java index 6233697..a687a37 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java @@ -35,6 +35,7 @@ import com.esotericsoftware.kryo.KryoException; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Objects; import com.google.common.base.Predicate; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; @@ -159,6 +160,7 @@ public class StreamingContainerManager implements PlanContext private long lastResourceRequest = 0; private final Map<String, StreamingContainerAgent> containers = new ConcurrentHashMap<String, StreamingContainerAgent>(); private final List<Pair<PTOperator, Long>> purgeCheckpoints = new ArrayList<Pair<PTOperator, Long>>(); + private Map<OperatorMeta, Set<OperatorMeta>> checkpointGroups; private final Map<Long, Set<PTOperator>> shutdownOperators = new HashMap<>(); private CriticalPathInfo criticalPathInfo; private final ConcurrentMap<PTOperator, PTOperator> reportStats = Maps.newConcurrentMap(); @@ -812,6 +814,7 @@ public class StreamingContainerManager implements PlanContext Collection<OperatorMeta> logicalOperators = getLogicalPlan().getAllOperators(); //for backward compatibility for (OperatorMeta operatorMeta : logicalOperators) { + @SuppressWarnings("deprecation") Context.CountersAggregator aggregator = operatorMeta.getValue(OperatorContext.COUNTERS_AGGREGATOR); if (aggregator == null) { continue; @@ -825,6 +828,7 @@ public class StreamingContainerManager implements PlanContext } } if (counters.size() > 0) { + @SuppressWarnings("deprecation") Object aggregate = aggregator.aggregate(counters); latestLogicalCounters.put(operatorMeta.getName(), aggregate); } @@ -857,6 +861,8 @@ public class StreamingContainerManager implements PlanContext if (windowMetrics == null) { windowMetrics = new LinkedBlockingQueue<Pair<Long, Map<String, Object>>>(METRIC_QUEUE_SIZE) { + private static final long serialVersionUID = 1L; + @Override public boolean add(Pair<Long, Map<String, Object>> longMapPair) { @@ -1134,7 +1140,7 @@ public class StreamingContainerManager implements PlanContext cs.container.setAllocatedVCores(0); // resolve dependencies - UpdateCheckpointsContext ctx = new UpdateCheckpointsContext(clock); + UpdateCheckpointsContext ctx = new UpdateCheckpointsContext(clock, false, getCheckpointGroups()); for (PTOperator oper : cs.container.getOperators()) { updateRecoveryCheckpoints(oper, ctx); } @@ -1881,31 +1887,18 @@ public class StreamingContainerManager implements PlanContext public final Set<PTOperator> blocked = new LinkedHashSet<PTOperator>(); public final long currentTms; public final boolean recovery; + public final Map<OperatorMeta, Set<OperatorMeta>> checkpointGroups; public UpdateCheckpointsContext(Clock clock) { - this.currentTms = clock.getTime(); - this.recovery = false; + this(clock, false, Collections.<OperatorMeta, Set<OperatorMeta>>emptyMap()); } - public UpdateCheckpointsContext(Clock clock, boolean recovery) + public UpdateCheckpointsContext(Clock clock, boolean recovery, Map<OperatorMeta, Set<OperatorMeta>> checkpointGroups) { this.currentTms = clock.getTime(); this.recovery = recovery; - } - - } - - private void addVisited(PTOperator operator, UpdateCheckpointsContext ctx) - { - ctx.visited.add(operator); - for (PTOperator.PTOutput out : operator.getOutputs()) { - for (PTOperator.PTInput sink : out.sinks) { - PTOperator sinkOperator = sink.target; - if (!ctx.visited.contains(sinkOperator)) { - addVisited(sinkOperator, ctx); - } - } + this.checkpointGroups = checkpointGroups; } } @@ -1933,20 +1926,55 @@ public class StreamingContainerManager implements PlanContext } } - long maxCheckpoint = operator.getRecentCheckpoint().windowId; - if (ctx.recovery && maxCheckpoint == Stateless.WINDOW_ID && operator.isOperatorStateLess()) { - long currentWindowId = WindowGenerator.getWindowId(ctx.currentTms, this.vars.windowStartMillis, this.getLogicalPlan().getValue(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS)); - maxCheckpoint = currentWindowId; + // the most recent checkpoint eligible for recovery based on downstream state + Checkpoint maxCheckpoint = Checkpoint.INITIAL_CHECKPOINT; + + Set<OperatorMeta> checkpointGroup = ctx.checkpointGroups.get(operator.getOperatorMeta()); + if (checkpointGroup == null) { + checkpointGroup = Collections.singleton(operator.getOperatorMeta()); + } + // find intersection of checkpoints that group can collectively move to + TreeSet<Checkpoint> commonCheckpoints = new TreeSet<>(new Checkpoint.CheckpointComparator()); + synchronized (operator.checkpoints) { + commonCheckpoints.addAll(operator.checkpoints); + } + Set<PTOperator> groupOpers = new HashSet<>(checkpointGroup.size()); + if (checkpointGroup.size() > 1) { + for (OperatorMeta om : checkpointGroup) { + Collection<PTOperator> operators = plan.getAllOperators(om); + for (PTOperator groupOper : operators) { + synchronized (groupOper.checkpoints) { + commonCheckpoints.retainAll(groupOper.checkpoints); + } + // visit all downstream operators of the group + ctx.visited.add(groupOper); + groupOpers.add(groupOper); + } + } + // highest common checkpoint + if (!commonCheckpoints.isEmpty()) { + maxCheckpoint = commonCheckpoints.last(); + } + } else { + // without logical grouping, treat partitions as independent + // this is especially important for parallel partitioning + ctx.visited.add(operator); + groupOpers.add(operator); + maxCheckpoint = operator.getRecentCheckpoint(); + if (ctx.recovery && maxCheckpoint.windowId == Stateless.WINDOW_ID && operator.isOperatorStateLess()) { + long currentWindowId = WindowGenerator.getWindowId(ctx.currentTms, this.vars.windowStartMillis, this.getLogicalPlan().getValue(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS)); + maxCheckpoint = new Checkpoint(currentWindowId, 0, 0); + } } - ctx.visited.add(operator); // DFS downstream operators - if (operator.getOperatorMeta().getOperator() instanceof Operator.DelayOperator) { - addVisited(operator, ctx); - } else { - for (PTOperator.PTOutput out : operator.getOutputs()) { + for (PTOperator groupOper : groupOpers) { + for (PTOperator.PTOutput out : groupOper.getOutputs()) { for (PTOperator.PTInput sink : out.sinks) { PTOperator sinkOperator = sink.target; + if (groupOpers.contains(sinkOperator)) { + continue; // downstream operator within group + } if (!ctx.visited.contains(sinkOperator)) { // downstream traversal updateRecoveryCheckpoints(sinkOperator, ctx); @@ -1954,7 +1982,7 @@ public class StreamingContainerManager implements PlanContext // recovery window id cannot move backwards // when dynamically adding new operators if (sinkOperator.getRecoveryCheckpoint().windowId >= operator.getRecoveryCheckpoint().windowId) { - maxCheckpoint = Math.min(maxCheckpoint, sinkOperator.getRecoveryCheckpoint().windowId); + maxCheckpoint = Checkpoint.min(maxCheckpoint, sinkOperator.getRecoveryCheckpoint()); } if (ctx.blocked.contains(sinkOperator)) { @@ -1967,33 +1995,43 @@ public class StreamingContainerManager implements PlanContext } } - // checkpoint frozen during deployment - if (ctx.recovery || operator.getState() != PTOperator.State.PENDING_DEPLOY) { - // remove previous checkpoints - Checkpoint c1 = Checkpoint.INITIAL_CHECKPOINT; - synchronized (operator.checkpoints) { - if (!operator.checkpoints.isEmpty() && (operator.checkpoints.getFirst()).windowId <= maxCheckpoint) { - c1 = operator.checkpoints.getFirst(); - Checkpoint c2; - while (operator.checkpoints.size() > 1 && ((c2 = operator.checkpoints.get(1)).windowId) <= maxCheckpoint) { - operator.checkpoints.removeFirst(); - //LOG.debug("Checkpoint to delete: operator={} windowId={}", operator.getName(), c1); - this.purgeCheckpoints.add(new Pair<PTOperator, Long>(operator, c1.windowId)); - c1 = c2; + // find the common checkpoint that is <= downstream recovery checkpoint + if (!commonCheckpoints.contains(maxCheckpoint)) { + if (!commonCheckpoints.isEmpty()) { + maxCheckpoint = Objects.firstNonNull(commonCheckpoints.floor(maxCheckpoint), maxCheckpoint); + } + } + + for (PTOperator groupOper : groupOpers) { + // checkpoint frozen during deployment + if (ctx.recovery || groupOper.getState() != PTOperator.State.PENDING_DEPLOY) { + // remove previous checkpoints + Checkpoint c1 = Checkpoint.INITIAL_CHECKPOINT; + LinkedList<Checkpoint> checkpoints = groupOper.checkpoints; + synchronized (checkpoints) { + if (!checkpoints.isEmpty() && (checkpoints.getFirst()).windowId <= maxCheckpoint.windowId) { + c1 = checkpoints.getFirst(); + Checkpoint c2; + while (checkpoints.size() > 1 && ((c2 = checkpoints.get(1)).windowId) <= maxCheckpoint.windowId) { + checkpoints.removeFirst(); + //LOG.debug("Checkpoint to delete: operator={} windowId={}", operator.getName(), c1); + this.purgeCheckpoints.add(new Pair<PTOperator, Long>(groupOper, c1.windowId)); + c1 = c2; + } } - } - else { - if (ctx.recovery && operator.checkpoints.isEmpty() && operator.isOperatorStateLess()) { - LOG.debug("Adding checkpoint for stateless operator {} {}", operator, Codec.getStringWindowId(maxCheckpoint)); - c1 = operator.addCheckpoint(maxCheckpoint, this.vars.windowStartMillis); + else { + if (ctx.recovery && checkpoints.isEmpty() && groupOper.isOperatorStateLess()) { + LOG.debug("Adding checkpoint for stateless operator {} {}", groupOper, Codec.getStringWindowId(maxCheckpoint.windowId)); + c1 = groupOper.addCheckpoint(maxCheckpoint.windowId, this.vars.windowStartMillis); + } } } + //LOG.debug("Operator {} checkpoints: commit {} recent {}", new Object[] {operator.getName(), c1, operator.checkpoints}); + groupOper.setRecoveryCheckpoint(c1); + } + else { + LOG.debug("Skipping checkpoint update {} during {}", groupOper, groupOper.getState()); } - //LOG.debug("Operator {} checkpoints: commit {} recent {}", new Object[] {operator.getName(), c1, operator.checkpoints}); - operator.setRecoveryCheckpoint(c1); - } - else { - LOG.debug("Skipping checkpoint update {} during {}", operator, operator.getState()); } } @@ -2009,13 +2047,32 @@ public class StreamingContainerManager implements PlanContext return this.vars.windowStartMillis; } + private Map<OperatorMeta, Set<OperatorMeta>> getCheckpointGroups() + { + if (this.checkpointGroups == null) { + this.checkpointGroups = new HashMap<>(); + LogicalPlan dag = this.plan.getLogicalPlan(); + dag.resetNIndex(); + LogicalPlan.ValidationContext vc = new LogicalPlan.ValidationContext(); + for (OperatorMeta om : dag.getRootOperators()) { + this.plan.getLogicalPlan().findStronglyConnected(om, vc); + } + for (Set<OperatorMeta> checkpointGroup : vc.stronglyConnected) { + for (OperatorMeta om : checkpointGroup) { + this.checkpointGroups.put(om, checkpointGroup); + } + } + } + return checkpointGroups; + } + /** * Visit all operators to update current checkpoint based on updated downstream state. * Purge older checkpoints that are no longer needed. */ private long updateCheckpoints(boolean recovery) { - UpdateCheckpointsContext ctx = new UpdateCheckpointsContext(clock, recovery); + UpdateCheckpointsContext ctx = new UpdateCheckpointsContext(clock, recovery, getCheckpointGroups()); for (OperatorMeta logicalOperator : plan.getLogicalPlan().getRootOperators()) { //LOG.debug("Updating checkpoints for operator {}", logicalOperator.getName()); List<PTOperator> operators = plan.getOperators(logicalOperator); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b3402be5/engine/src/main/java/com/datatorrent/stram/api/Checkpoint.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/api/Checkpoint.java b/engine/src/main/java/com/datatorrent/stram/api/Checkpoint.java index 5ec4a7e..d24b17a 100644 --- a/engine/src/main/java/com/datatorrent/stram/api/Checkpoint.java +++ b/engine/src/main/java/com/datatorrent/stram/api/Checkpoint.java @@ -18,6 +18,8 @@ */ package com.datatorrent.stram.api; +import java.util.Comparator; + import com.datatorrent.api.annotation.Stateless; import com.datatorrent.bufferserver.util.Codec; @@ -102,6 +104,15 @@ public class Checkpoint implements com.datatorrent.api.Stats.Checkpoint return windowId; } + public static class CheckpointComparator implements Comparator<Checkpoint> + { + @Override + public int compare(Checkpoint o1, Checkpoint o2) + { + return Long.compare(o1.windowId, o2.windowId); + } + } + @SuppressWarnings("FieldNameHidesFieldInSuperclass") public static final Checkpoint INITIAL_CHECKPOINT = new Checkpoint(Stateless.WINDOW_ID, 0, 0); private static final long serialVersionUID = 201402152116L; http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b3402be5/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java index 883ad71..6d7ebe1 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java @@ -159,8 +159,6 @@ public class LogicalPlan implements Serializable, DAG public final Map<String, ModuleMeta> modules = new LinkedHashMap<>(); private final List<OperatorMeta> rootOperators = new ArrayList<OperatorMeta>(); private final Attribute.AttributeMap attributes = new DefaultAttributeMap(); - private transient int nodeIndex = 0; // used for cycle validation - private transient Stack<OperatorMeta> stack = new Stack<OperatorMeta>(); // used for cycle validation private transient Map<String, ArrayListMultimap<OutputPort<?>, InputPort<?>>> streamLinks = new HashMap<>(); @Override @@ -1540,6 +1538,7 @@ public class LogicalPlan implements Serializable, DAG return this.operators.get(operatorName); } + @Override public ModuleMeta getModuleMeta(String moduleName) { return this.modules.get(moduleName); @@ -1557,6 +1556,7 @@ public class LogicalPlan implements Serializable, DAG throw new IllegalArgumentException("Operator not associated with the DAG: " + operator); } + @Override public ModuleMeta getMeta(Module module) { for (ModuleMeta m : getAllModules()) { @@ -1626,6 +1626,24 @@ public class LogicalPlan implements Serializable, DAG return classNames; } + public static class ValidationContext + { + public int nodeIndex = 0; + public Stack<OperatorMeta> stack = new Stack<OperatorMeta>(); + public Stack<OperatorMeta> path = new Stack<OperatorMeta>(); + public List<Set<OperatorMeta>> stronglyConnected = new ArrayList<>(); + public OperatorMeta invalidLoopAt; + public List<Set<OperatorMeta>> invalidCycles = new ArrayList<>(); + } + + public void resetNIndex() + { + for (OperatorMeta om : getAllOperators()) { + om.lowlink = null; + om.nindex = null; + } + } + /** * Validate the plan. Includes checks that required ports are connected, * required configuration parameters specified, graph free of cycles etc. @@ -1752,21 +1770,20 @@ public class LogicalPlan implements Serializable, DAG throw new ValidationException("At least one output port must be connected: " + n.name); } } - stack = new Stack<OperatorMeta>(); - List<List<String>> cycles = new ArrayList<List<String>>(); + ValidationContext validatonContext = new ValidationContext(); for (OperatorMeta n: operators.values()) { if (n.nindex == null) { - findStronglyConnected(n, cycles); + findStronglyConnected(n, validatonContext); } } - if (!cycles.isEmpty()) { - throw new ValidationException("Loops in graph: " + cycles); + if (!validatonContext.invalidCycles.isEmpty()) { + throw new ValidationException("Loops in graph: " + validatonContext.invalidCycles); } List<List<String>> invalidDelays = new ArrayList<>(); for (OperatorMeta n : rootOperators) { - findInvalidDelays(n, invalidDelays); + findInvalidDelays(n, invalidDelays, new Stack<OperatorMeta>()); } if (!invalidDelays.isEmpty()) { throw new ValidationException("Invalid delays in graph: " + invalidDelays); @@ -1908,59 +1925,72 @@ public class LogicalPlan implements Serializable, DAG * @param om * @param cycles */ - public void findStronglyConnected(OperatorMeta om, List<List<String>> cycles) + public void findStronglyConnected(OperatorMeta om, ValidationContext ctx) { - om.nindex = nodeIndex; - om.lowlink = nodeIndex; - nodeIndex++; - stack.push(om); + om.nindex = ctx.nodeIndex; + om.lowlink = ctx.nodeIndex; + ctx.nodeIndex++; + ctx.stack.push(om); + ctx.path.push(om); // depth first successors traversal for (StreamMeta downStream: om.outputStreams.values()) { for (InputPortMeta sink: downStream.sinks) { - if (om.getOperator() instanceof Operator.DelayOperator) { - // this is an iteration loop, do not treat it as downstream when detecting cycles - sink.attributes.put(IS_CONNECTED_TO_DELAY_OPERATOR, true); - continue; - } OperatorMeta successor = sink.getOperatorWrapper(); if (successor == null) { continue; } // check for self referencing node if (om == successor) { - cycles.add(Collections.singletonList(om.name)); + ctx.invalidCycles.add(Collections.singleton(om)); } if (successor.nindex == null) { // not visited yet - findStronglyConnected(successor, cycles); + findStronglyConnected(successor, ctx); om.lowlink = Math.min(om.lowlink, successor.lowlink); } - else if (stack.contains(successor)) { + else if (ctx.stack.contains(successor)) { om.lowlink = Math.min(om.lowlink, successor.nindex); + boolean isDelayLoop = false; + for (int i=ctx.path.size(); i>0; i--) { + OperatorMeta om2 = ctx.path.get(i-1); + if (om2.getOperator() instanceof Operator.DelayOperator) { + isDelayLoop = true; + } + if (om2 == successor) { + break; + } + } + if (!isDelayLoop) { + ctx.invalidLoopAt = successor; + } } } } // pop stack for all root operators if (om.lowlink.equals(om.nindex)) { - List<String> connectedIds = new ArrayList<String>(); - while (!stack.isEmpty()) { - OperatorMeta n2 = stack.pop(); - connectedIds.add(n2.name); + Set<OperatorMeta> connectedSet = new LinkedHashSet<>(ctx.stack.size()); + while (!ctx.stack.isEmpty()) { + OperatorMeta n2 = ctx.stack.pop(); + connectedSet.add(n2); if (n2 == om) { break; // collected all connected operators } } // strongly connected (cycle) if more than one node in stack - if (connectedIds.size() > 1) { - LOG.debug("detected cycle from node {}: {}", om.name, connectedIds); - cycles.add(connectedIds); + if (connectedSet.size() > 1) { + ctx.stronglyConnected.add(connectedSet); + if (connectedSet.contains(ctx.invalidLoopAt)) { + ctx.invalidCycles.add(connectedSet); + } } } + ctx.path.pop(); + } - public void findInvalidDelays(OperatorMeta om, List<List<String>> invalidDelays) + public void findInvalidDelays(OperatorMeta om, List<List<String>> invalidDelays, Stack<OperatorMeta> stack) { stack.push(om); @@ -1977,6 +2007,7 @@ public class LogicalPlan implements Serializable, DAG for (InputPortMeta sink : downStream.sinks) { OperatorMeta successor = sink.getOperatorWrapper(); if (isDelayOperator) { + sink.attributes.put(IS_CONNECTED_TO_DELAY_OPERATOR, true); // Check whether all downstream operators are already visited in the path if (successor != null && !stack.contains(successor)) { LOG.debug("detected DelayOperator does not immediately output to a visited operator {}.{}->{}.{}", @@ -1984,7 +2015,7 @@ public class LogicalPlan implements Serializable, DAG invalidDelays.add(Arrays.asList(om.getName(), successor.getName())); } } else { - findInvalidDelays(successor, invalidDelays); + findInvalidDelays(successor, invalidDelays, stack); } } } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b3402be5/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java index ee3cbc3..5675b53 100644 --- a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java +++ b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java @@ -56,6 +56,7 @@ import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.OperatorHea import com.datatorrent.stram.engine.GenericTestOperator; import com.datatorrent.stram.engine.OperatorContext; import com.datatorrent.stram.plan.logical.LogicalPlan; +import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta; import com.datatorrent.stram.plan.physical.PTContainer; import com.datatorrent.stram.plan.physical.PTOperator; import com.datatorrent.stram.plan.physical.PhysicalPlan; @@ -314,7 +315,7 @@ public class CheckpointTest o4p1.checkpoints.add(leafCheckpoint); UpdateCheckpointsContext ctx; - dnm.updateRecoveryCheckpoints(o1p1, ctx = new UpdateCheckpointsContext(clock, true)); + dnm.updateRecoveryCheckpoints(o1p1, ctx = new UpdateCheckpointsContext(clock, true, Collections.<OperatorMeta, Set<OperatorMeta>>emptyMap())); Assert.assertEquals("initial checkpoint " + o1p1, Checkpoint.INITIAL_CHECKPOINT, o1p1.getRecoveryCheckpoint()); Assert.assertEquals("initial checkpoint " + o2SLp1, leafCheckpoint, o2SLp1.getRecoveryCheckpoint()); Assert.assertEquals("initial checkpoint " + o3SLp1, new Checkpoint(clock.getTime(), 0, 0), o3SLp1.getRecoveryCheckpoint()); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b3402be5/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java index 359da17..06f184f 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java @@ -20,7 +20,11 @@ package com.datatorrent.stram.plan.logical; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Stack; import java.util.TreeSet; import java.util.concurrent.Callable; import java.util.concurrent.locks.Lock; @@ -32,8 +36,14 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Ignore; +import org.junit.Rule; import org.junit.Test; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; + +import com.google.common.collect.Sets; + import com.datatorrent.api.Context; import com.datatorrent.api.DAG; import com.datatorrent.api.DefaultInputPort; @@ -42,8 +52,17 @@ import com.datatorrent.api.Operator; import com.datatorrent.common.util.BaseOperator; import com.datatorrent.common.util.DefaultDelayOperator; import com.datatorrent.stram.StramLocalCluster; +import com.datatorrent.stram.StreamingContainerManager; +import com.datatorrent.stram.StreamingContainerManager.UpdateCheckpointsContext; +import com.datatorrent.stram.api.Checkpoint; import com.datatorrent.stram.engine.GenericTestOperator; import com.datatorrent.stram.engine.TestGeneratorInputOperator; +import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta; +import com.datatorrent.stram.plan.physical.PTOperator; +import com.datatorrent.stram.plan.physical.PhysicalPlan; +import com.datatorrent.stram.support.StramTestSupport; +import com.datatorrent.stram.support.StramTestSupport.MemoryStorageAgent; +import com.datatorrent.stram.support.StramTestSupport.TestMeta; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -75,7 +94,7 @@ public class DelayOperatorTest GenericTestOperator opB = dag.addOperator("B", GenericTestOperator.class); GenericTestOperator opC = dag.addOperator("C", GenericTestOperator.class); GenericTestOperator opD = dag.addOperator("D", GenericTestOperator.class); - DefaultDelayOperator opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class); + DefaultDelayOperator<Object> opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class); dag.addStream("BtoC", opB.outport1, opC.inport1); dag.addStream("CtoD", opC.outport1, opD.inport1); @@ -83,7 +102,7 @@ public class DelayOperatorTest dag.addStream("DelayToD", opDelay.output, opD.inport2); List<List<String>> invalidDelays = new ArrayList<>(); - dag.findInvalidDelays(dag.getMeta(opB), invalidDelays); + dag.findInvalidDelays(dag.getMeta(opB), invalidDelays, new Stack<OperatorMeta>()); assertEquals("operator invalid delay", 1, invalidDelays.size()); try { @@ -106,7 +125,7 @@ public class DelayOperatorTest dag.addStream("DelayToC", opDelay.output, opC.inport2); invalidDelays = new ArrayList<>(); - dag.findInvalidDelays(dag.getMeta(opB), invalidDelays); + dag.findInvalidDelays(dag.getMeta(opB), invalidDelays, new Stack<OperatorMeta>()); assertEquals("operator invalid delay", 1, invalidDelays.size()); try { @@ -373,5 +392,68 @@ public class DelayOperatorTest Arrays.copyOfRange(new TreeSet<>(FibonacciOperator.results).toArray(), 0, 20)); } + @Rule + public TestMeta testMeta = new TestMeta(); + + @Test + public void testCheckpointUpdate() + { + LogicalPlan dag = StramTestSupport.createDAG(testMeta); + + TestGeneratorInputOperator opA = dag.addOperator("A", TestGeneratorInputOperator.class); + GenericTestOperator opB = dag.addOperator("B", GenericTestOperator.class); + GenericTestOperator opC = dag.addOperator("C", GenericTestOperator.class); + GenericTestOperator opD = dag.addOperator("D", GenericTestOperator.class); + DefaultDelayOperator<Object> opDelay = dag.addOperator("opDelay", new DefaultDelayOperator<>()); + + dag.addStream("AtoB", opA.outport, opB.inport1); + dag.addStream("BtoC", opB.outport1, opC.inport1); + dag.addStream("CtoD", opC.outport1, opD.inport1); + dag.addStream("CtoDelay", opC.outport2, opDelay.input); + dag.addStream("DelayToB", opDelay.output, opB.inport2); + dag.validate(); + + dag.setAttribute(com.datatorrent.api.Context.OperatorContext.STORAGE_AGENT, new MemoryStorageAgent()); + StreamingContainerManager scm = new StreamingContainerManager(dag); + PhysicalPlan plan = scm.getPhysicalPlan(); + // set all operators as active to enable recovery window id update + for (PTOperator oper : plan.getAllOperators().values()) { + oper.setState(PTOperator.State.ACTIVE); + } + + Clock clock = new SystemClock(); + + PTOperator opA1 = plan.getOperators(dag.getMeta(opA)).get(0); + PTOperator opB1 = plan.getOperators(dag.getMeta(opB)).get(0); + PTOperator opC1 = plan.getOperators(dag.getMeta(opC)).get(0); + PTOperator opDelay1 = plan.getOperators(dag.getMeta(opDelay)).get(0); + PTOperator opD1 = plan.getOperators(dag.getMeta(opD)).get(0); + + Checkpoint cp3 = new Checkpoint(3L, 0, 0); + Checkpoint cp5 = new Checkpoint(5L, 0, 0); + Checkpoint cp4 = new Checkpoint(4L, 0, 0); + + opB1.checkpoints.add(cp3); + opC1.checkpoints.add(cp3); + opC1.checkpoints.add(cp4); + opDelay1.checkpoints.add(cp3); + opDelay1.checkpoints.add(cp5); + opD1.checkpoints.add(cp5); + // construct grouping that would be supplied through LogicalPlan + Set<OperatorMeta> stronglyConnected = Sets.newHashSet(dag.getMeta(opB), dag.getMeta(opC), dag.getMeta(opDelay)); + Map<OperatorMeta, Set<OperatorMeta>> groups = new HashMap<>(); + for (OperatorMeta om : stronglyConnected) { + groups.put(om, stronglyConnected); + } + + UpdateCheckpointsContext ctx = new UpdateCheckpointsContext(clock, false, groups); + scm.updateRecoveryCheckpoints(opB1, ctx); + + Assert.assertEquals("checkpoint " + opA1, Checkpoint.INITIAL_CHECKPOINT, opA1.getRecoveryCheckpoint()); + Assert.assertEquals("checkpoint " + opB1, cp3, opC1.getRecoveryCheckpoint()); + Assert.assertEquals("checkpoint " + opC1, cp3, opC1.getRecoveryCheckpoint()); + Assert.assertEquals("checkpoint " + opD1, cp5, opD1.getRecoveryCheckpoint()); + + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b3402be5/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java index a4ac488..9383f12 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java @@ -19,6 +19,7 @@ package com.datatorrent.stram.plan.logical; import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.common.util.DefaultDelayOperator; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -35,6 +36,7 @@ import javax.validation.constraints.Pattern; import com.esotericsoftware.kryo.DefaultSerializer; import com.esotericsoftware.kryo.serializers.JavaSerializer; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import org.junit.Assert; import org.junit.Test; @@ -43,7 +45,6 @@ import static org.junit.Assert.*; import com.datatorrent.common.partitioner.StatelessPartitioner; import com.datatorrent.api.*; -import com.datatorrent.api.Context.DAGContext; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.Context.PortContext; import com.datatorrent.api.DAG.Locality; @@ -61,10 +62,12 @@ import com.datatorrent.stram.plan.logical.LogicalPlan.StreamMeta; import com.datatorrent.stram.support.StramTestSupport.MemoryStorageAgent; import com.datatorrent.stram.support.StramTestSupport.RegexMatcher; -public class LogicalPlanTest { +public class LogicalPlanTest +{ @Test - public void testCycleDetection() { + public void testCycleDetection() + { LogicalPlan dag = new LogicalPlan(); //NodeConf operator1 = b.getOrAddNode("operator1"); @@ -91,20 +94,20 @@ public class LogicalPlanTest { // expected, stream can have single input/output only } - List<List<String>> cycles = new ArrayList<List<String>>(); - dag.findStronglyConnected(dag.getMeta(operator7), cycles); - assertEquals("operator self reference", 1, cycles.size()); - assertEquals("operator self reference", 1, cycles.get(0).size()); - assertEquals("operator self reference", dag.getMeta(operator7).getName(), cycles.get(0).get(0)); + LogicalPlan.ValidationContext vc = new LogicalPlan.ValidationContext(); + dag.findStronglyConnected(dag.getMeta(operator7), vc); + assertEquals("operator self reference", 1, vc.invalidCycles.size()); + assertEquals("operator self reference", 1, vc.invalidCycles.get(0).size()); + assertEquals("operator self reference", dag.getMeta(operator7), vc.invalidCycles.get(0).iterator().next()); // 3 operator cycle - cycles.clear(); - dag.findStronglyConnected(dag.getMeta(operator4), cycles); - assertEquals("3 operator cycle", 1, cycles.size()); - assertEquals("3 operator cycle", 3, cycles.get(0).size()); - assertTrue("operator2", cycles.get(0).contains(dag.getMeta(operator2).getName())); - assertTrue("operator3", cycles.get(0).contains(dag.getMeta(operator3).getName())); - assertTrue("operator4", cycles.get(0).contains(dag.getMeta(operator4).getName())); + vc = new LogicalPlan.ValidationContext(); + dag.findStronglyConnected(dag.getMeta(operator4), vc); + assertEquals("3 operator cycle", 1, vc.invalidCycles.size()); + assertEquals("3 operator cycle", 3, vc.invalidCycles.get(0).size()); + assertTrue("operator2", vc.invalidCycles.get(0).contains(dag.getMeta(operator2))); + assertTrue("operator3", vc.invalidCycles.get(0).contains(dag.getMeta(operator3))); + assertTrue("operator4", vc.invalidCycles.get(0).contains(dag.getMeta(operator4))); try { dag.validate(); @@ -115,13 +118,44 @@ public class LogicalPlanTest { } - public static class ValidationOperator extends BaseOperator { + @Test + public void testCycleDetectionWithDelay() + { + LogicalPlan dag = new LogicalPlan(); + + TestGeneratorInputOperator opA = dag.addOperator("A", TestGeneratorInputOperator.class); + GenericTestOperator opB = dag.addOperator("B", GenericTestOperator.class); + GenericTestOperator opC = dag.addOperator("C", GenericTestOperator.class); + GenericTestOperator opD = dag.addOperator("D", GenericTestOperator.class); + DefaultDelayOperator<Object> opDelay = dag.addOperator("opDelay", new DefaultDelayOperator<>()); + DefaultDelayOperator<Object> opDelay2 = dag.addOperator("opDelay2", new DefaultDelayOperator<>()); + + dag.addStream("AtoB", opA.outport, opB.inport1); + dag.addStream("BtoC", opB.outport1, opC.inport1); + dag.addStream("CtoD", opC.outport1, opD.inport1); + dag.addStream("CtoDelay", opC.outport2, opDelay.input); + dag.addStream("DtoDelay", opD.outport1, opDelay2.input); + dag.addStream("DelayToB", opDelay.output, opB.inport2); + dag.addStream("Delay2ToC", opDelay2.output, opC.inport2); + + LogicalPlan.ValidationContext vc = new LogicalPlan.ValidationContext(); + dag.findStronglyConnected(dag.getMeta(opA), vc); + + Assert.assertEquals("No invalid cycle", Collections.emptyList(), vc.invalidCycles); + Set<OperatorMeta> exp = Sets.newHashSet(dag.getMeta(opDelay2), dag.getMeta(opDelay), dag.getMeta(opC), dag.getMeta(opB), dag.getMeta(opD)); + Assert.assertEquals("cycle", exp, vc.stronglyConnected.get(0)); + } + + + public static class ValidationOperator extends BaseOperator + { public final transient DefaultOutputPort<Object> goodOutputPort = new DefaultOutputPort<Object>(); public final transient DefaultOutputPort<Object> badOutputPort = new DefaultOutputPort<Object>(); } - public static class CounterOperator extends BaseOperator { + public static class CounterOperator extends BaseOperator + { final public transient InputPort<Object> countInputPort = new DefaultInputPort<Object>() { @Override final public void process(Object payload) { @@ -130,8 +164,8 @@ public class LogicalPlanTest { } @Test - public void testLogicalPlanSerialization() throws Exception { - + public void testLogicalPlanSerialization() throws Exception + { LogicalPlan dag = new LogicalPlan(); dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent()); @@ -188,7 +222,8 @@ public class LogicalPlanTest { Assert.assertEquals("", 2, dag.getAllOperators().size()); } - public static class ValidationTestOperator extends BaseOperator implements InputOperator { + public static class ValidationTestOperator extends BaseOperator implements InputOperator + { @NotNull @Pattern(regexp=".*malhar.*", message="Value has to contain 'malhar'!") private String stringField1; @@ -271,8 +306,8 @@ public class LogicalPlanTest { } @Test - public void testOperatorValidation() { - + public void testOperatorValidation() + { ValidationTestOperator bean = new ValidationTestOperator(); bean.stringField1 = "malhar1"; bean.intField1 = 1; @@ -348,7 +383,8 @@ public class LogicalPlanTest { } @OperatorAnnotation(partitionable = false) - public static class TestOperatorAnnotationOperator extends BaseOperator { + public static class TestOperatorAnnotationOperator extends BaseOperator + { @InputPortFieldAnnotation( optional = true) final public transient DefaultInputPort<Object> input1 = new DefaultInputPort<Object>() { @@ -358,11 +394,13 @@ public class LogicalPlanTest { }; } - class NoInputPortOperator extends BaseOperator { + class NoInputPortOperator extends BaseOperator + { } @Test - public void testValidationForNonInputRootOperator() { + public void testValidationForNonInputRootOperator() + { LogicalPlan dag = new LogicalPlan(); NoInputPortOperator x = dag.addOperator("x", new NoInputPortOperator()); try { @@ -374,8 +412,8 @@ public class LogicalPlanTest { } @OperatorAnnotation(partitionable = false) - public static class TestOperatorAnnotationOperator2 extends BaseOperator implements Partitioner<TestOperatorAnnotationOperator2> { - + public static class TestOperatorAnnotationOperator2 extends BaseOperator implements Partitioner<TestOperatorAnnotationOperator2> + { @Override public Collection<Partition<TestOperatorAnnotationOperator2>> definePartitions(Collection<Partition<TestOperatorAnnotationOperator2>> partitions, PartitioningContext context) { @@ -389,7 +427,8 @@ public class LogicalPlanTest { } @Test - public void testOperatorAnnotation() { + public void testOperatorAnnotation() + { LogicalPlan dag = new LogicalPlan(); TestGeneratorInputOperator input = dag.addOperator("input1", TestGeneratorInputOperator.class); TestOperatorAnnotationOperator operator = dag.addOperator("operator1", TestOperatorAnnotationOperator.class); @@ -430,8 +469,8 @@ public class LogicalPlanTest { } @Test - public void testPortConnectionValidation() { - + public void testPortConnectionValidation() + { LogicalPlan dag = new LogicalPlan(); TestNonOptionalOutportInputOperator input = dag.addOperator("input1", TestNonOptionalOutportInputOperator.class); @@ -459,7 +498,8 @@ public class LogicalPlanTest { } @Test - public void testAtMostOnceProcessingModeValidation() { + public void testAtMostOnceProcessingModeValidation() + { LogicalPlan dag = new LogicalPlan(); TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class); @@ -489,8 +529,9 @@ public class LogicalPlanTest { } - @Test - public void testExactlyOnceProcessingModeValidation() { + @Test + public void testExactlyOnceProcessingModeValidation() + { LogicalPlan dag = new LogicalPlan(); TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class); @@ -527,7 +568,8 @@ public class LogicalPlanTest { } @Test - public void testLocalityValidation() { + public void testLocalityValidation() + { LogicalPlan dag = new LogicalPlan(); TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class); @@ -549,7 +591,8 @@ public class LogicalPlanTest { dag.validate(); } - private class TestAnnotationsOperator extends BaseOperator implements InputOperator { + private class TestAnnotationsOperator extends BaseOperator implements InputOperator + { //final public transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<Object>(); @OutputPortFieldAnnotation( optional=false) @@ -562,7 +605,8 @@ public class LogicalPlanTest { } } - private class TestAnnotationsOperator2 extends BaseOperator implements InputOperator{ + private class TestAnnotationsOperator2 extends BaseOperator implements InputOperator + { // multiple ports w/o annotation, one of them must be connected final public transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<Object>(); @@ -573,7 +617,8 @@ public class LogicalPlanTest { } } - private class TestAnnotationsOperator3 extends BaseOperator implements InputOperator{ + private class TestAnnotationsOperator3 extends BaseOperator implements InputOperator + { // multiple ports w/o annotation, one of them must be connected @OutputPortFieldAnnotation( optional=true) final public transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<Object>(); @@ -587,7 +632,8 @@ public class LogicalPlanTest { } @Test - public void testOutputPortAnnotation() { + public void testOutputPortAnnotation() + { LogicalPlan dag = new LogicalPlan(); TestAnnotationsOperator ta1 = dag.addOperator("testAnnotationsOperator", new TestAnnotationsOperator()); @@ -623,7 +669,8 @@ public class LogicalPlanTest { * Operator that can be used with default Java serialization instead of Kryo */ @DefaultSerializer(JavaSerializer.class) - public static class JdkSerializableOperator extends BaseOperator implements Serializable { + public static class JdkSerializableOperator extends BaseOperator implements Serializable + { private static final long serialVersionUID = -4024202339520027097L; public abstract class SerializableInputPort<T> implements InputPort<T>, Sink<T>, java.io.Serializable { @@ -673,7 +720,8 @@ public class LogicalPlanTest { } @Test - public void testJdkSerializableOperator() throws Exception { + public void testJdkSerializableOperator() throws Exception + { LogicalPlan dag = new LogicalPlan(); dag.addOperator("o1", new JdkSerializableOperator()); @@ -785,7 +833,8 @@ public class LogicalPlanTest { } } - public static class TestPortCodecOperator extends BaseOperator { + public static class TestPortCodecOperator extends BaseOperator + { public transient final DefaultInputPort<Object> inport1 = new DefaultInputPort<Object>() { @Override
