Where the data comes from isn't important for this discussion. The scenario is join -> topN
With intermediate files it is: ( join -> writer ) - - -> ( reader -> topN )

On Mon, Apr 10, 2017 at 8:26 AM, Vlad Rozov <v.ro...@datatorrent.com> wrote:

In your example join is both consumer and producer, isn't it? Where does it get data from? Join is not an input operator.

Thank you,

Vlad

On 4/10/17 08:13, Thomas Weise wrote:

In this example join/writer produces the data and reader/topN consumes it. You cannot deallocate the producer before all data has been drained. When using files, join/writer can be deallocated once all data has been flushed to the files, and allocation of the consumer can wait until that has happened if there isn't enough space to have both of them active at the same time.

Overall it seems this is not a matter of activating/deactivating streams but operators.

Thomas

On Mon, Apr 10, 2017 at 8:05 AM, Vlad Rozov <v.ro...@datatorrent.com> wrote:

With additional file readers/writers, the pipeline of a single stage becomes the 3-operator use case I described. With the ability to open/close ports, the platform can optimize it by re-allocating resources from readers to writers.

Thank you,

Vlad

On 4/10/17 07:44, Thomas Weise wrote:

In streaming there is a stream (surprise); in a space-constrained batch case, we can have additional file writers/readers between the operators.

Modules can in fact be used to support pipeline reuse, but they must be added/removed dynamically to support stages with on-demand resource allocation.

Thomas

On Mon, Apr 10, 2017 at 7:37 AM, Vlad Rozov <v.ro...@datatorrent.com> wrote:

Do you suggest that in a streaming use case the join operator also passes data downstream using files, or that there are two different join operators, one for streaming and one for batch?
If not, it means that the join operator needs to emit data to a separate file output operator, so it still needs to read data from a temporary space before emitting. Why not emit directly to topN in this case?

Isn't pipeline reuse already supported by Apex modules?

Thank you,

Vlad

On 4/10/17 06:59, Thomas Weise wrote:

I don't think this fully covers the scenario of limited resources. You describe a case of 3 operators, but when you consider just 2 operators that both have to hold a large data set in memory, the suggested approach won't work. Let's say the first operator is an outer join and the second operator is topN. Both are blocking and cannot emit before all input is seen.

To deallocate the outer join, all results need to be drained. It's a resource swap, and you need a temporary space to hold the data. Also, if the requirement is to be able to recover and retry from the results of stage one, then you need a fault-tolerant swap space. If the cluster does not have enough memory, then disk is a good option (SLA vs. memory tradeoff).

I would also suggest thinking beyond the single DAG scenario. Users often need to define pipelines that are composed of multiple smaller flows (which they may also want to reuse in multiple pipelines). APEXCORE-408 gives you an option to compose such flows within a single Apex application, in addition to covering the simplified use case that we discuss there.

Thomas

On Thu, Apr 6, 2017 at 5:52 PM, Vlad Rozov <v.ro...@datatorrent.com> wrote:

It is exactly the same use case, with the exception that it is not necessary to write data to files.
Consider 3 operators: an input operator, an aggregate operator and an output operator. When the application starts, the output port of the aggregate operator should be in the closed state, the stream between the second and the third operator would be inactive, and the output operator does not need to be allocated. After the input operator processes all data, it can close its output port and the input operator may be de-allocated. Once the aggregator receives EOS on its input port, it should open its output port and start writing to it. At this point, the output operator needs to be deployed and the stream between the last two operators (aggregator and output) becomes active.

In a real batch use case, it is preferable to have the full application DAG statically defined and to delegate activation/de-activation of stages to the platform. It is also preferable not to write intermediate files to disk/HDFS, but instead to pass data in memory.

Thank you,

Vlad

On 4/6/17 09:37, Thomas Weise wrote:

You would need to provide more specifics of the use case you are looking to address to make this a meaningful discussion.

An example for APEXCORE-408 (based on a real batch use case): I have two stages; the first stage produces a set of files that the second stage needs as input. Stage 1 operators are to be released and stage 2 operators deployed when stage 2 starts. These can be independent operators; they don't need to be connected through a stream.
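The file-based stage handoff in the example above, and the `( join -> writer ) - - -> ( reader -> topN )` shape described at the top of this thread, can be illustrated outside of Apex. The following is a minimal plain-Python sketch, not Apex code: the function names, the comma-separated spill format and the use of a local temporary file in place of HDFS are all assumptions made for illustration. Stage 1 runs a blocking full outer join and flushes every row to the spill file; only then does stage 2 read the file back and compute topN, so the two blocking operators never need memory at the same time.

```python
import heapq
import os
import tempfile


def stage1_outer_join_to_file(left, right, path):
    """Hypothetical stage 1 (join -> writer): a blocking full outer join.

    No row can be written before both inputs have been seen in full, so the
    join holds its whole input in memory until the spill file is written.
    """
    with open(path, "w") as out:
        for key in sorted(set(left) | set(right)):
            # A full outer join keeps keys found on either side; a missing
            # side is written as an empty field (like SQL NULL).
            lv, rv = left.get(key), right.get(key)
            out.write(f"{key},{'' if lv is None else lv},{'' if rv is None else rv}\n")
    # The file is flushed and closed here, so stage 1 state can be released.


def stage2_topn_from_file(path, n):
    """Hypothetical stage 2 (reader -> topN): also blocking, it needs every row."""
    scored = []
    with open(path) as src:
        for line in src:
            key, lv, rv = line.rstrip("\n").split(",")
            # Rank each joined row by the sum of whichever sides are present.
            scored.append((key, int(lv or 0) + int(rv or 0)))
    return heapq.nlargest(n, scored, key=lambda kv: kv[1])


def run_two_stage_pipeline(left, right, n):
    """Run stage 1 to completion, then stage 2, using a temporary spill file."""
    fd, path = tempfile.mkstemp(suffix=".spill")
    os.close(fd)
    try:
        stage1_outer_join_to_file(left, right, path)
        # Stage boundary: only stage 1 or stage 2 needs resources at any time.
        return stage2_topn_from_file(path, n)
    finally:
        os.remove(path)
```

For example, `run_two_stage_pipeline({"a": 5, "b": 1}, {"b": 2, "c": 7}, 2)` ranks the joined rows `a=5`, `b=3`, `c=7` and keeps the top two. The local temporary file stands in for the fault-tolerant swap space discussed in the thread; passing the joined rows in memory instead avoids the disk I/O, but the join's output then has to stay resident until topN has consumed it.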
Thomas

On Thu, Apr 6, 2017 at 9:21 AM, Vlad Rozov <v.ro...@datatorrent.com> wrote:

It is not about a use case difference. My proposal and APEXCORE-408 address the same use case: how to re-allocate resources for batch applications, or applications where processing happens in stages. The difference between APEXCORE-408 and the proposal is a shift in complexity from the application logic to the platform. IMO, supporting batch applications using APEXCORE-408 will require more coding on the application side.

Thank you,

Vlad

On 4/5/17 21:57, Thomas Weise wrote:

I think this needs more input on a use case level. The ability to dynamically alter the DAG internally will also address resource allocation for operators:

https://issues.apache.org/jira/browse/APEXCORE-408

It can be used to implement stages of a batch pipeline and is very flexible in general. Considering the likely implementation complexity of the proposed feature, I would like to understand what benefits it provides to the user (use cases that cannot be addressed otherwise).

Thanks,
Thomas

On Sat, Apr 1, 2017 at 12:23 PM, Vlad Rozov <v.ro...@datatorrent.com> wrote:

Correct, a stateful downstream operator can only be undeployed at a checkpoint window after it has consumed all data emitted by the upstream operator on the closed port.
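The undeploy rule above amounts to a small calculation: wait until the downstream operator has consumed everything emitted before the port was closed, then undeploy at the next checkpoint. This is a minimal sketch under stated assumptions: window ids are consecutive non-negative integers and a checkpoint is taken at every window whose id is a multiple of a fixed interval. The function name and both parameters are hypothetical; the thread does not describe how Apex aligns checkpoints.

```python
def earliest_undeploy_window(last_emitted_window, consumed_through_window,
                             checkpoint_interval):
    """Return the window at whose checkpoint a stateful downstream operator
    may be undeployed, or None if it has not drained the closed port yet.

    Assumptions for illustration (not Apex API): window ids are consecutive
    non-negative integers, and a checkpoint is taken at the end of every
    window whose id is a multiple of checkpoint_interval.
    """
    if consumed_through_window < last_emitted_window:
        # Data emitted before the port was closed is still being consumed.
        return None
    # Round up to the next checkpoint boundary (ceiling division).
    checkpoints = -(-last_emitted_window // checkpoint_interval)
    return checkpoints * checkpoint_interval
```

With a checkpoint every 60 windows, a port whose last tuples were emitted in window 125 lets the downstream operator go at the checkpoint of window 180, not earlier, because its state is only durable from that point on.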
It will be necessary to distinguish between a closed port and an inactive stream. After a port is closed, the stream may still be active, and after a port is opened, the stream may still be inactive (not yet ready).

The more contributors participate in the discussion and implementation, the more solid the feature will be.

Thank you,
Vlad

Sent from iPhone

On Apr 1, 2017, at 11:03, Pramod Immaneni <pra...@datatorrent.com> wrote:

Generally a good idea. Care should be taken around fault tolerance and idempotency. Closing a stream would need to stop accepting new data, but it still can't actually close all the streams and undeploy operators until committed. Idempotency might require the close to take effect at the end of the window. What would it then mean to re-open streams within a window? Also, this looks like a larger undertaking. As Ram suggested, it would be good to understand the use cases, and I also suggest that multiple folks participate in the implementation effort to ensure that we are able to address all the scenarios and minimize the chances of regression in existing behavior.
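The points above (a closed port is not the same as an inactive stream, a close takes effect at the end of the window, and undeploy must wait for commit) can be combined into one small state model. This is a minimal sketch, not an Apex API: the class, its states (`DRAINING`, `PENDING`) and its method names are hypothetical, and cancelling a pending close on re-open is only one possible answer to the question of re-opening within a window.

```python
from enum import Enum


class PortState(Enum):
    OPEN = "open"
    CLOSED = "closed"


class StreamState(Enum):
    ACTIVE = "active"      # downstream deployed, tuples may flow
    DRAINING = "draining"  # port closed, close window not yet committed
    INACTIVE = "inactive"  # downstream operators may be undeployed
    PENDING = "pending"    # port re-opened, downstream not (re)deployed yet


class PortStream:
    """Hypothetical model of one output port and the stream it feeds.

    Not an Apex API. It only illustrates three points from the thread:
    port state and stream state are tracked separately, a close takes effect
    at the end of the window in which it was requested, and undeploy waits
    until that window has been committed.
    """

    def __init__(self):
        self.port = PortState.OPEN
        self.stream = StreamState.ACTIVE
        self.close_requested = False
        self.closed_at_window = None

    def request_close(self):
        # The close is deferred to end_window so that a window replayed
        # after a failure emits the same tuples (idempotency).
        self.close_requested = True

    def end_window(self, window_id):
        if self.close_requested:
            self.port = PortState.CLOSED
            self.stream = StreamState.DRAINING
            self.closed_at_window = window_id
            self.close_requested = False

    def committed(self, window_id):
        # Only a committed close window makes undeploying downstream safe.
        if self.stream is StreamState.DRAINING and window_id >= self.closed_at_window:
            self.stream = StreamState.INACTIVE

    def reopen(self):
        # One possible answer to re-opening within a window: cancel the
        # pending close, so the port never actually closes.
        self.close_requested = False
        self.port = PortState.OPEN
        self.closed_at_window = None
        if self.stream is StreamState.INACTIVE:
            self.stream = StreamState.PENDING  # must wait for redeploy
        elif self.stream is StreamState.DRAINING:
            self.stream = StreamState.ACTIVE   # downstream was never undeployed

    def downstream_deployed(self):
        if self.stream is StreamState.PENDING:
            self.stream = StreamState.ACTIVE

    def can_emit(self):
        return self.port is PortState.OPEN and self.stream is StreamState.ACTIVE
```

The separate `PENDING` state is what captures "after a port is opened, the stream may still be inactive": the operator has re-opened its port, but `can_emit()` stays false until the application master reports the downstream operators as deployed again.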
Thanks

On Sat, Apr 1, 2017 at 8:12 AM, Vlad Rozov <v.ro...@datatorrent.com> wrote:

All,

Currently Apex assumes that an operator can emit on any defined output port and that all streams defined by a DAG are active. I'd like to propose the ability for an operator to open and close output ports. By default, all ports defined by an operator will be open. If an operator for any reason decides that it will not emit tuples on an output port, it may close it. This will make the stream inactive, and the application master may undeploy the operators downstream of that stream. If this leads to containers that don't have any active operators, those containers may be undeployed as well, leading to better cluster resource utilization and better Apex elasticity. Later, the operator may be in a state where it needs to emit tuples on the closed port. In this case, it needs to re-open the port and wait until the stream becomes active again before emitting tuples on that port.
Making an inactive stream active again requires the application master to re-allocate containers and re-deploy the downstream operators.

It should also be possible for an application designer to mark streams as inactive when an application starts. This will allow the application master to avoid reserving all containers when the application starts. Later, the port can be opened and the inactive stream becomes active.

Thank you,

Vlad
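The container bookkeeping in the proposal above (closed ports make streams inactive, downstream operators may be undeployed, and containers left with no active operators are released or never reserved) can be sketched as a small planning function. This is a minimal sketch, assuming one reading of the proposal that is not existing Apex behavior: an operator is kept only if at least one incoming stream is active and comes from a deployed operator, and inactivity therefore propagates transitively downstream. The function name, its arguments and the operator/container names in the example are hypothetical.

```python
def plan_deployment(streams, placement, inactive):
    """Return (deployed_operators, needed_containers) for a DAG.

    streams:   list of (upstream, downstream) operator-name pairs
    placement: dict mapping every operator to a container id
    inactive:  set of streams whose upstream port is closed, including
               streams marked inactive when the application starts

    Assumed rule (one reading of the proposal, not existing Apex behavior):
    an operator with no incoming streams (an input operator) is deployed;
    any other operator is deployed only if at least one incoming stream is
    active and comes from a deployed operator.
    """
    incoming = {op: [] for op in placement}
    for up, down in streams:
        incoming[down].append((up, down))

    deployed = {op for op, ins in incoming.items() if not ins}
    changed = True
    while changed:  # propagate deployment downstream until nothing changes
        changed = False
        for op, ins in incoming.items():
            if op in deployed or not ins:
                continue
            if any(s not in inactive and s[0] in deployed for s in ins):
                deployed.add(op)
                changed = True

    # Containers that host no deployed operator need not be reserved.
    containers = {placement[op] for op in deployed}
    return deployed, containers
```

For a DAG `input -> parse -> {store, alerts}`, `alerts -> pager`, with `input` and `parse` sharing container `c1`, marking `parse -> alerts` inactive at startup means neither `alerts` nor `pager` is deployed, so containers `c3` and `c4` are never reserved. Re-opening a port corresponds to removing its stream from `inactive`; the operators and containers that reappear in the result are the ones the application master would have to re-allocate and re-deploy before tuples can flow again.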