I think this needs more input on a use case level. The ability to
dynamically alter the DAG internally will also address the resource
allocation for operators:

https://issues.apache.org/jira/browse/APEXCORE-408

It can be used to implement stages of a batch pipeline and is very flexible
in general. Considering the likely implementation complexity of the
proposed feature, I would like to understand what benefits it provides to
the user: which use cases cannot be addressed otherwise?

Thanks,
Thomas



On Sat, Apr 1, 2017 at 12:23 PM, Vlad Rozov <v.ro...@datatorrent.com> wrote:

> Correct, a stateful downstream operator can only be undeployed at a
> checkpoint window after it consumes all data emitted by the upstream
> operator on the closed port.
>
> It will be necessary to distinguish between a closed port and an inactive
> stream. After a port is closed, the stream may still be active, and after
> a port is opened, the stream may still be inactive (not yet ready).
>
> The more contributors participate in the discussion and implementation,
> the more solid the feature will be.
>
> Thank you,
> Vlad
>
> Sent from iPhone
>
> > On Apr 1, 2017, at 11:03, Pramod Immaneni <pra...@datatorrent.com> wrote:
> >
> > Generally a good idea. Care should be taken around fault tolerance and
> > idempotency. Close stream would need to stop accepting new data, but we
> > still can't actually close all the streams and un-deploy operators until
> > the data is committed. Idempotency might require the close stream to
> > take effect at the end of the window. What would it then mean to re-open
> > streams within a window? Also, this looks like a larger undertaking. As
> > Ram suggested, it would be good to understand the use cases, and I also
> > suggest that multiple folks participate in the implementation effort to
> > ensure that we are able to address all the scenarios and minimize the
> > chances of regression in existing behavior.
> >
> > Thanks
> >
> >> On Sat, Apr 1, 2017 at 8:12 AM, Vlad Rozov <v.ro...@datatorrent.com> wrote:
> >>
> >> All,
> >>
> >> Currently Apex assumes that an operator can emit on any defined output
> >> port and that all streams defined by a DAG are active. I'd like to
> >> propose an ability for an operator to open and close output ports. By
> >> default, all ports defined by an operator will be open. If an operator
> >> decides for any reason that it will not emit tuples on an output port,
> >> it may close it. This will make the stream inactive, and the application
> >> master may undeploy the downstream (for that input stream) operators. If
> >> this leads to containers that don't have any active operators, those
> >> containers may be undeployed as well, leading to better cluster resource
> >> utilization and better Apex elasticity. Later, the operator may be in a
> >> state where it needs to emit tuples on the closed port. In this case, it
> >> needs to re-open the port and wait until the stream becomes active again
> >> before emitting tuples on that port. Making an inactive stream active
> >> again requires the application master to re-allocate containers and
> >> re-deploy the downstream operators.
> >>
> >> It should also be possible for an application designer to mark streams
> >> as inactive when an application starts. This will allow the application
> >> master to avoid reserving all containers when the application starts.
> >> Later, the port can be opened and the inactive stream becomes active.
> >>
> >> Thank you,
> >>
> >> Vlad
> >>
> >>
>

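The distinction drawn in the thread between a closed port and an inactive stream can be made concrete with a small state model. The sketch below is only an illustration of the proposal as discussed: every class, enum, and method name is hypothetical, and none of it is Apex API. It assumes that a close request takes effect at the end of the current window, that the stream becomes inactive only once a window at or after the close has been committed, and that re-opening a port cancels a pending close. The thread leaves these points open.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the proposed port/stream lifecycle.
// None of these names exist in the Apex API; they are made up for illustration.
class PortLifecycleSketch {
    enum PortState { OPEN, CLOSED }
    enum StreamState { ACTIVE, INACTIVE }

    private PortState port;
    private StreamState stream;
    private boolean closeRequested = false;
    private long closedAtWindow = -1L;
    private final List<String> delivered = new ArrayList<>();

    // startActive = false models a stream marked inactive at application start.
    PortLifecycleSketch(boolean startActive) {
        port = startActive ? PortState.OPEN : PortState.CLOSED;
        stream = startActive ? StreamState.ACTIVE : StreamState.INACTIVE;
    }

    // A tuple is accepted only while the port is open AND the stream is active.
    boolean emit(String tuple) {
        if (port != PortState.OPEN || stream != StreamState.ACTIVE) {
            return false;
        }
        delivered.add(tuple);
        return true;
    }

    // Assumption: the close is deferred to the end of the current window.
    void requestClose() {
        if (port == PortState.OPEN) {
            closeRequested = true;
        }
    }

    void endWindow(long windowId) {
        if (closeRequested) {
            port = PortState.CLOSED;
            closedAtWindow = windowId;
            closeRequested = false;
        }
    }

    // Assumption: downstream operators may be undeployed (stream inactive)
    // only after a window at or past the close has been committed.
    void committed(long windowId) {
        if (port == PortState.CLOSED && stream == StreamState.ACTIVE
                && closedAtWindow >= 0 && windowId >= closedAtWindow) {
            stream = StreamState.INACTIVE;
        }
    }

    // Re-opening the port does not by itself make the stream active.
    void open() {
        port = PortState.OPEN;
        closeRequested = false;
    }

    // Called once the downstream operators have been (re-)deployed.
    void downstreamDeployed() {
        if (port == PortState.OPEN) {
            stream = StreamState.ACTIVE;
        }
    }

    PortState portState() { return port; }
    StreamState streamState() { return stream; }
    int deliveredCount() { return delivered.size(); }

    public static void main(String[] args) {
        PortLifecycleSketch s = new PortLifecycleSketch(true);
        s.emit("t1");
        s.requestClose();
        s.endWindow(10L);
        System.out.println("after endWindow: " + s.portState() + " / " + s.streamState());
        s.committed(10L);
        System.out.println("after commit:    " + s.portState() + " / " + s.streamState());
        s.open();
        System.out.println("after open:      " + s.portState() + " / " + s.streamState());
        s.downstreamDeployed();
        System.out.println("after redeploy:  " + s.portState() + " / " + s.streamState());
    }
}
```

In this model the port and the stream change state at different times, which is why the operator has to wait after re-opening a port before it can emit again. How a real implementation would signal that the stream is ready is not specified in the thread.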