Repository: apex-core
Updated Branches:
  refs/heads/master 3119aba89 -> 8ae80fee1
APEXCORE-494 Fix scale up from single partition when partition remains unchanged.

close #362


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/8ae80fee
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/8ae80fee
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/8ae80fee

Branch: refs/heads/master
Commit: 8ae80fee1400c37fa3ad0dd891a4ddb24eb74d21
Parents: 3119aba
Author: Thomas Weise <[email protected]>
Authored: Wed Aug 10 22:11:56 2016 -0700
Committer: Thomas Weise <[email protected]>
Committed: Tue Aug 16 16:39:34 2016 -0700

----------------------------------------------------------------------
 .../stram/plan/physical/PhysicalPlan.java      |   7 +-
 .../stram/plan/physical/StreamMapping.java     |  12 ++
 .../stram/plan/physical/PhysicalPlanTest.java  | 121 +++++++++++--------
 3 files changed, 84 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/8ae80fee/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index a8b2e23..11df7ba 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -435,7 +435,7 @@ public class PhysicalPlan implements Serializable
     updatePartitionsInfoForPersistOperator(dag);
 
     Map<PTOperator, PTContainer> operatorContainerMap = new HashMap<>();
-
+
     // assign operators to containers
     int groupCount = 0;
     Set<PTOperator> deployOperators = Sets.newHashSet();
@@ -463,7 +463,7 @@ public class PhysicalPlan implements Serializable
       }
     }
-
+
     for (PTContainer container : containers) {
       updateContainerMemoryWithBufferServer(container);
       container.setRequiredVCores(getVCores(container.getOperators()));
@@ -490,7 +490,7 @@ public class PhysicalPlan implements Serializable
         LOG.debug("Container with operators [{}] has anti affinity with [{}]", StringUtils.join(containerOperators, ","), StringUtils.join(antiOperators, ","));
       }
     }
-
+
     for (Map.Entry<PTOperator, Operator> operEntry : this.newOpers.entrySet()) {
       initCheckpoint(operEntry.getKey(), operEntry.getValue(), Checkpoint.INITIAL_CHECKPOINT);
     }
@@ -1135,7 +1135,6 @@ public class PhysicalPlan implements Serializable
     //make sure all the new operators are included in deploy operator list
     this.deployOpers.addAll(this.newOpers.keySet());
 
-
     ctx.deploy(releaseContainers, this.undeployOpers, newContainers, deployOperators);
     this.newOpers.clear();
     this.deployOpers.clear();

http://git-wip-us.apache.org/repos/asf/apex-core/blob/8ae80fee/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
index e404d5a..b5e357e 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
@@ -19,6 +19,7 @@ package com.datatorrent.stram.plan.physical;
 
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -281,6 +282,17 @@ public class StreamMapping implements java.io.Serializable
         (sourceSingleFinal != null ? sourceSingleFinal.booleanValue() : PortContext.UNIFIER_SINGLE_FINAL.defaultValue);
 
     if (upstream.size() > 1) {
+      // detach downstream from upstream operator for the case where no unifier existed previously
+      for (PTOutput source : upstream) {
+        Iterator<PTInput> sinks = source.sinks.iterator();
+        while (sinks.hasNext()) {
+          PTInput sink = sinks.next();
+          if (sink.target == doperEntry.first) {
+            doperEntry.first.inputs.remove(sink);
+            sinks.remove();
+          }
+        }
+      }
       if (!separateUnifiers && ((pks == null || pks.mask == 0) || lastSingle)) {
         if (finalUnifier == null) {
           finalUnifier = createUnifier(streamMeta, plan);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/8ae80fee/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java b/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
index 99dee31..8d19640 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
@@ -378,8 +378,6 @@ public class PhysicalPlanTest
     dag.addStream("o1.outport1", o1.outport1, o2.inport1, o2.inport2);
     dag.addStream("mergeStream", o2.outport1, mergeNode.inport1);
 
-    dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 2);
-
     OperatorMeta o2Meta = dag.getMeta(o2);
     o2Meta.getAttributes().put(OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)new PartitionLoadWatch(0, 5)));
@@ -389,12 +387,11 @@ public class PhysicalPlanTest
     dag.setAttribute(OperatorContext.STORAGE_AGENT, ctx);
     PhysicalPlan plan = new PhysicalPlan(dag, ctx);
 
-    Assert.assertEquals("number of containers", 2, plan.getContainers().size());
     Assert.assertEquals("number of operators", 3, plan.getAllOperators().size());
     Assert.assertEquals("number of save requests", 3, ctx.backupRequests);
 
     List<PTOperator> o2Partitions = plan.getOperators(o2Meta);
-    Assert.assertEquals("number operators " + o2Meta, 1, o2Partitions.size());
+    Assert.assertEquals("partition count " + o2Meta, 1, o2Partitions.size());
 
     PTOperator o2p1 = o2Partitions.get(0);
     Assert.assertEquals("stats handlers " + o2p1, 1, o2p1.statsListeners.size());
@@ -404,38 +401,40 @@ public class PhysicalPlanTest
     setThroughput(o2p1, 10);
     plan.onStatusUpdate(o2p1);
-    Assert.assertEquals("load exceeds max", 1, ctx.events.size());
+    Assert.assertEquals("partitioning triggered", 1, ctx.events.size());
 
     ctx.backupRequests = 0;
     ctx.events.remove(0).run();
 
-    List<PTOperator> n2Instances = plan.getOperators(o2Meta);
-    Assert.assertEquals("partition instances " + n2Instances, 2, n2Instances.size());
-    PTOperator po = n2Instances.get(0);
-    PTOperator po2 = n2Instances.get(1);
+    o2Partitions = plan.getOperators(o2Meta);
+    Assert.assertEquals("partition count " + o2Partitions, 2, o2Partitions.size());
+    o2p1 = o2Partitions.get(0);
+    Assert.assertEquals("sinks " + o2p1.getOutputs(), 1, o2p1.getOutputs().size());
+    PTOperator o2p2 = o2Partitions.get(1);
+    Assert.assertEquals("sinks " + o2p2.getOutputs(), 1, o2p2.getOutputs().size());
 
     Set<PTOperator> expUndeploy = Sets.newHashSet(plan.getOperators(dag.getMeta(mergeNode)));
-    expUndeploy.add(po);
+    expUndeploy.add(o2p1);
     expUndeploy.addAll(plan.getMergeOperators(o2Meta));
 
     // verify load update generates expected events per configuration
-    setThroughput(po, 0);
-    plan.onStatusUpdate(po);
+    setThroughput(o2p1, 0);
+    plan.onStatusUpdate(o2p1);
     Assert.assertEquals("load min", 0, ctx.events.size());
 
-    setThroughput(po, 3);
-    plan.onStatusUpdate(po);
+    setThroughput(o2p1, 3);
+    plan.onStatusUpdate(o2p1);
     Assert.assertEquals("load within range", 0, ctx.events.size());
 
-    setThroughput(po, 10);
-    plan.onStatusUpdate(po);
+    setThroughput(o2p1, 10);
+    plan.onStatusUpdate(o2p1);
     Assert.assertEquals("load exceeds max", 1, ctx.events.size());
 
     ctx.backupRequests = 0;
     ctx.events.remove(0).run();
     Assert.assertEquals("new partitions", 3, plan.getOperators(o2Meta).size());
-    Assert.assertTrue("", plan.getOperators(o2Meta).contains(po2));
+    Assert.assertTrue("", plan.getOperators(o2Meta).contains(o2p2));
 
     for (PTOperator partition : plan.getOperators(o2Meta)) {
       Assert.assertNotNull("container null " + partition, partition.getContainer());
@@ -446,17 +445,17 @@ public class PhysicalPlanTest
 
     Set<PTOperator> expDeploy = Sets.newHashSet(plan.getOperators(dag.getMeta(mergeNode)));
     expDeploy.addAll(plan.getOperators(o2Meta));
-    expDeploy.remove(po2);
+    expDeploy.remove(o2p2);
     expDeploy.addAll(plan.getMergeOperators(o2Meta));
     Assert.assertEquals("" + ctx.deploy, expDeploy, ctx.deploy);
 
     Assert.assertEquals("Count of storage requests", 2, ctx.backupRequests);
 
     // partitioning skipped on insufficient head room
-    po = plan.getOperators(o2Meta).get(0);
+    o2p1 = plan.getOperators(o2Meta).get(0);
     plan.setAvailableResources(0);
-    setThroughput(po, 10);
-    plan.onStatusUpdate(po);
+    setThroughput(o2p1, 10);
+    plan.onStatusUpdate(o2p1);
     Assert.assertEquals("not repartitioned", 1, ctx.events.size());
     ctx.events.remove(0).run();
     Assert.assertEquals("partition count unchanged", 3, plan.getOperators(o2Meta).size());
@@ -466,15 +465,20 @@ public class PhysicalPlanTest
   /**
    * Test partitioning of an input operator (no input port).
    * Cover aspects that are not part of generic operator test.
+   * Test scaling from one to multiple partitions with unifier when one partition remains unmodified.
    */
   @Test
   public void testInputOperatorPartitioning()
   {
     LogicalPlan dag = new LogicalPlan();
-    TestInputOperator<Object> o1 = dag.addOperator("o1", new TestInputOperator<>());
+    final TestInputOperator<Object> o1 = dag.addOperator("o1", new TestInputOperator<>());
+    GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
+    dag.addStream("o1.outport1", o1.output, o2.inport1);
+
     OperatorMeta o1Meta = dag.getMeta(o1);
     dag.setOperatorAttribute(o1, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()}));
-    dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<TestInputOperator<Object>>(2));
+    TestPartitioner<TestInputOperator<Object>> partitioner = new TestPartitioner<TestInputOperator<Object>>();
+    dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, partitioner);
 
     TestPlanContext ctx = new TestPlanContext();
     dag.setAttribute(OperatorContext.STORAGE_AGENT, ctx);
@@ -482,7 +486,7 @@ public class PhysicalPlanTest
     Assert.assertEquals("number of containers", 2, plan.getContainers().size());
 
     List<PTOperator> o1Partitions = plan.getOperators(o1Meta);
-    Assert.assertEquals("partition instances " + o1Partitions, 2, o1Partitions.size());
+    Assert.assertEquals("partitions " + o1Partitions, 1, o1Partitions.size());
     PTOperator o1p1 = o1Partitions.get(0);
 
     // verify load update generates expected events per configuration
@@ -493,33 +497,34 @@ public class PhysicalPlanTest
     PartitioningTest.PartitionLoadWatch.put(o1p1, 1);
     plan.onStatusUpdate(o1p1);
     Assert.assertEquals("scale up triggered", 1, ctx.events.size());
-
+    // add another partition, keep existing as is
+    partitioner.extraPartitions.add(new DefaultPartition<TestInputOperator<Object>>(o1));
     Runnable r = ctx.events.remove(0);
     r.run();
-    Assert.assertEquals("operators after scale up", 3, plan.getOperators(o1Meta).size());
-    for (PTOperator p : plan.getOperators(o1Meta)) {
+    partitioner.extraPartitions.clear();
+
+    o1Partitions = plan.getOperators(o1Meta);
+    Assert.assertEquals("operators after scale up", 2, o1Partitions.size());
+    Assert.assertEquals("first partition unmodified", o1p1, o1Partitions.get(0));
+    Assert.assertEquals("single output", 1, o1p1.getOutputs().size());
+    Assert.assertEquals("output to unifier", 1, o1p1.getOutputs().get(0).sinks.size());
+
+    Set<PTOperator> expUndeploy = Sets.newHashSet(plan.getOperators(dag.getMeta(o2)));
+    Set<PTOperator> expDeploy = Sets.newHashSet(o1Partitions.get(1));
+    expDeploy.addAll(plan.getMergeOperators(dag.getMeta(o1)));
+    expDeploy.addAll(expUndeploy);
+
+    Assert.assertEquals("undeploy", expUndeploy, ctx.undeploy);
+    Assert.assertEquals("deploy", expDeploy, ctx.deploy);
+
+    for (PTOperator p : o1Partitions) {
       Assert.assertEquals("activation window id " + p, Checkpoint.INITIAL_CHECKPOINT, p.recoveryCheckpoint);
       Assert.assertEquals("checkpoints " + p + " " + p.checkpoints, Lists.newArrayList(), p.checkpoints);
       PartitioningTest.PartitionLoadWatch.put(p, -1);
       plan.onStatusUpdate(p);
     }
     ctx.events.remove(0).run();
-    Assert.assertEquals("operators after scale down", 2, plan.getOperators(o1Meta).size());
-/*
-    // ensure scale up maintains min checkpoint
-    long checkpoint=1;
-    for (PTOperator p : plan.getOperators(o1Meta)) {
-      p.checkpoints.add(checkpoint);
-      p.setRecoveryCheckpoint(checkpoint);
-      PartitioningTest.PartitionLoadWatch.loadIndicators.put(p.getId(), 1);
-      plan.onStatusUpdate(p);
-    }
-    ctx.events.remove(0).run();
-    Assert.assertEquals("operators after scale up (2)", 4, plan.getOperators(o1Meta).size());
-    for (PTOperator p : plan.getOperators(o1Meta)) {
-      Assert.assertEquals("checkpoints " + p.checkpoints, p.checkpoints.size(), 1);
-    }
-*/
+    Assert.assertEquals("operators after scale down", 1, plan.getOperators(o1Meta).size());
   }
 
   @Test
@@ -773,7 +778,6 @@ public class PhysicalPlanTest
     Assert.assertTrue("" + expectedPartitionKeys, expectedPartitionKeys.isEmpty());
 
     // partition merge
-    @SuppressWarnings("unchecked")
     List<HashSet<PartitionKeys>> expectedKeysSets = Arrays.asList(
         Sets.newHashSet(newPartitionKeys("11", "00"), newPartitionKeys("11", "10"), newPartitionKeys("1", "1")),
         Sets.newHashSet(newPartitionKeys("1", "0"), newPartitionKeys("11", "01"), newPartitionKeys("11", "11"))
     );
@@ -1129,7 +1133,9 @@ public class PhysicalPlanTest
     LogicalPlan dag = new LogicalPlan();
 
     TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
-    dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<TestGeneratorInputOperator>(2));
+    TestPartitioner<TestGeneratorInputOperator> o1Partitioner = new TestPartitioner<TestGeneratorInputOperator>();
+    o1Partitioner.setPartitionCount(2);
+    dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, o1Partitioner);
     dag.setOperatorAttribute(o1, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)new PartitioningTest.PartitionLoadWatch()));
     OperatorMeta o1Meta = dag.getMeta(o1);
@@ -1140,9 +1146,6 @@ public class PhysicalPlanTest
     dag.addStream("o1.outport1", o1.outport, o2.inport1);
 
-    int maxContainers = 10;
-    dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, maxContainers);
-
     TestPlanContext ctx = new TestPlanContext();
     dag.setAttribute(OperatorContext.STORAGE_AGENT, ctx);
@@ -1298,16 +1301,23 @@ public class PhysicalPlanTest
     Set<PTOperator> expUndeploy = Sets.newHashSet();
     Set<PTOperator> expDeploy = Sets.newHashSet();
     for (PTOperator o1p : plan.getOperators(o1Meta)) {
-      expUndeploy.add(o1p);
       PartitioningTest.PartitionLoadWatch.put(o1p, 1);
       plan.onStatusUpdate(o1p);
     }
 
     Assert.assertEquals("repartition event", 1, ctx.events.size());
+    o1Partitioner.extraPartitions.add(new DefaultPartition<TestGeneratorInputOperator>(o1));
    ctx.events.remove(0).run();
-
-    Assert.assertEquals("M partitions after scale up " + o1Meta, 2, plan.getOperators(o1Meta).size());
-    expDeploy.addAll(plan.getOperators(o1Meta));
+    o1Partitioner.extraPartitions.clear();
+
+    List<PTOperator> o1Partitions = plan.getOperators(o1Meta);
+    List<PTOperator> o2Partitions = plan.getOperators(o2Meta);
+    Assert.assertEquals("M partitions after scale up " + o1Meta, 2, o1Partitions.size());
+    expDeploy.add(o1Partitions.get(1)); // previous partition unchanged
+    for (PTOperator o1p : o1Partitions) {
+      Assert.assertEquals("outputs " + o1p, 1, o1p.getOutputs().size());
+      Assert.assertEquals("sinks " + o1p, o2Partitions.size(), o1p.getOutputs().get(0).sinks.size());
+    }
 
     for (PTOperator o2p : plan.getOperators(o2Meta)) {
       expUndeploy.add(o2p);
       expDeploy.add(o2p);
@@ -2027,9 +2037,16 @@ public class PhysicalPlanTest
   private class TestPartitioner<T extends Operator> extends StatelessPartitioner<T>
   {
     private static final long serialVersionUID = 1L;
+    final List<Partition<T>> extraPartitions = Lists.newArrayList();
+
     @Override
     public Collection<Partition<T>> definePartitions(Collection<Partition<T>> partitions, PartitioningContext context)
     {
+      if (!extraPartitions.isEmpty()) {
+        partitions.addAll(extraPartitions);
+        return partitions;
+      }
+
       Collection<Partition<T>> newPartitions = super.definePartitions(partitions, context);
       if (context.getParallelPartitionCount() > 0 && newPartitions.size() < context.getParallelPartitionCount()) {
         // parallel partitioned, fill to requested count
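
The core of the fix is the detach loop added to StreamMapping: when a single partition scales up and a unifier has to be inserted, the direct links from the existing upstream output to the downstream operator are removed first, so the unchanged partition can be rewired through the unifier without being redeployed. A minimal standalone sketch of that iterator-removal pattern, using simplified placeholder types rather than the actual PTOutput/PTInput/PTOperator classes, might look like this:

  // Sketch only: Op, Input and Output are hypothetical stand-ins for the
  // physical plan types; the real code operates on PTOperator/PTInput/PTOutput.
  import java.util.ArrayList;
  import java.util.Iterator;
  import java.util.List;

  public class DetachSketch
  {
    static class Op { final List<Input> inputs = new ArrayList<>(); }
    static class Input { final Op target; Input(Op target) { this.target = target; } }
    static class Output { final List<Input> sinks = new ArrayList<>(); }

    // remove every sink of 'source' that points at 'downstream', and the matching input on 'downstream'
    static void detach(Output source, Op downstream)
    {
      Iterator<Input> sinks = source.sinks.iterator();
      while (sinks.hasNext()) {
        Input sink = sinks.next();
        if (sink.target == downstream) {
          downstream.inputs.remove(sink);
          sinks.remove(); // iterator removal avoids ConcurrentModificationException
        }
      }
    }

    public static void main(String[] args)
    {
      Op downstream = new Op();
      Output source = new Output();
      Input link = new Input(downstream);
      source.sinks.add(link);
      downstream.inputs.add(link);
      detach(source, downstream);
      System.out.println(source.sinks.isEmpty() && downstream.inputs.isEmpty()); // prints true
    }
  }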
