In streaming there is a stream (surprise); in a space-constrained batch case, we can have additional file writers/readers between the operators.

Modules can in fact be used to support pipeline reuse, but they must be added/removed dynamically to support stages with on-demand resource allocation.

Thomas

On Mon, Apr 10, 2017 at 7:37 AM, Vlad Rozov <[email protected]> wrote:

> Do you suggest that in a streaming use case the join operator also passes
> data downstream using files, or that there are two different join
> operators, one for streaming and one for batch? If not, it means that the
> join operator needs to emit data to a separate file output operator, so it
> still needs to read data from a temporary space before emitting; why not
> emit directly to topN in that case?
>
> Isn't pipeline reuse already supported by Apex modules?
>
> Thank you,
>
> Vlad
>
> On 4/10/17 06:59, Thomas Weise wrote:
>
>> I don't think this fully covers the scenario of limited resources. You
>> describe a case of 3 operators, but when you consider just 2 operators
>> that both have to hold a large data set in memory, the suggested approach
>> won't work. Let's say the first operator is an outer join and the second
>> operator a topN. Both are blocking and cannot emit before all input is
>> seen.
>>
>> To deallocate the outer join, all results need to be drained. It's a
>> resource swap, and you need a temporary space to hold the data. Also, if
>> the requirement is to be able to recover and retry from the results of
>> stage one, then you need a fault-tolerant swap space. If the cluster does
>> not have enough memory, then disk is a good option (SLA vs. memory
>> tradeoff).
>>
>> I would also suggest thinking beyond the single-DAG scenario. Often users
>> need to define pipelines that are composed of multiple smaller flows
>> (which they may also want to reuse in multiple pipelines). APEXCORE-408
>> gives you an option to compose such flows within a single Apex
>> application, in addition to covering the simplified use case that we
>> discuss there.
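The resource swap described above (drain a blocking stage's results to fault-tolerant storage, free its memory, then let the next blocking stage read the drained data back) can be sketched in plain Java. This is a toy model under invented names (`ResourceSwap`, `drainStageOne`), not Apex code; a local temp file stands in for a fault-tolerant swap space such as HDFS.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

// Toy model of a "resource swap" between two blocking stages:
// stage 1 (e.g. an outer join) holds its full result in memory,
// drains it to a spill file, and frees its memory before stage 2
// (e.g. topN) is deployed and reads the drained data back.
public class ResourceSwap {

    static Path drainStageOne(List<Integer> joinResult) throws IOException {
        // Drain all results to a temporary "swap space". In a real
        // deployment this would be fault-tolerant storage such as HDFS.
        Path spill = Files.createTempFile("stage1", ".txt");
        try (BufferedWriter w = Files.newBufferedWriter(spill)) {
            for (int v : joinResult) {
                w.write(Integer.toString(v));
                w.newLine();
            }
        }
        joinResult.clear(); // stage 1 memory can now be released
        return spill;
    }

    static List<Integer> topN(Path spill, int n) throws IOException {
        // Stage 2: read the drained data and keep only the n largest
        // values, using a min-heap so memory stays bounded by n.
        PriorityQueue<Integer> heap = new PriorityQueue<>();
        try (BufferedReader r = Files.newBufferedReader(spill)) {
            String line;
            while ((line = r.readLine()) != null) {
                heap.offer(Integer.parseInt(line));
                if (heap.size() > n) heap.poll();
            }
        }
        List<Integer> result = new ArrayList<>(heap);
        result.sort(Collections.reverseOrder());
        return result;
    }

    public static void main(String[] args) throws IOException {
        List<Integer> joinResult = new ArrayList<>(Arrays.asList(5, 1, 9, 3, 7));
        Path spill = drainStageOne(joinResult);
        System.out.println(topN(spill, 3)); // prints [9, 7, 5]
        Files.deleteIfExists(spill);
    }
}
```

Note how stage 2's memory footprint is bounded by `n` regardless of the drained data size, which is the point of the SLA vs. memory tradeoff above.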
>>
>> Thomas
>>
>> On Thu, Apr 6, 2017 at 5:52 PM, Vlad Rozov <[email protected]> wrote:
>>
>>> It is exactly the same use case, with the exception that it is not
>>> necessary to write data to files. Consider 3 operators: an input
>>> operator, an aggregate operator, and an output operator. When the
>>> application starts, the output port of the aggregate operator should be
>>> in the closed state, the stream between the second and the third
>>> operator would be inactive, and the output operator does not need to be
>>> allocated. After the input operator processes all data, it can close the
>>> output port, and the input operator may be de-allocated. Once the
>>> aggregator receives EOS on its input port, it should open the output
>>> port and start writing to it. At this point, the output operator needs
>>> to be deployed and the stream between the last two operators (aggregator
>>> and output) becomes active.
>>>
>>> In a real batch use case, it is preferable to have the full application
>>> DAG statically defined and to delegate activation/de-activation of
>>> stages to the platform. It is also preferable not to write intermediate
>>> files to disk/HDFS, but instead to pass data in-memory.
>>>
>>> Thank you,
>>>
>>> Vlad
>>>
>>> On 4/6/17 09:37, Thomas Weise wrote:
>>>
>>>> You would need to provide more specifics of the use case you are
>>>> thinking to address to make this a meaningful discussion.
>>>>
>>>> An example for APEXCORE-408 (based on a real batch use case): I have
>>>> two stages; the first stage produces a set of files that the second
>>>> stage needs as input. Stage 1 operators are to be released and stage 2
>>>> operators deployed when stage 2 starts. These can be independent
>>>> operators; they don't need to be connected through a stream.
>>>>
>>>> Thomas
>>>>
>>>> On Thu, Apr 6, 2017 at 9:21 AM, Vlad Rozov <[email protected]> wrote:
>>>>
>>>>> It is not about a use case difference. My proposal and APEXCORE-408
>>>>> address the same use case: how to re-allocate resources for batch
>>>>> applications, or applications where processing happens in stages. The
>>>>> difference between APEXCORE-408 and the proposal is a shift in
>>>>> complexity from application logic to the platform. IMO, supporting
>>>>> batch applications using APEXCORE-408 will require more coding on the
>>>>> application side.
>>>>>
>>>>> Thank you,
>>>>>
>>>>> Vlad
>>>>>
>>>>> On 4/5/17 21:57, Thomas Weise wrote:
>>>>>
>>>>>> I think this needs more input on a use case level. The ability to
>>>>>> dynamically alter the DAG internally will also address resource
>>>>>> allocation for operators:
>>>>>>
>>>>>> https://issues.apache.org/jira/browse/APEXCORE-408
>>>>>>
>>>>>> It can be used to implement stages of a batch pipeline and is very
>>>>>> flexible in general. Considering the likely implementation complexity
>>>>>> of the proposed feature, I would like to understand what benefits it
>>>>>> provides to the user (use cases that cannot be addressed otherwise).
>>>>>>
>>>>>> Thanks,
>>>>>> Thomas
>>>>>>
>>>>>> On Sat, Apr 1, 2017 at 12:23 PM, Vlad Rozov <[email protected]> wrote:
>>>>>>
>>>>>>> Correct, a stateful downstream operator can only be undeployed at a
>>>>>>> checkpoint window after it consumes all data emitted by the upstream
>>>>>>> operator on the closed port.
>>>>>>>
>>>>>>> It will be necessary to distinguish between a closed port and an
>>>>>>> inactive stream. After a port is closed, the stream may still be
>>>>>>> active, and after a port is opened, the stream may still be inactive
>>>>>>> (not yet ready).
>>>>>>>
>>>>>>> The more contributors participate in the discussion and
>>>>>>> implementation, the more solid the feature will be.
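The distinction above between port state and stream state can be made concrete with a small state model. All names here (`PortStreamModel`, `tryDeactivate`, and so on) are invented for illustration; no such API exists in Apex. The model shows that a port can be closed while its stream is still active and draining, and that the stream may only go inactive once in-flight data has been consumed (e.g. at a committed checkpoint).

```java
// Toy model of the closed-port vs. inactive-stream distinction.
// All names here are invented for illustration; Apex has no such API.
public class PortStreamModel {

    enum PortState { OPEN, CLOSED }
    enum StreamState { ACTIVE, INACTIVE }

    static class Stream {
        PortState port = PortState.OPEN;
        StreamState stream = StreamState.ACTIVE;
        int inFlight = 0; // tuples emitted but not yet consumed downstream

        void emit() {
            if (port == PortState.CLOSED) throw new IllegalStateException("port closed");
            if (stream == StreamState.INACTIVE) throw new IllegalStateException("stream not ready");
            inFlight++;
        }

        void closePort() { port = PortState.CLOSED; }

        // The platform may deactivate the stream (and undeploy the
        // downstream operator) only once the port is closed AND all
        // in-flight data has been consumed.
        boolean tryDeactivate() {
            if (port == PortState.CLOSED && inFlight == 0) {
                stream = StreamState.INACTIVE;
                return true;
            }
            return false;
        }

        void consumeAll() { inFlight = 0; }
    }

    public static void main(String[] args) {
        Stream s = new Stream();
        s.emit();
        s.closePort();
        System.out.println(s.tryDeactivate()); // prints false: port closed, stream still draining
        s.consumeAll();
        System.out.println(s.tryDeactivate()); // prints true: stream can now go inactive
    }
}
```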
>>>>>>>
>>>>>>> Thank you,
>>>>>>> Vlad
>>>>>>>
>>>>>>> Sent from my iPhone
>>>>>>>
>>>>>>> On Apr 1, 2017, at 11:03, Pramod Immaneni <[email protected]> wrote:
>>>>>>>
>>>>>>>> Generally a good idea. Care should be taken around fault tolerance
>>>>>>>> and idempotency. A close-stream request would need to stop
>>>>>>>> accepting new data, but still can't actually close all the streams
>>>>>>>> and un-deploy operators until committed. Idempotency might require
>>>>>>>> the close to take effect at the end of the window. What would it
>>>>>>>> then mean to re-open streams within a window? Also, this looks like
>>>>>>>> a larger undertaking; as Ram suggested, it would be good to
>>>>>>>> understand the use cases, and I also suggest that multiple folks
>>>>>>>> participate in the implementation effort to ensure that we are able
>>>>>>>> to address all the scenarios and minimize chances of regression in
>>>>>>>> existing behavior.
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>> On Sat, Apr 1, 2017 at 8:12 AM, Vlad Rozov <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> All,
>>>>>>>>>
>>>>>>>>> Currently Apex assumes that an operator can emit on any defined
>>>>>>>>> output port and that all streams defined by a DAG are active. I'd
>>>>>>>>> like to propose an ability for an operator to open and close
>>>>>>>>> output ports. By default, all ports defined by an operator will be
>>>>>>>>> open. In case an operator for any reason decides that it will not
>>>>>>>>> emit tuples on an output port, it may close it. This will make the
>>>>>>>>> stream inactive, and the application master may undeploy the
>>>>>>>>> downstream (for that input stream) operators. If this leads to
>>>>>>>>> containers that don't have any active operators, those containers
>>>>>>>>> may be undeployed as well, leading to better cluster resource
>>>>>>>>> utilization and better Apex elasticity. Later, the operator may be
>>>>>>>>> in a state where it needs to emit tuples on the closed port. In
>>>>>>>>> this case, it needs to re-open the port and wait until the stream
>>>>>>>>> becomes active again before emitting tuples on that port. Making
>>>>>>>>> an inactive stream active again requires the application master to
>>>>>>>>> re-allocate containers and re-deploy the downstream operators.
>>>>>>>>>
>>>>>>>>> It should also be possible for an application designer to mark
>>>>>>>>> streams as inactive when an application starts. This will allow
>>>>>>>>> the application master to avoid reserving all containers when the
>>>>>>>>> application starts. Later, the port can be opened and the inactive
>>>>>>>>> stream becomes active.
>>>>>>>>>
>>>>>>>>> Thank you,
>>>>>>>>>
>>>>>>>>> Vlad
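To give a feel for how the proposed operator-level API might look, here is a hypothetical sketch. The `CloseableOutputPort` type and its open/close methods are invented to illustrate the proposal and are not part of the Apex API; an in-memory list stands in for the stream, and the comments mark where the application master would react to the state changes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the proposed open/close port API. The
// CloseableOutputPort type and its methods are invented for
// illustration; they are not part of the actual Apex API.
public class ClosablePortSketch {

    static class CloseableOutputPort<T> {
        private boolean portOpen;
        final List<T> emitted = new ArrayList<>(); // stands in for the stream

        CloseableOutputPort(boolean initiallyOpen) { this.portOpen = initiallyOpen; }

        void emit(T tuple) {
            if (!portOpen) throw new IllegalStateException("cannot emit on closed port");
            emitted.add(tuple);
        }

        // Closing signals the app master that the downstream operators
        // (and eventually their containers) may be undeployed.
        void close() { portOpen = false; }

        // Re-opening requires the platform to redeploy the downstream
        // operators before emission can resume.
        void open() { portOpen = true; }

        boolean isOpen() { return portOpen; }
    }

    // A batch-style aggregator: its output port starts closed (so the
    // downstream operator need not be deployed at startup), and only
    // opens once all input has been seen (end-of-stream).
    static class SumAggregator {
        final CloseableOutputPort<Long> out = new CloseableOutputPort<>(false);
        private long sum = 0;

        void process(long value) { sum += value; }

        void endOfStream() {
            out.open(); // platform would now deploy the downstream operator
            out.emit(sum);
        }
    }

    public static void main(String[] args) {
        SumAggregator agg = new SumAggregator();
        for (long v : new long[] {1, 2, 3, 4}) agg.process(v);
        agg.endOfStream();
        System.out.println(agg.out.emitted); // prints [10]
    }
}
```

This mirrors the 3-operator example earlier in the thread: the aggregator's port is closed at startup, so no downstream resources are reserved until end-of-stream flips the port open.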
