I think this needs more input on a use case level. The ability to dynamically alter the DAG internally will also address the resource allocation for operators:
https://issues.apache.org/jira/browse/APEXCORE-408

It can be used to implement stages of a batch pipeline and is very flexible in general. Considering the likely implementation complexity of the proposed feature, I would like to understand what benefits it provides to the user (use cases that cannot be addressed otherwise)?

Thanks,
Thomas

On Sat, Apr 1, 2017 at 12:23 PM, Vlad Rozov <v.ro...@datatorrent.com> wrote:

> Correct, a stateful downstream operator can only be undeployed at a
> checkpoint window after it consumes all data emitted by the upstream
> operator on the closed port.
>
> It will be necessary to distinguish between a closed port and an inactive
> stream. After a port is closed, the stream may still be active, and after
> a port is opened, the stream may still be inactive (not yet ready).
>
> The more contributors participate in the discussion and implementation,
> the more solid the feature will be.
>
> Thank you,
> Vlad
>
> Sent from iPhone
>
> > On Apr 1, 2017, at 11:03, Pramod Immaneni <pra...@datatorrent.com> wrote:
> >
> > Generally a good idea. Care should be taken around fault tolerance and
> > idempotency. Closing a stream would need to stop accepting new data, but
> > we still can't actually close all the streams and undeploy operators
> > until committed. Idempotency might require the close to take effect at
> > the end of the window. What would it then mean to re-open streams within
> > a window? Also, this looks like a larger undertaking. As Ram suggested,
> > it would be good to understand the use cases, and I also suggest that
> > multiple folks participate in the implementation effort to ensure that
> > we are able to address all the scenarios and minimize the chances of
> > regression in existing behavior.
> >
> > Thanks
> >
> >> On Sat, Apr 1, 2017 at 8:12 AM, Vlad Rozov <v.ro...@datatorrent.com> wrote:
> >>
> >> All,
> >>
> >> Currently Apex assumes that an operator can emit on any defined output
> >> port and all streams defined by a DAG are active.
> >> I'd like to propose an
> >> ability for an operator to open and close output ports. By default all
> >> ports defined by an operator will be open. If an operator for any
> >> reason decides that it will not emit tuples on an output port, it may
> >> close it. This will make the stream inactive, and the application master
> >> may undeploy the downstream (for that input stream) operators. If this
> >> leads to containers that don't have any active operators, those
> >> containers may be undeployed as well, leading to better cluster resource
> >> utilization and better Apex elasticity. Later, the operator may be in a
> >> state where it needs to emit tuples on the closed port. In this case, it
> >> needs to re-open the port and wait until the stream becomes active again
> >> before emitting tuples on that port. Making an inactive stream active
> >> again requires the application master to re-allocate containers and
> >> re-deploy the downstream operators.
> >>
> >> It should also be possible for an application designer to mark streams
> >> as inactive when an application starts. This will allow the application
> >> master to avoid reserving all containers when the application starts.
> >> Later, the port can be opened and the inactive stream becomes active.
> >>
> >> Thank you,
> >>
> >> Vlad
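
To make the distinction between a closed port and an inactive stream concrete, here is a minimal sketch of the two state variables discussed in this thread. It is only an illustration under stated assumptions: the class, enum, and method names are hypothetical and are not part of the Apex API, and window boundaries, checkpointing, and container allocation are reduced to two callbacks.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the proposal; none of these names exist in Apex. */
public class PortStreamSketch {
  enum PortState { OPEN, CLOSED }
  enum StreamState { ACTIVE, DRAINING, INACTIVE, ACTIVATING }

  private PortState port = PortState.OPEN;          // ports are open by default
  private StreamState stream = StreamState.ACTIVE;  // streams start active in this sketch
  private final List<String> delivered = new ArrayList<>();

  /** Operator side: close the port. The stream stays alive until downstream has drained. */
  void closePort() {
    port = PortState.CLOSED;
    if (stream == StreamState.ACTIVE) {
      stream = StreamState.DRAINING;
    }
  }

  /** Assumed callback: downstream consumed all data and the checkpoint window is committed. */
  void onDownstreamCommitted() {
    if (stream == StreamState.DRAINING) {
      stream = StreamState.INACTIVE; // downstream operators may now be undeployed
    }
  }

  /** Operator side: re-open the port. The stream is not ready until downstream is redeployed. */
  void openPort() {
    port = PortState.OPEN;
    if (stream == StreamState.INACTIVE) {
      stream = StreamState.ACTIVATING;
    } else if (stream == StreamState.DRAINING) {
      stream = StreamState.ACTIVE; // assumption: downstream was never undeployed
    }
  }

  /** Assumed callback: containers re-allocated and downstream operators redeployed. */
  void onDownstreamDeployed() {
    if (stream == StreamState.ACTIVATING) {
      stream = StreamState.ACTIVE;
    }
  }

  StreamState streamState() {
    return stream;
  }

  /** Emitting requires both an open port and an active stream. */
  boolean canEmit() {
    return port == PortState.OPEN && stream == StreamState.ACTIVE;
  }

  boolean emit(String tuple) {
    if (!canEmit()) {
      return false;
    }
    delivered.add(tuple);
    return true;
  }

  public static void main(String[] args) {
    PortStreamSketch s = new PortStreamSketch();
    s.closePort();
    System.out.println("after close: stream=" + s.streamState());
    s.onDownstreamCommitted();
    s.openPort();
    System.out.println("after re-open: stream=" + s.streamState() + ", canEmit=" + s.canEmit());
    s.onDownstreamDeployed();
    System.out.println("after redeploy: canEmit=" + s.canEmit());
  }
}
```

The sketch deliberately leaves out the idempotency question raised above (whether a close or re-open takes effect only at the end of a window); it only shows why port state and stream state need to be tracked separately.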