Repository: incubator-apex-core Updated Branches: refs/heads/devel-3 09f716e00 -> 8eb81f7c6
APEX-93 #resolve #comment Fixing dynamic partitioning issue with persist stream. Added flow to redeploy persist operators as well when sink operators are dynamically repartitioned. Modified dynamic repartitioning test case to validate that persist operator is part of the dependent operators redeployed after partitioning. Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/3178f13f Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/3178f13f Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/3178f13f Branch: refs/heads/devel-3 Commit: 3178f13f49695aa4f6910006ecd4efbca8dad6a9 Parents: 55a068f Author: ishark <[email protected]> Authored: Thu Sep 3 19:02:02 2015 -0700 Committer: ishark <[email protected]> Committed: Wed Sep 9 16:02:43 2015 -0700 ---------------------------------------------------------------------- .../StreamCodecWrapperForPersistance.java | 2 +- .../stram/plan/physical/PhysicalPlan.java | 28 +++++++++++++++++++- .../stram/plan/StreamPersistanceTests.java | 13 +++++++++ 3 files changed, 41 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3178f13f/engine/src/main/java/com/datatorrent/stram/plan/logical/StreamCodecWrapperForPersistance.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/StreamCodecWrapperForPersistance.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/StreamCodecWrapperForPersistance.java index 97fd75f..81be56a 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/StreamCodecWrapperForPersistance.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/StreamCodecWrapperForPersistance.java @@ -52,7 +52,7 @@ public class StreamCodecWrapperForPersistance<T> implements 
StreamCodec<T>, Seri Collection<PartitionKeys> partitionKeysList = entry.getValue(); for (PartitionKeys keys : partitionKeysList) { - if (keys.partitions.contains(keys.mask & codec.getPartition(o))) { + if ( keys.partitions != null && keys.partitions.contains(keys.mask & codec.getPartition(o))) { // Then at least one of the partitions is getting this event // So send the event to persist operator return true; http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3178f13f/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java index 2176035..fb429a9 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java @@ -420,7 +420,7 @@ public class PhysicalPlan implements Serializable Collection<PTOperator> ptOperators = getOperators(sinkPortMeta.getOperatorWrapper()); Collection<PartitionKeys> partitionKeysList = new ArrayList<PartitionKeys>(); for (PTOperator p : ptOperators) { - PartitionKeys keys = (PartitionKeys) p.getPartitionKeys().get(sinkPortMeta.getPortObject()); + PartitionKeys keys = p.partitionKeys.get(sinkPortMeta); partitionKeysList.add(keys); } @@ -1390,9 +1390,35 @@ public class PhysicalPlan implements Serializable getDeps(operator, visited); } } + visited.addAll(getDependentPersistOperators(operators)); return visited; } + private Set<PTOperator> getDependentPersistOperators(Collection<PTOperator> operators) + { + Set<PTOperator> persistOperators = new LinkedHashSet<PTOperator>(); + if (operators != null) { + for (PTOperator operator : operators) { + for (PTInput in : operator.inputs) { + if (in.logicalStream.getPersistOperator() != null) { + for (InputPortMeta inputPort : 
in.logicalStream.getSinksToPersist()) { + if (inputPort.getOperatorWrapper().equals(operator.operatorMeta)) { + // Redeploy the stream wide persist operator only if the current sink is being persisted + persistOperators.addAll(getOperators(in.logicalStream.getPersistOperator())); + break; + } + } + } + for (Entry<InputPortMeta, OperatorMeta> entry : in.logicalStream.sinkSpecificPersistOperatorMap.entrySet()) { + // Redeploy sink specific persist operators + persistOperators.addAll(getOperators(entry.getValue())); + } + } + } + } + return persistOperators; + } + /** * Add logical operator to the plan. Assumes that upstream operators have been added before. * @param om http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3178f13f/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java b/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java index c82f3a9..1cd4311 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java @@ -1,12 +1,14 @@ package com.datatorrent.stram.plan; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicInteger; @@ -956,12 +958,23 @@ public class StreamPersistanceTests List<PTOperator> ptos = plan.getOperators(passThruMeta); + PTOperator persistOperatorContainer = null; + for (PTContainer container : plan.getContainers()) { for (PTOperator operator : container.getOperators()) { operator.setState(PTOperator.State.ACTIVE); + if (operator.getName().equals("persister")) { + 
persistOperatorContainer = operator; + } } } + // Check that persist operator is part of dependents redeployed + Set<PTOperator> operators = plan.getDependents(ptos); + logger.debug("Operators to be re-deployed = {}", operators); + // Validate that persist operator is part of dependents + assertTrue("persist operator should be part of the operators to be redeployed", operators.contains(persistOperatorContainer)); + LogicalPlan.StreamMeta s1 = (LogicalPlan.StreamMeta) s; StreamCodec codec = s1.getPersistOperatorInputPort().getValue(PortContext.STREAM_CODEC);
