Hi Vlad,

With APEXCORE-408 you can avoid writing data files, as long as the intermediate data fits in memory. The API allows extending an existing DAG by adding streams and operators to it. In the example you provided, the user could extend the DAG after the aggregate operator by adding a new output operator and connecting it to the aggregator.
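Tushar's suggestion (extend the DAG after the aggregate operator with a new output operator) can be pictured with a toy model. This is a minimal sketch, assuming a simplified in-memory DAG: `ToyDag`, `addOperator`, `addStream` and `operatorsWithoutOutputStream` are hypothetical names, not the Apex `DAG` interface and not the API proposed in APEXCORE-408.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of a logical DAG; NOT the Apex DAG API.
// It only illustrates extending an existing DAG with new operators and streams.
final class ToyDag {
  // A stream connects one upstream "operator.port" to one or more downstream ports.
  record Stream(String name, String source, List<String> sinks) {}

  private final Map<String, String> operators = new LinkedHashMap<>(); // name -> role
  private final Map<String, Stream> streams = new LinkedHashMap<>();

  ToyDag addOperator(String name, String role) {
    if (operators.putIfAbsent(name, role) != null) {
      throw new IllegalArgumentException("duplicate operator: " + name);
    }
    return this;
  }

  ToyDag addStream(String name, String source, String... sinks) {
    checkPort(source);
    for (String sink : sinks) {
      checkPort(sink);
    }
    if (streams.putIfAbsent(name, new Stream(name, source, List.of(sinks))) != null) {
      throw new IllegalArgumentException("duplicate stream: " + name);
    }
    return this;
  }

  // Operators that no stream consumes from yet: the natural points to extend the DAG.
  List<String> operatorsWithoutOutputStream() {
    List<String> result = new ArrayList<>();
    for (String op : operators.keySet()) {
      boolean feedsStream = streams.values().stream()
          .anyMatch(s -> s.source().startsWith(op + "."));
      if (!feedsStream) {
        result.add(op);
      }
    }
    return result;
  }

  // Port references are written as "operator.port" and must name a known operator.
  private void checkPort(String portRef) {
    int dot = portRef.indexOf('.');
    if (dot <= 0 || !operators.containsKey(portRef.substring(0, dot))) {
      throw new IllegalArgumentException("unknown port: " + portRef);
    }
  }
}
```

Usage: build `input` and `aggregate` connected by a stream, then later call `addOperator("output", "writer")` and `addStream("results", "aggregate.out", "output.in")`; after that, only `output` is left without an outgoing stream.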
- Tushar.

On Fri, Apr 7, 2017 at 6:22 AM, Vlad Rozov <v.ro...@datatorrent.com> wrote:

> It is exactly the same use case, with the exception that it is not
> necessary to write data to files. Consider 3 operators: an input
> operator, an aggregate operator and an output operator. When the
> application starts, the output port of the aggregate operator should be
> in the closed state, the stream between the second and the third
> operator would be inactive, and the output operator does not need to be
> allocated. After the input operator processes all data, it can close
> its output port, and the input operator may be de-allocated. Once the
> aggregator receives EOS on its input port, it should open its output
> port and start writing to it. At this point, the output operator needs
> to be deployed and the stream between the last two operators
> (aggregator and output) becomes active.
>
> In a real batch use case, it is preferable to have the full application
> DAG statically defined and to delegate activation/de-activation of
> stages to the platform. It is also preferable not to write intermediate
> files to disk/HDFS, but instead to pass data in memory.
>
> Thank you,
>
> Vlad
>
>
> On 4/6/17 09:37, Thomas Weise wrote:
>
>> You would need to provide more specifics of the use case you are
>> thinking of addressing to make this a meaningful discussion.
>>
>> An example for APEXCORE-408 (based on a real batch use case): I have two
>> stages; the first stage produces a set of files that the second stage
>> needs as input. Stage 1 operators are to be released and stage 2
>> operators deployed when stage 2 starts. These can be independent
>> operators; they don't need to be connected through a stream.
>>
>> Thomas
>>
>>
>> On Thu, Apr 6, 2017 at 9:21 AM, Vlad Rozov <v.ro...@datatorrent.com>
>> wrote:
>>
>>> It is not about a use case difference.
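Vlad's April 7 walkthrough of the input, aggregate and output operators can be traced as a small simulation of the proposed lifecycle. This is a sketch under stated assumptions: a linear chain of operators, a deployment rule of "deploy the source and anything fed through an open port", and immediate release of a finished operator (it ignores checkpointing and in-flight data). `StagedChain`, `finish` and `openOutput` are hypothetical names, not Apex APIs.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical simulation of the proposed port/stream lifecycle for a linear
// chain of operators (ops.get(i) feeds ops.get(i + 1)). Not an Apex API.
final class StagedChain {
  private final List<String> ops;
  private final boolean[] outputOpen;               // output port state per operator
  private final Set<String> deployed = new LinkedHashSet<>();

  StagedChain(List<String> ops, Set<String> closedAtStart) {
    this.ops = List.copyOf(ops);
    this.outputOpen = new boolean[this.ops.size()];
    for (int i = 0; i < this.ops.size(); i++) {
      outputOpen[i] = !closedAtStart.contains(this.ops.get(i));
      // Deploy the source, and any operator fed by a deployed operator with an open port.
      boolean fedByOpenPort = i > 0 && outputOpen[i - 1] && deployed.contains(this.ops.get(i - 1));
      if (i == 0 || fedByOpenPort) {
        deployed.add(this.ops.get(i));
      }
    }
  }

  // The operator has processed all its data: it closes its output port and,
  // having nothing left to do, is released (simplification: no checkpoint wait).
  void finish(String op) {
    outputOpen[indexOf(op)] = false;
    deployed.remove(op);
  }

  // E.g. the aggregator after EOS: opening the port requires deploying the consumer.
  void openOutput(String op) {
    int i = indexOf(op);
    outputOpen[i] = true;
    if (i + 1 < ops.size()) {
      deployed.add(ops.get(i + 1));
    }
  }

  // A stream is active only if its port is open and both ends are deployed.
  boolean streamActive(String upstream) {
    int i = indexOf(upstream);
    return i + 1 < ops.size() && outputOpen[i]
        && deployed.contains(upstream) && deployed.contains(ops.get(i + 1));
  }

  Set<String> deployed() {
    return Set.copyOf(deployed);
  }

  private int indexOf(String op) {
    int i = ops.indexOf(op);
    if (i < 0) {
      throw new IllegalArgumentException("unknown operator: " + op);
    }
    return i;
  }
}
```

Starting with the aggregator's port closed deploys only `input` and `aggregate`; `finish("input")` releases the input operator, and `openOutput("aggregate")` deploys `output` and activates the last stream, matching the sequence in the message.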
>>> My proposal and APEXCORE-408 address the same use case: how to
>>> re-allocate resources for batch applications or applications where
>>> processing happens in stages. The difference between APEXCORE-408 and
>>> the proposal is a shift in complexity from the application logic to the
>>> platform. IMO, supporting batch applications using APEXCORE-408 will
>>> require more coding on the application side.
>>>
>>> Thank you,
>>>
>>> Vlad
>>>
>>>
>>> On 4/5/17 21:57, Thomas Weise wrote:
>>>
>>>> I think this needs more input on a use case level. The ability to
>>>> dynamically alter the DAG internally will also address the resource
>>>> allocation for operators:
>>>>
>>>> https://issues.apache.org/jira/browse/APEXCORE-408
>>>>
>>>> It can be used to implement stages of a batch pipeline and is very
>>>> flexible in general. Considering the likely implementation complexity
>>>> of the proposed feature, I would like to understand what benefits it
>>>> provides to the user (use cases that cannot be addressed otherwise).
>>>>
>>>> Thanks,
>>>> Thomas
>>>>
>>>>
>>>>
>>>> On Sat, Apr 1, 2017 at 12:23 PM, Vlad Rozov <v.ro...@datatorrent.com>
>>>> wrote:
>>>>
>>>>> Correct, a stateful downstream operator can only be undeployed at a
>>>>> checkpoint window after it consumes all data emitted by the upstream
>>>>> operator on the closed port.
>>>>>
>>>>> It will be necessary to distinguish between a closed port and an
>>>>> inactive stream. After the port is closed, the stream may still be
>>>>> active, and after the port is opened, the stream may still be
>>>>> inactive (not yet ready).
>>>>>
>>>>> The more contributors participate in the discussion and
>>>>> implementation, the more solid the feature will be.
>>>>>
>>>>> Thank you,
>>>>> Vlad
>>>>>
>>>>> Sent from my iPhone
>>>>>
>>>>> On Apr 1, 2017, at 11:03, Pramod Immaneni <pra...@datatorrent.com>
>>>>> wrote:
>>>>>
>>>>>> Generally a good idea. Care should be taken around fault tolerance
>>>>>> and idempotency.
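Vlad's April 1, 12:23 reply separates two things that are easy to conflate: the port state (chosen by the upstream operator) and the stream state (which depends on whether the downstream operator is deployed). A minimal sketch of that distinction, assuming window ids increase monotonically and that "safe to undeploy" means a committed window covers the window in which the port was closed; `ClosableStream` and its phase names are hypothetical, not part of Apex.

```java
// Hypothetical sketch (not an Apex API) separating the state of an output port
// from the state of the stream it feeds, for a stateful downstream operator.
final class ClosableStream {
  // ACTIVE: open port, consumer deployed. DRAINING: closed port, stream still active.
  // INACTIVE: closed port, consumer released. ACTIVATING: open port, consumer not ready.
  enum Phase { ACTIVE, DRAINING, INACTIVE, ACTIVATING }

  private boolean portOpen = true;
  private boolean downstreamDeployed = true;
  private long closedAtWindow = -1;    // window in which the port was closed
  private long committedWindow = -1;   // latest committed window of the downstream

  void closePort(long windowId) {
    portOpen = false;
    closedAtWindow = windowId;
  }

  // Re-opening the port does not make the stream active by itself.
  void openPort() {
    portOpen = true;
  }

  void committed(long windowId) {
    committedWindow = Math.max(committedWindow, windowId);
  }

  // Safe only once a committed checkpoint covers all data emitted before the close.
  boolean canUndeployDownstream() {
    return !portOpen && downstreamDeployed && committedWindow >= closedAtWindow;
  }

  void undeployDownstream() {
    if (!canUndeployDownstream()) {
      throw new IllegalStateException("downstream state is not committed yet");
    }
    downstreamDeployed = false;
  }

  void redeployDownstream() {
    if (!portOpen) {
      throw new IllegalStateException("port is closed");
    }
    downstreamDeployed = true;
  }

  Phase phase() {
    if (portOpen) {
      return downstreamDeployed ? Phase.ACTIVE : Phase.ACTIVATING;
    }
    return downstreamDeployed ? Phase.DRAINING : Phase.INACTIVE;
  }

  // The upstream may emit only when the port is open and the stream is active.
  boolean mayEmit() {
    return phase() == Phase.ACTIVE;
  }
}
```

Closing the port in window 10 moves the stream to `DRAINING`; the consumer can be released only after window 10 is committed, and after `openPort()` the upstream still has to wait in `ACTIVATING` until the consumer is redeployed.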
>>>>>> Closing a stream would need to stop accepting new data, but still
>>>>>> can't actually close all the streams and un-deploy operators until
>>>>>> committed. Idempotency might require the close to take effect at the
>>>>>> end of the window. What would it then mean to re-open streams within
>>>>>> a window? Also, this looks like a larger undertaking. As Ram
>>>>>> suggested, it would be good to understand the use cases, and I also
>>>>>> suggest that multiple folks participate in the implementation effort
>>>>>> to ensure that we are able to address all the scenarios and minimize
>>>>>> the chances of regression in existing behavior.
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> On Sat, Apr 1, 2017 at 8:12 AM, Vlad Rozov <v.ro...@datatorrent.com>
>>>>>> wrote:
>>>>>>
>>>>>>> All,
>>>>>>>
>>>>>>> Currently Apex assumes that an operator can emit on any defined
>>>>>>> output port and that all streams defined by a DAG are active. I'd
>>>>>>> like to propose the ability for an operator to open and close
>>>>>>> output ports. By default, all ports defined by an operator will be
>>>>>>> open. If an operator, for any reason, decides that it will not emit
>>>>>>> tuples on an output port, it may close it. This will make the
>>>>>>> stream inactive, and the application master may undeploy the
>>>>>>> downstream (for that input stream) operators. If this leaves
>>>>>>> containers without any active operators, those containers may be
>>>>>>> undeployed as well, leading to better cluster resource utilization
>>>>>>> and better Apex elasticity. Later, the operator may be in a state
>>>>>>> where it needs to emit tuples on the closed port. In this case, it
>>>>>>> needs to re-open the port and wait until the stream becomes active
>>>>>>> again before emitting tuples on that port.
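Pramod's point that a close may need to take effect at the end of the window, for idempotency, can be made concrete with a window-aligned port. This is a minimal sketch of one possible interpretation, not anything agreed in the thread: new tuples are rejected as soon as `close()` is called, the platform-visible state changes only in `endWindow()`, and (an assumption answering his open question) a re-open within the same window simply cancels the pending close. `WindowAlignedPort` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch (not an Apex API): an output port whose close takes effect
// at the end of the window in which it was requested.
final class WindowAlignedPort {
  private boolean open = true;           // state the platform acts on
  private boolean requestedOpen = true;  // state requested by the operator in this window
  private long currentWindow = -1;
  private Long closedAtWindow;           // null while open
  private final List<String> emitted = new ArrayList<>();

  void beginWindow(long windowId) {
    currentWindow = windowId;
  }

  // Stop accepting new tuples right away; the platform sees the close at endWindow().
  void close() {
    requestedOpen = false;
  }

  // Assumption: re-opening in the same window cancels the pending close. A port
  // closed in an earlier window becomes usable again only after endWindow().
  void reopen() {
    requestedOpen = true;
  }

  void emit(String tuple) {
    if (!open || !requestedOpen) {
      throw new IllegalStateException("port not open in window " + currentWindow);
    }
    emitted.add(currentWindow + ":" + tuple);
  }

  // Transitions happen only on window boundaries, so replaying a window with the
  // same tuples and requests after a failure yields the same open/closed outcome.
  void endWindow() {
    if (open && !requestedOpen) {
      closedAtWindow = currentWindow;
    } else if (!open && requestedOpen) {
      closedAtWindow = null;
    }
    open = requestedOpen;
  }

  boolean isOpen() {
    return open;
  }

  Long closedAtWindow() {
    return closedAtWindow;
  }

  List<String> emitted() {
    return List.copyOf(emitted);
  }
}
```

Aligning transitions with window boundaries keeps the close recorded against a single window id, which is what a checkpoint-based undeploy decision would need to compare against.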
>>>>>>> Making an inactive stream active again requires the application
>>>>>>> master to re-allocate containers and re-deploy the downstream
>>>>>>> operators.
>>>>>>>
>>>>>>> It should also be possible for an application designer to mark
>>>>>>> streams as inactive when an application starts. This will allow the
>>>>>>> application master to avoid reserving all containers when the
>>>>>>> application starts. Later, the port can be opened and the inactive
>>>>>>> stream becomes active.
>>>>>>>
>>>>>>> Thank you,
>>>>>>>
>>>>>>> Vlad
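The last part of the original proposal (release containers left without active operators, and mark streams inactive at launch so the application master need not reserve every container) boils down to a reachability question over the DAG. A minimal sketch, assuming a toy graph of named operators and a fixed, given operator-to-container assignment; `DeploymentPlanner`, `requiredOperators` and `requiredContainers` are hypothetical and do not describe how the Apex application master actually plans containers.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical planner (not the Apex application master): decides which operators
// and containers are needed when some streams are marked inactive.
final class DeploymentPlanner {
  record Stream(String from, String to, boolean active) {}

  // Sources (operators without incoming streams) plus everything reachable from
  // them through active streams. Operators behind inactive streams are deferred.
  static Set<String> requiredOperators(Collection<String> operators, List<Stream> streams) {
    Set<String> hasInput = new HashSet<>();
    for (Stream s : streams) {
      hasInput.add(s.to());
    }
    Set<String> required = new LinkedHashSet<>();
    Deque<String> work = new ArrayDeque<>();
    for (String op : operators) {
      if (!hasInput.contains(op)) {
        required.add(op);
        work.add(op);
      }
    }
    while (!work.isEmpty()) {
      String op = work.poll();
      for (Stream s : streams) {
        if (s.active() && s.from().equals(op) && required.add(s.to())) {
          work.add(s.to());
        }
      }
    }
    return required;
  }

  // Only containers hosting at least one required operator need to be requested;
  // any other container can stay unreserved (or be released).
  static Set<String> requiredContainers(Map<String, String> containerOf, Set<String> requiredOps) {
    Set<String> containers = new TreeSet<>();
    for (String op : requiredOps) {
      containers.add(Objects.requireNonNull(containerOf.get(op), "no container for " + op));
    }
    return containers;
  }
}
```

With the `aggregate` to `output` stream marked inactive, only the container hosting `input` and `aggregate` is needed at launch; flipping that stream to active and re-running both functions adds `output` and its container, which corresponds to the re-allocation step described in the proposal.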