Repository: apex-core
Updated Branches:
  refs/heads/master 3119aba89 -> 8ae80fee1


APEXCORE-494 Fix scale-up from a single partition when the existing partition remains unchanged.
close #362


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/8ae80fee
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/8ae80fee
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/8ae80fee

Branch: refs/heads/master
Commit: 8ae80fee1400c37fa3ad0dd891a4ddb24eb74d21
Parents: 3119aba
Author: Thomas Weise <[email protected]>
Authored: Wed Aug 10 22:11:56 2016 -0700
Committer: Thomas Weise <[email protected]>
Committed: Tue Aug 16 16:39:34 2016 -0700

----------------------------------------------------------------------
 .../stram/plan/physical/PhysicalPlan.java       |   7 +-
 .../stram/plan/physical/StreamMapping.java      |  12 ++
 .../stram/plan/physical/PhysicalPlanTest.java   | 121 +++++++++++--------
 3 files changed, 84 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/8ae80fee/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index a8b2e23..11df7ba 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -435,7 +435,7 @@ public class PhysicalPlan implements Serializable
     updatePartitionsInfoForPersistOperator(dag);
 
     Map<PTOperator, PTContainer> operatorContainerMap = new HashMap<>();
-    
+
     // assign operators to containers
     int groupCount = 0;
     Set<PTOperator> deployOperators = Sets.newHashSet();
@@ -463,7 +463,7 @@ public class PhysicalPlan implements Serializable
       }
     }
 
-    
+
     for (PTContainer container : containers) {
       updateContainerMemoryWithBufferServer(container);
       container.setRequiredVCores(getVCores(container.getOperators()));
@@ -490,7 +490,7 @@ public class PhysicalPlan implements Serializable
         LOG.debug("Container with operators [{}] has anti affinity with [{}]", 
StringUtils.join(containerOperators, ","), StringUtils.join(antiOperators, 
","));
       }
     }
-    
+
     for (Map.Entry<PTOperator, Operator> operEntry : this.newOpers.entrySet()) 
{
       initCheckpoint(operEntry.getKey(), operEntry.getValue(), 
Checkpoint.INITIAL_CHECKPOINT);
     }
@@ -1135,7 +1135,6 @@ public class PhysicalPlan implements Serializable
 
     //make sure all the new operators are included in deploy operator list
     this.deployOpers.addAll(this.newOpers.keySet());
-
     ctx.deploy(releaseContainers, this.undeployOpers, newContainers, 
deployOperators);
     this.newOpers.clear();
     this.deployOpers.clear();

