The pipeline depends on resource availability. It could be: (source -> join -> writer) - - -> (reader -> topN -> sink)
or (source -> join -> topN -> sink). The second case does not allow you to deallocate join (join and topN are active at the same time).

On Mon, Apr 10, 2017 at 8:37 AM, Vlad Rozov <v.ro...@datatorrent.com> wrote:

It is important. The generic pipeline proposed is (... -> writer) ---> (reader -> join -> writer) ---> (reader -> ...), where reader -> aggregator -> writer becomes a common pattern for single-stage processing.

Thank you,

Vlad

On 4/10/17 08:31, Thomas Weise wrote:

Where the data comes from isn't important for this discussion. The scenario is join -> topN.

With intermediate files it is: (join -> writer) - - -> (reader -> topN)

On Mon, Apr 10, 2017 at 8:26 AM, Vlad Rozov <v.ro...@datatorrent.com> wrote:

In your example join is both a consumer and a producer, is it not? Where does it get its data from? Join is not an input operator.

Thank you,

Vlad

On 4/10/17 08:13, Thomas Weise wrote:

In this example join/writer produces the data and reader/topN consumes it. You cannot deallocate the producer before all data has been drained. When using files, join/writer can be deallocated once all data has been flushed to the files, and allocation of the consumer can wait until that has occurred if there isn't enough space to have both of them active at the same time.

Overall it seems this is not a matter of activating/deactivating streams but operators.

Thomas

On Mon, Apr 10, 2017 at 8:05 AM, Vlad Rozov <v.ro...@datatorrent.com> wrote:

With additional file readers/writers, the pipeline of a single stage becomes the 3-operator use case I described. With the ability to open/close ports, the platform can optimize it by re-allocating resources from readers to writers.
Thank you,

Vlad

On 4/10/17 07:44, Thomas Weise wrote:

In streaming there is a stream (surprise); in a space-constrained batch case, we can have additional file writers/readers between the operators.

Modules can in fact be used to support pipeline reuse, but they must be added/removed dynamically to support stages with on-demand resource allocation.

Thomas

On Mon, Apr 10, 2017 at 7:37 AM, Vlad Rozov <v.ro...@datatorrent.com> wrote:

Do you suggest that in a streaming use case the join operator also passes data downstream using files, or that there are two different join operators, one for streaming and one for batch? If not, it means that the join operator needs to emit data to a separate file output operator, so it still needs to read data from a temporary space before emitting; why not emit directly to topN in that case?

Is pipeline reuse not already supported by Apex modules?

Thank you,

Vlad

On 4/10/17 06:59, Thomas Weise wrote:

I don't think this fully covers the scenario of limited resources. You describe a case of 3 operators, but when you consider just 2 operators that both have to hold a large data set in memory, the suggested approach won't work. Let's say the first operator is an outer join and the second operator is topN. Both are blocking and cannot emit before all input is seen.

To deallocate the outer join, all results need to be drained. It's a resource swap, and you need a temporary space to hold the data.
Also, if the requirement is to be able to recover and retry from the results of stage one, then you need a fault-tolerant swap space. If the cluster does not have enough memory, then disk is a good option (SLA vs. memory tradeoff).

I would also suggest thinking beyond the single-DAG scenario. Often users need to define pipelines that are composed of multiple smaller flows (which they may also want to reuse in multiple pipelines). APEXCORE-408 gives you an option to compose such flows within a single Apex application, in addition to covering the simplified use case that we discuss here.

Thomas

On Thu, Apr 6, 2017 at 5:52 PM, Vlad Rozov <v.ro...@datatorrent.com> wrote:

It is exactly the same use case, with the exception that it is not necessary to write data to files. Consider 3 operators: an input operator, an aggregate operator, and an output operator. When the application starts, the output port of the aggregate operator should be in the closed state, the stream between the second and the third would be inactive, and the output operator does not need to be allocated. After the input operator processes all data, it can close its output port and the input operator may be de-allocated. Once the aggregator receives EOS on its input port, it should open the output port and start writing to it. At this point, the output operator needs to be deployed and the stream between the last two operators (aggregator and output) becomes active.
In a real batch use case, it is preferable to have the full application DAG statically defined and to delegate activation/de-activation of stages to the platform. It is also preferable not to write intermediate files to disk/HDFS, but instead to pass data in-memory.

Thank you,

Vlad

On 4/6/17 09:37, Thomas Weise wrote:

You would need to provide more specifics of the use case you are thinking to address to make this a meaningful discussion.

An example for APEXCORE-408 (based on a real batch use case): I have two stages; the first stage produces a set of files that the second stage needs as input. Stage 1 operators are to be released and stage 2 operators deployed when stage 2 starts. These can be independent operators; they don't need to be connected through a stream.

Thomas

On Thu, Apr 6, 2017 at 9:21 AM, Vlad Rozov <v.ro...@datatorrent.com> wrote:

It is not about a use case difference. My proposal and APEXCORE-408 address the same use case: how to re-allocate resources for batch applications or applications where processing happens in stages. The difference between APEXCORE-408 and the proposal is a shift in complexity from application logic to the platform. IMO, supporting batch applications using APEXCORE-408 will require more coding on the application side.
Thank you,

Vlad

On 4/5/17 21:57, Thomas Weise wrote:

I think this needs more input on a use case level. The ability to dynamically alter the DAG internally will also address resource allocation for operators:

https://issues.apache.org/jira/browse/APEXCORE-408

It can be used to implement stages of a batch pipeline and is very flexible in general. Considering the likely implementation complexity of the proposed feature, I would like to understand what benefits it provides to the user (use cases that cannot be addressed otherwise)?

Thanks,
Thomas

On Sat, Apr 1, 2017 at 12:23 PM, Vlad Rozov <v.ro...@datatorrent.com> wrote:

Correct, a stateful downstream operator can only be undeployed at a checkpoint window after it consumes all data emitted by the upstream operator on the closed port.

It will be necessary to distinguish between a closed port and an inactive stream. After a port is closed, the stream may still be active, and after a port is opened, the stream may still be inactive (not yet ready).

The more contributors participate in the discussion and implementation, the more solid the feature will be.
Thank you,
Vlad

Sent from iPhone

On Apr 1, 2017, at 11:03, Pramod Immaneni <pra...@datatorrent.com> wrote:

Generally a good idea. Care should be taken around fault tolerance and idempotency. A closed stream would need to stop accepting new data, but the platform still can't actually close all the streams and un-deploy operators until committed. Idempotency might require the stream close to take effect at the end of the window. What would it then mean for re-opening streams within a window? Also, this looks like a larger undertaking; as Ram suggested, it would be good to understand the use cases, and I also suggest that multiple folks participate in the implementation effort to ensure that we are able to address all the scenarios and minimize chances of regression in existing behavior.

Thanks

On Sat, Apr 1, 2017 at 8:12 AM, Vlad Rozov <v.ro...@datatorrent.com> wrote:

All,

Currently Apex assumes that an operator can emit on any defined output port and that all streams defined by a DAG are active.
I'd like to propose an ability for an operator to open and close output ports. By default, all ports defined by an operator will be open. In case an operator for any reason decides that it will not emit tuples on an output port, it may close it. This will make the stream inactive, and the application master may undeploy the downstream (for that input stream) operators. If this leads to containers that don't have any active operators, those containers may be undeployed as well, leading to better cluster resource utilization and better Apex elasticity. Later, the operator may be in a state where it needs to emit tuples on the closed port. In this case, it needs to re-open the port and wait until the stream becomes active again before emitting tuples on that port. Making an inactive stream active again requires the application master to re-allocate containers and re-deploy the downstream operators.
It should also be possible for an application designer to mark streams as inactive when an application starts. This will allow the application master to avoid reserving all containers at application start. Later, the port can be opened and the inactive stream become active.

Thank you,

Vlad
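To make the lifecycle discussed in this thread concrete, here is a minimal sketch in plain JDK Java (no Apex APIs; `OutputPort`, `streamActive`, and all other names are hypothetical illustrations, not the proposed interface). It models the distinction raised above: a port being closed is not the same as its stream being inactive, because the stream must stay active until the downstream operator has drained all emitted data.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical model of an output port with open/close state and a stream
// whose active/inactive state is derived from it plus in-flight data.
class OutputPort {
    private boolean open = true;
    private final Queue<Object> inFlight = new ArrayDeque<>();

    // Operator emits a tuple; only legal while the port is open.
    void emit(Object tuple) {
        if (!open) {
            throw new IllegalStateException("port is closed");
        }
        inFlight.add(tuple);
    }

    // Operator signals it will not emit any more tuples on this port.
    void close() { open = false; }

    // Operator needs to emit again; the stream must be re-activated
    // (downstream re-deployed) before tuples can flow.
    void reopen() { open = true; }

    boolean isOpen() { return open; }

    // The stream stays active while the port is open OR data is still
    // in flight: the application master must not undeploy the downstream
    // operator until the port is closed AND all data has been drained.
    boolean streamActive() { return open || !inFlight.isEmpty(); }

    // Downstream consumes one tuple; returns null when nothing is queued.
    Object drainOne() { return inFlight.poll(); }
}
```

In this model, calling `close()` while tuples are still queued leaves `streamActive()` true, which captures why undeployment has to wait for draining (and, per the fault-tolerance point above, a real implementation would additionally wait for the closing window to be committed before undeploying anything).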