GitHub user tushargosavi commented on a diff in the pull request:
https://github.com/apache/incubator-apex-core/pull/206#discussion_r50076928
--- Diff: engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---
@@ -2059,4 +2106,64 @@ public int hashCode()
return result;
}
+ /**
+ * This class hold Proxy Ports for a stream, and resolves them later after modules
+ * are expanded.
+ *
+ * If any source and sink added to a stream is ProxyInputPort or ProxyOutputPort
+ * we create a StreamLinkInfo with the stream reference and keep ProxyPorts In this class.
+ * Later we go over each Proxy port and find out the actual port connected to the ProxyPort
+ * and update StreamMeta.
+ */
+ private static final class StreamLinkInfo
+ {
+ OutputPort<?> source;
+ List<InputPort<?>> sinks = new ArrayList<>();
+ final StreamMeta smeta;
+
+ private StreamLinkInfo(StreamMeta smeta)
+ {
+ this.smeta = smeta;
+ }
+
+ private void setSource(OutputPort<?> source)
+ {
+ if (!(source instanceof ProxyOutputPort)) {
+ throw new IllegalArgumentException("Invalid port type expected ProxyOutputPort");
--- End diff --
This class maintains ports that are to be resolved later, i.e., only proxy
ports. This is a safety check to prevent a normal port from being added to this
class. addStream of StreamMeta will only add a port to this class if it is a
ProxyPort.
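
For illustration, the guard described above can be sketched in isolation. This is only a minimal sketch, not the code from the pull request: the port types below are empty hypothetical stand-ins for the real Apex interfaces, and `ProxyPortHolder`, `PlainOutputPort`, `addSink`, `getSource`, and `sinkCount` are names invented for this example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the real port types; they carry no behavior here.
interface OutputPort<T> {}
interface InputPort<T> {}

final class ProxyOutputPort<T> implements OutputPort<T> {}
final class ProxyInputPort<T> implements InputPort<T> {}

// A "normal" (non-proxy) port, used only to show the rejection path.
final class PlainOutputPort<T> implements OutputPort<T> {}

// Holds ports that still need to be resolved later, so only proxy ports are accepted.
final class ProxyPortHolder
{
  private OutputPort<?> source;
  private final List<InputPort<?>> sinks = new ArrayList<>();

  void setSource(OutputPort<?> port)
  {
    // Safety check: a normal port has nothing to resolve and must not be stored here.
    if (!(port instanceof ProxyOutputPort)) {
      throw new IllegalArgumentException("Invalid port type, expected ProxyOutputPort");
    }
    this.source = port;
  }

  void addSink(InputPort<?> port)
  {
    // Same check on the sink side.
    if (!(port instanceof ProxyInputPort)) {
      throw new IllegalArgumentException("Invalid port type, expected ProxyInputPort");
    }
    sinks.add(port);
  }

  OutputPort<?> getSource()
  {
    return source;
  }

  int sinkCount()
  {
    return sinks.size();
  }
}
```

With this shape, a caller that filters on the port type before calling setSource never hits the exception; the check only fires if a normal port slips through by mistake.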