http://git-wip-us.apache.org/repos/asf/apex-core/blob/8ae80fee/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
index e404d5a..b5e357e 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
@@ -19,6 +19,7 @@
 package com.datatorrent.stram.plan.physical;
 
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -281,6 +282,17 @@ public class StreamMapping implements java.io.Serializable
             (sourceSingleFinal != null ? sourceSingleFinal.booleanValue() : 
PortContext.UNIFIER_SINGLE_FINAL.defaultValue);
 
         if (upstream.size() > 1) {
+          // detach downstream from upstream operator for the case where no 
unifier existed previously
+          for (PTOutput source : upstream) {
+            Iterator<PTInput> sinks = source.sinks.iterator();
+            while (sinks.hasNext()) {
+              PTInput sink = sinks.next();
+              if (sink.target == doperEntry.first) {
+                doperEntry.first.inputs.remove(sink);
+                sinks.remove();
+              }
+            }
+          }
           if (!separateUnifiers && ((pks == null || pks.mask == 0) || 
lastSingle)) {
             if (finalUnifier == null) {
               finalUnifier = createUnifier(streamMeta, plan);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/8ae80fee/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java b/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
index 99dee31..8d19640 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
@@ -378,8 +378,6 @@ public class PhysicalPlanTest
     dag.addStream("o1.outport1", o1.outport1, o2.inport1, o2.inport2);
     dag.addStream("mergeStream", o2.outport1, mergeNode.inport1);
 
-    dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 2);
-
     OperatorMeta o2Meta = dag.getMeta(o2);
     o2Meta.getAttributes().put(OperatorContext.STATS_LISTENERS,
         Lists.newArrayList((StatsListener)new PartitionLoadWatch(0, 5)));
@@ -389,12 +387,11 @@ public class PhysicalPlanTest
     dag.setAttribute(OperatorContext.STORAGE_AGENT, ctx);
     PhysicalPlan plan = new PhysicalPlan(dag, ctx);
 
-    Assert.assertEquals("number of containers", 2, 
plan.getContainers().size());
     Assert.assertEquals("number of operators", 3, 
plan.getAllOperators().size());
     Assert.assertEquals("number of save requests", 3, ctx.backupRequests);
 
     List<PTOperator> o2Partitions = plan.getOperators(o2Meta);
-    Assert.assertEquals("number operators " + o2Meta, 1, o2Partitions.size());
+    Assert.assertEquals("partition count " + o2Meta, 1, o2Partitions.size());
 
     PTOperator o2p1 = o2Partitions.get(0);
     Assert.assertEquals("stats handlers " + o2p1, 1, 
o2p1.statsListeners.size());
@@ -404,38 +401,40 @@ public class PhysicalPlanTest
 
     setThroughput(o2p1, 10);
     plan.onStatusUpdate(o2p1);
-    Assert.assertEquals("load exceeds max", 1, ctx.events.size());
+    Assert.assertEquals("partitioning triggered", 1, ctx.events.size());
     ctx.backupRequests = 0;
     ctx.events.remove(0).run();
 
-    List<PTOperator> n2Instances = plan.getOperators(o2Meta);
-    Assert.assertEquals("partition instances " + n2Instances, 2, 
n2Instances.size());
-    PTOperator po = n2Instances.get(0);
-    PTOperator po2 = n2Instances.get(1);
+    o2Partitions = plan.getOperators(o2Meta);
+    Assert.assertEquals("partition count " + o2Partitions, 2, 
o2Partitions.size());
+    o2p1 = o2Partitions.get(0);
+    Assert.assertEquals("sinks " + o2p1.getOutputs(), 1, 
o2p1.getOutputs().size());
+    PTOperator o2p2 = o2Partitions.get(1);
+    Assert.assertEquals("sinks " + o2p2.getOutputs(), 1, 
o2p2.getOutputs().size());
 
     Set<PTOperator> expUndeploy = 
Sets.newHashSet(plan.getOperators(dag.getMeta(mergeNode)));
-    expUndeploy.add(po);
+    expUndeploy.add(o2p1);
     expUndeploy.addAll(plan.getMergeOperators(o2Meta));
 
     // verify load update generates expected events per configuration
 
-    setThroughput(po, 0);
-    plan.onStatusUpdate(po);
+    setThroughput(o2p1, 0);
+    plan.onStatusUpdate(o2p1);
     Assert.assertEquals("load min", 0, ctx.events.size());
 
-    setThroughput(po, 3);
-    plan.onStatusUpdate(po);
+    setThroughput(o2p1, 3);
+    plan.onStatusUpdate(o2p1);
     Assert.assertEquals("load within range", 0, ctx.events.size());
 
-    setThroughput(po, 10);
-    plan.onStatusUpdate(po);
+    setThroughput(o2p1, 10);
+    plan.onStatusUpdate(o2p1);
     Assert.assertEquals("load exceeds max", 1, ctx.events.size());
 
     ctx.backupRequests = 0;
     ctx.events.remove(0).run();
 
     Assert.assertEquals("new partitions", 3, plan.getOperators(o2Meta).size());
-    Assert.assertTrue("", plan.getOperators(o2Meta).contains(po2));
+    Assert.assertTrue("", plan.getOperators(o2Meta).contains(o2p2));
 
     for (PTOperator partition : plan.getOperators(o2Meta)) {
       Assert.assertNotNull("container null " + partition, 
partition.getContainer());
@@ -446,17 +445,17 @@ public class PhysicalPlanTest
 
     Set<PTOperator> expDeploy = 
Sets.newHashSet(plan.getOperators(dag.getMeta(mergeNode)));
     expDeploy.addAll(plan.getOperators(o2Meta));
-    expDeploy.remove(po2);
+    expDeploy.remove(o2p2);
     expDeploy.addAll(plan.getMergeOperators(o2Meta));
 
     Assert.assertEquals("" + ctx.deploy, expDeploy, ctx.deploy);
     Assert.assertEquals("Count of storage requests", 2, ctx.backupRequests);
 
     // partitioning skipped on insufficient head room
-    po = plan.getOperators(o2Meta).get(0);
+    o2p1 = plan.getOperators(o2Meta).get(0);
     plan.setAvailableResources(0);
-    setThroughput(po, 10);
-    plan.onStatusUpdate(po);
+    setThroughput(o2p1, 10);
+    plan.onStatusUpdate(o2p1);
     Assert.assertEquals("not repartitioned", 1, ctx.events.size());
     ctx.events.remove(0).run();
     Assert.assertEquals("partition count unchanged", 3, 
plan.getOperators(o2Meta).size());
@@ -466,15 +465,20 @@ public class PhysicalPlanTest
   /**
    * Test partitioning of an input operator (no input port).
    * Cover aspects that are not part of generic operator test.
+   * Test scaling from one to multiple partitions with unifier when one 
partition remains unmodified.
    */
   @Test
   public void testInputOperatorPartitioning()
   {
     LogicalPlan dag = new LogicalPlan();
-    TestInputOperator<Object> o1 = dag.addOperator("o1", new 
TestInputOperator<>());
+    final TestInputOperator<Object> o1 = dag.addOperator("o1", new 
TestInputOperator<>());
+    GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
+    dag.addStream("o1.outport1", o1.output, o2.inport1);
+
     OperatorMeta o1Meta = dag.getMeta(o1);
     dag.setOperatorAttribute(o1, OperatorContext.STATS_LISTENERS, 
Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()}));
-    dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new 
StatelessPartitioner<TestInputOperator<Object>>(2));
+    TestPartitioner<TestInputOperator<Object>> partitioner = new 
TestPartitioner<TestInputOperator<Object>>();
+    dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, partitioner);
 
     TestPlanContext ctx = new TestPlanContext();
     dag.setAttribute(OperatorContext.STORAGE_AGENT, ctx);
