moved attribute from context to logical plan
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/4d5828c6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/4d5828c6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/4d5828c6 Branch: refs/heads/devel-3 Commit: 4d5828c6ca48f5d28cd8c77c5706c6f72c7cd1ad Parents: f7e1ccf Author: Gaurav <[email protected]> Authored: Wed Dec 16 06:33:54 2015 -0800 Committer: David Yan <[email protected]> Committed: Fri Jan 22 19:04:27 2016 -0800 ---------------------------------------------------------------------- api/src/main/java/com/datatorrent/api/Context.java | 7 ------- .../main/java/com/datatorrent/stram/engine/GenericNode.java | 3 ++- .../java/com/datatorrent/stram/plan/logical/LogicalPlan.java | 8 +++++++- .../com/datatorrent/stram/plan/physical/PhysicalPlan.java | 4 ++-- .../com/datatorrent/stram/plan/physical/StreamMapping.java | 2 +- 5 files changed, 12 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4d5828c6/api/src/main/java/com/datatorrent/api/Context.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java index 58bc552..ceed8a2 100644 --- a/api/src/main/java/com/datatorrent/api/Context.java +++ b/api/src/main/java/com/datatorrent/api/Context.java @@ -166,13 +166,6 @@ public interface Context */ Attribute<Class<?>> TUPLE_CLASS = new Attribute<>(new Class2String<>()); - /** - * Attribute of input port. - * This is a read-only attribute to query whether the input port is connected to a DelayOperator - * This is for iterative processing. - */ - Attribute<Boolean> IS_CONNECTED_TO_DELAY_OPERATOR = new Attribute<>(false); - @SuppressWarnings("FieldNameHidesFieldInSuperclass") long serialVersionUID = AttributeMap.AttributeInitializer.initialize(PortContext.class); } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4d5828c6/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java index 4777f93..1ccec31 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java @@ -40,6 +40,7 @@ import com.datatorrent.netlet.util.DTThrowable; import com.datatorrent.netlet.util.CircularBuffer; import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerStats; import com.datatorrent.stram.debug.TappedReservoir; +import com.datatorrent.stram.plan.logical.LogicalPlan; import com.datatorrent.stram.plan.logical.Operators; import com.datatorrent.stram.tuple.ResetWindowTuple; import com.datatorrent.stram.tuple.Tuple; @@ -207,7 +208,7 @@ public class GenericNode extends Node<Operator> if (pcPair == null || pcPair.context == null) { return false; } - return pcPair.context.getValue(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR); + return pcPair.context.getValue(LogicalPlan.IS_CONNECTED_TO_DELAY_OPERATOR); } /** http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4d5828c6/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java index 3c26118..883ad71 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java @@ -76,6 +76,12 @@ import com.datatorrent.stram.engine.Slider; */ public class LogicalPlan implements Serializable, DAG { + /** + * Attribute of input port. + * This is a read-only attribute to query whether the input port is connected to a DelayOperator + * This is for iterative processing. + */ + public static final Attribute<Boolean> IS_CONNECTED_TO_DELAY_OPERATOR = new Attribute<>(false); @SuppressWarnings("FieldNameHidesFieldInSuperclass") private static final long serialVersionUID = -2099729915606048704L; private static final Logger LOG = LoggerFactory.getLogger(LogicalPlan.class); @@ -1914,7 +1920,7 @@ public class LogicalPlan implements Serializable, DAG for (InputPortMeta sink: downStream.sinks) { if (om.getOperator() instanceof Operator.DelayOperator) { // this is an iteration loop, do not treat it as downstream when detecting cycles - sink.attributes.put(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR, true); + sink.attributes.put(IS_CONNECTED_TO_DELAY_OPERATOR, true); continue; } OperatorMeta successor = sink.getOperatorWrapper(); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4d5828c6/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java index da96ef3..c696224 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java @@ -948,11 +948,11 @@ public class PhysicalPlan implements Serializable PTOperator slidingUnifier = StreamMapping.createSlidingUnifier(sourceOut.logicalStream, this, sourceOM.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT), slidingWindowCount); StreamMapping.addInput(slidingUnifier, sourceOut, null); - input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, slidingUnifier.outputs.get(0), ipm.getKey().getValue(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR)); + input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, slidingUnifier.outputs.get(0), ipm.getKey().getValue(LogicalPlan.IS_CONNECTED_TO_DELAY_OPERATOR)); sourceMapping.outputStreams.get(ipm.getValue().getSource()).slidingUnifiers.add(slidingUnifier); } else { - input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, sourceOut, ipm.getKey().getValue(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR)); + input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, sourceOut, ipm.getKey().getValue(LogicalPlan.IS_CONNECTED_TO_DELAY_OPERATOR)); } oper.inputs.add(input); } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4d5828c6/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java index 91c6eef..f30ceb6 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java @@ -347,7 +347,7 @@ public class StreamMapping implements java.io.Serializable // link to upstream output(s) for this stream for (PTOutput upstreamOut : sourceOper.outputs) { if (upstreamOut.logicalStream == streamMeta) { - PTInput input = new PTInput(ipm.getPortName(), streamMeta, oper, pks, upstreamOut, ipm.getValue(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR)); + PTInput input = new PTInput(ipm.getPortName(), streamMeta, oper, pks, upstreamOut, ipm.getValue(LogicalPlan.IS_CONNECTED_TO_DELAY_OPERATOR)); oper.inputs.add(input); } }
