Repository: incubator-apex-core
Updated Branches:
  refs/heads/devel-3 09f716e00 -> 8eb81f7c6


APEX-93 #resolve #comment Fixing dynamic partitioning issue with persist stream
Added flow to redeploy persist operators as well when sink operators are 
dynamically repartitioned
Modified dynamic repartitioning test case to validate that persist operator is 
part of the dependent operators redeployed after partitioning


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/3178f13f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/3178f13f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/3178f13f

Branch: refs/heads/devel-3
Commit: 3178f13f49695aa4f6910006ecd4efbca8dad6a9
Parents: 55a068f
Author: ishark <[email protected]>
Authored: Thu Sep 3 19:02:02 2015 -0700
Committer: ishark <[email protected]>
Committed: Wed Sep 9 16:02:43 2015 -0700

----------------------------------------------------------------------
 .../StreamCodecWrapperForPersistance.java       |  2 +-
 .../stram/plan/physical/PhysicalPlan.java       | 28 +++++++++++++++++++-
 .../stram/plan/StreamPersistanceTests.java      | 13 +++++++++
 3 files changed, 41 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3178f13f/engine/src/main/java/com/datatorrent/stram/plan/logical/StreamCodecWrapperForPersistance.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/plan/logical/StreamCodecWrapperForPersistance.java
 
b/engine/src/main/java/com/datatorrent/stram/plan/logical/StreamCodecWrapperForPersistance.java
index 97fd75f..81be56a 100644
--- 
a/engine/src/main/java/com/datatorrent/stram/plan/logical/StreamCodecWrapperForPersistance.java
+++ 
b/engine/src/main/java/com/datatorrent/stram/plan/logical/StreamCodecWrapperForPersistance.java
@@ -52,7 +52,7 @@ public class StreamCodecWrapperForPersistance<T> implements 
StreamCodec<T>, Seri
       Collection<PartitionKeys> partitionKeysList = entry.getValue();
 
       for (PartitionKeys keys : partitionKeysList) {
-        if (keys.partitions.contains(keys.mask & codec.getPartition(o))) {
+        if ( keys.partitions != null && keys.partitions.contains(keys.mask & 
codec.getPartition(o))) {
           // Then at least one of the partitions is getting this event
           // So send the event to persist operator
           return true;

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3178f13f/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java 
b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index 2176035..fb429a9 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -420,7 +420,7 @@ public class PhysicalPlan implements Serializable
     Collection<PTOperator> ptOperators = 
getOperators(sinkPortMeta.getOperatorWrapper());
     Collection<PartitionKeys> partitionKeysList = new 
ArrayList<PartitionKeys>();
     for (PTOperator p : ptOperators) {
-      PartitionKeys keys = (PartitionKeys) 
p.getPartitionKeys().get(sinkPortMeta.getPortObject());
+      PartitionKeys keys = p.partitionKeys.get(sinkPortMeta);
       partitionKeysList.add(keys);
     }
 
@@ -1390,9 +1390,35 @@ public class PhysicalPlan implements Serializable
         getDeps(operator, visited);
       }
     }
+    visited.addAll(getDependentPersistOperators(operators));
     return visited;
   }
 
+  private Set<PTOperator> getDependentPersistOperators(Collection<PTOperator> 
operators)
+  {
+    Set<PTOperator> persistOperators = new LinkedHashSet<PTOperator>();
+    if (operators != null) {
+      for (PTOperator operator : operators) {
+        for (PTInput in : operator.inputs) {
+          if (in.logicalStream.getPersistOperator() != null) {
+            for (InputPortMeta inputPort : 
in.logicalStream.getSinksToPersist()) {
+              if 
(inputPort.getOperatorWrapper().equals(operator.operatorMeta)) {
+                // Redeploy the stream wide persist operator only if the 
current sink is being persisted
+                
persistOperators.addAll(getOperators(in.logicalStream.getPersistOperator()));
+                break;
+              }
+            }
+          }
+          for (Entry<InputPortMeta, OperatorMeta> entry : 
in.logicalStream.sinkSpecificPersistOperatorMap.entrySet()) {
+            // Redeploy sink specific persist operators
+            persistOperators.addAll(getOperators(entry.getValue()));
+          }
+        }
+      }
+    }
+    return persistOperators;
+  }
+
   /**
    * Add logical operator to the plan. Assumes that upstream operators have 
been added before.
    * @param om

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3178f13f/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
----------------------------------------------------------------------
diff --git 
a/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java 
b/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
index c82f3a9..1cd4311 100644
--- 
a/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
+++ 
b/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
@@ -1,12 +1,14 @@
 package com.datatorrent.stram.plan;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -956,12 +958,23 @@ public class StreamPersistanceTests
 
     List<PTOperator> ptos = plan.getOperators(passThruMeta);
 
+    PTOperator persistOperatorContainer = null;
+
     for (PTContainer container : plan.getContainers()) {
       for (PTOperator operator : container.getOperators()) {
         operator.setState(PTOperator.State.ACTIVE);
+        if (operator.getName().equals("persister")) {
+          persistOperatorContainer = operator;
+        }
       }
     }
 
+    // Check that persist operator is part of dependents redeployed
+    Set<PTOperator> operators = plan.getDependents(ptos);
+    logger.debug("Operators to be re-deployed = {}", operators);
+    // Validate that persist operator is part of dependents
+    assertTrue("persist operator should be part of the operators to be 
redeployed", operators.contains(persistOperatorContainer));
+
     LogicalPlan.StreamMeta s1 = (LogicalPlan.StreamMeta) s;
     StreamCodec codec = 
s1.getPersistOperatorInputPort().getValue(PortContext.STREAM_CODEC);
 

Reply via email to