Github user bhupeshchawda commented on a diff in the pull request:
https://github.com/apache/incubator-apex-core/pull/206#discussion_r50075888
--- Diff:
engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---
@@ -2059,4 +2106,64 @@ public int hashCode()
return result;
}
+ /**
+ * This class hold Proxy Ports for a stream, and resolves them later
after modules
+ * are expanded.
+ *
+ * If any source and sink added to a stream is ProxyInputPort or
ProxyOutputPort
+ * we create a StreamLinkInfo with the stream reference and keep
ProxyPorts In this class.
+ * Later we go over each Proxy port and find out the actual port
connected to the ProxyPort
+ * and update StreamMeta.
+ */
+ private static final class StreamLinkInfo
+ {
+ OutputPort<?> source;
+ List<InputPort<?>> sinks = new ArrayList<>();
+ final StreamMeta smeta;
+
+ private StreamLinkInfo(StreamMeta smeta)
+ {
+ this.smeta = smeta;
+ }
+
+ private void setSource(OutputPort<?> source)
+ {
+ if (!(source instanceof ProxyOutputPort)) {
+ throw new IllegalArgumentException("Invalid port type expected
ProxyOutputPort");
--- End diff --
We are making proxy ports compulsory here. Is it not possible for the port
to be a normal Default output port?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---