@@ -482,7 +486,7 @@ public class PhysicalPlanTest
     Assert.assertEquals("number of containers", 2, 
plan.getContainers().size());
 
     List<PTOperator> o1Partitions = plan.getOperators(o1Meta);
-    Assert.assertEquals("partition instances " + o1Partitions, 2, 
o1Partitions.size());
+    Assert.assertEquals("partitions " + o1Partitions, 1, o1Partitions.size());
     PTOperator o1p1 = o1Partitions.get(0);
 
     // verify load update generates expected events per configuration
@@ -493,33 +497,34 @@ public class PhysicalPlanTest
     PartitioningTest.PartitionLoadWatch.put(o1p1, 1);
     plan.onStatusUpdate(o1p1);
     Assert.assertEquals("scale up triggered", 1, ctx.events.size());
-
+    // add another partition, keep existing as is
+    partitioner.extraPartitions.add(new 
DefaultPartition<TestInputOperator<Object>>(o1));
     Runnable r = ctx.events.remove(0);
     r.run();
-    Assert.assertEquals("operators after scale up", 3, 
plan.getOperators(o1Meta).size());
-    for (PTOperator p : plan.getOperators(o1Meta)) {
+    partitioner.extraPartitions.clear();
+
+    o1Partitions = plan.getOperators(o1Meta);
+    Assert.assertEquals("operators after scale up", 2, o1Partitions.size());
+    Assert.assertEquals("first partition unmodified", o1p1, 
o1Partitions.get(0));
+    Assert.assertEquals("single output", 1, o1p1.getOutputs().size());
+    Assert.assertEquals("output to unifier", 1, 
o1p1.getOutputs().get(0).sinks.size());
+
+    Set<PTOperator> expUndeploy = 
Sets.newHashSet(plan.getOperators(dag.getMeta(o2)));
+    Set<PTOperator> expDeploy = Sets.newHashSet(o1Partitions.get(1));
+    expDeploy.addAll(plan.getMergeOperators(dag.getMeta(o1)));
+    expDeploy.addAll(expUndeploy);
+
+    Assert.assertEquals("undeploy", expUndeploy, ctx.undeploy);
+    Assert.assertEquals("deploy", expDeploy, ctx.deploy);
+
+    for (PTOperator p : o1Partitions) {
       Assert.assertEquals("activation window id " + p, 
Checkpoint.INITIAL_CHECKPOINT, p.recoveryCheckpoint);
       Assert.assertEquals("checkpoints " + p + " " + p.checkpoints, 
Lists.newArrayList(), p.checkpoints);
       PartitioningTest.PartitionLoadWatch.put(p, -1);
       plan.onStatusUpdate(p);
     }
     ctx.events.remove(0).run();
-    Assert.assertEquals("operators after scale down", 2, 
plan.getOperators(o1Meta).size());
-/*
-    // ensure scale up maintains min checkpoint
-    long checkpoint=1;
-    for (PTOperator p : plan.getOperators(o1Meta)) {
-      p.checkpoints.add(checkpoint);
-      p.setRecoveryCheckpoint(checkpoint);
-      PartitioningTest.PartitionLoadWatch.loadIndicators.put(p.getId(), 1);
-      plan.onStatusUpdate(p);
-    }
-    ctx.events.remove(0).run();
-    Assert.assertEquals("operators after scale up (2)", 4, 
plan.getOperators(o1Meta).size());
-    for (PTOperator p : plan.getOperators(o1Meta)) {
-      Assert.assertEquals("checkpoints " + p.checkpoints, 
p.checkpoints.size(), 1);
-    }
-*/
+    Assert.assertEquals("operators after scale down", 1, 
plan.getOperators(o1Meta).size());
   }
 
   @Test
@@ -773,7 +778,6 @@ public class PhysicalPlanTest
     Assert.assertTrue("" + expectedPartitionKeys, 
expectedPartitionKeys.isEmpty());
 
     // partition merge
-    @SuppressWarnings("unchecked")
     List<HashSet<PartitionKeys>> expectedKeysSets = Arrays.asList(
         Sets.newHashSet(newPartitionKeys("11", "00"), newPartitionKeys("11", 
"10"), newPartitionKeys("1", "1")),
         Sets.newHashSet(newPartitionKeys("1", "0"), newPartitionKeys("11", 
"01"), newPartitionKeys("11", "11"))
@@ -1129,7 +1133,9 @@ public class PhysicalPlanTest
     LogicalPlan dag = new LogicalPlan();
 
     TestGeneratorInputOperator o1 = dag.addOperator("o1", 
TestGeneratorInputOperator.class);
-    dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new 
StatelessPartitioner<TestGeneratorInputOperator>(2));
+    TestPartitioner<TestGeneratorInputOperator> o1Partitioner = new 
TestPartitioner<TestGeneratorInputOperator>();
+    o1Partitioner.setPartitionCount(2);
+    dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, o1Partitioner);
     dag.setOperatorAttribute(o1, OperatorContext.STATS_LISTENERS, 
Lists.newArrayList((StatsListener)new PartitioningTest.PartitionLoadWatch()));
     OperatorMeta o1Meta = dag.getMeta(o1);
 
@@ -1140,9 +1146,6 @@ public class PhysicalPlanTest
 
     dag.addStream("o1.outport1", o1.outport, o2.inport1);
 
-    int maxContainers = 10;
-    dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, maxContainers);
-
     TestPlanContext ctx = new TestPlanContext();
     dag.setAttribute(OperatorContext.STORAGE_AGENT, ctx);
 
@@ -1298,16 +1301,23 @@ public class PhysicalPlanTest
       Set<PTOperator> expUndeploy = Sets.newHashSet();
       Set<PTOperator> expDeploy = Sets.newHashSet();
       for (PTOperator o1p : plan.getOperators(o1Meta)) {
-        expUndeploy.add(o1p);
         PartitioningTest.PartitionLoadWatch.put(o1p, 1);
         plan.onStatusUpdate(o1p);
       }
 
       Assert.assertEquals("repartition event", 1, ctx.events.size());
+      o1Partitioner.extraPartitions.add(new 
DefaultPartition<TestGeneratorInputOperator>(o1));
       ctx.events.remove(0).run();
-
-      Assert.assertEquals("M partitions after scale up " + o1Meta, 2, 
plan.getOperators(o1Meta).size());
-      expDeploy.addAll(plan.getOperators(o1Meta));
+      o1Partitioner.extraPartitions.clear();
+
+      List<PTOperator> o1Partitions = plan.getOperators(o1Meta);
+      List<PTOperator> o2Partitions = plan.getOperators(o2Meta);
+      Assert.assertEquals("M partitions after scale up " + o1Meta, 2, 
o1Partitions.size());
+      expDeploy.add(o1Partitions.get(1)); // previous partition unchanged
+      for (PTOperator o1p : o1Partitions) {
+        Assert.assertEquals("outputs " + o1p, 1, o1p.getOutputs().size());
+        Assert.assertEquals("sinks " + o1p, o2Partitions.size(), 
o1p.getOutputs().get(0).sinks.size());
+      }
       for (PTOperator o2p : plan.getOperators(o2Meta)) {
         expUndeploy.add(o2p);
         expDeploy.add(o2p);
@@ -2027,9 +2037,16 @@ public class PhysicalPlanTest
   private class TestPartitioner<T extends Operator> extends 
StatelessPartitioner<T>
   {
     private static final long serialVersionUID = 1L;
+    final List<Partition<T>> extraPartitions = Lists.newArrayList();
+
     @Override
     public Collection<Partition<T>> definePartitions(Collection<Partition<T>> 
partitions, PartitioningContext context)
     {
+      if (!extraPartitions.isEmpty()) {
+        partitions.addAll(extraPartitions);
+        return partitions;
+      }
+
       Collection<Partition<T>> newPartitions = 
super.definePartitions(partitions, context);
       if (context.getParallelPartitionCount() > 0 && newPartitions.size() < 
context.getParallelPartitionCount()) {
         // parallel partitioned, fill to requested count

Reply via email to