The pipeline shape depends on resource availability. It could be:

( source -> join -> writer ) - - -> ( reader -> topN -> sink)

or

(source -> join -> topN -> sink)

The second case does not allow you to deallocate join (join and topN are
active at the same time).
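The choice between the two shapes can be sketched as a simple decision on memory budget. The following is plain Java with illustrative names (PipelinePlanner and the numbers are hypothetical, not part of the Apex API):

```java
import java.util.List;

// Minimal sketch: pick a pipeline shape from memory requirements.
// PipelinePlanner and all names/numbers are illustrative, not Apex APIs.
public class PipelinePlanner {

    // Returns the stages to deploy: one stage if both blocking operators
    // fit in memory together, two file-separated stages if not.
    static List<List<String>> plan(long joinMem, long topNMem, long availableMem) {
        if (joinMem + topNMem <= availableMem) {
            // Enough memory: join and topN can be active at the same time.
            return List.of(List.of("source", "join", "topN", "sink"));
        }
        // Not enough memory: spill to files between stages so stage 1
        // can be deallocated before stage 2 is allocated.
        return List.of(
            List.of("source", "join", "writer"),  // stage 1: spill to files
            List.of("reader", "topN", "sink"));   // stage 2: read them back
    }

    public static void main(String[] args) {
        System.out.println(plan(4, 4, 16)); // fits: single stage
        System.out.println(plan(8, 8, 10)); // does not fit: two stages
    }
}
```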


On Mon, Apr 10, 2017 at 8:37 AM, Vlad Rozov <v.ro...@datatorrent.com> wrote:

> It is important. The generic pipeline proposed is (... -> writer) --->
> (reader -> join -> writer) ---> (reader -> ...), where reader -> aggregator
> -> writer becomes a common pattern for single-stage processing.
>
> Thank you,
>
> Vlad
>
>
> On 4/10/17 08:31, Thomas Weise wrote:
>
>> Where the data comes from isn't important for this discussion. The
>> scenario
>> is join -> topN
>>
>> With intermediate files it is: ( join -> writer ) - - -> ( reader -> topN )
>>
>>
>> On Mon, Apr 10, 2017 at 8:26 AM, Vlad Rozov <v.ro...@datatorrent.com>
>> wrote:
>>
>>> In your example join is both consumer and producer, isn't it? Where does
>>> it get data from? Join is not an input operator.
>>>
>>> Thank you,
>>>
>>> Vlad
>>>
>>>
>>> On 4/10/17 08:13, Thomas Weise wrote:
>>>
>>>> In this example join/writer produces the data and reader/topN consumes
>>>> it. You cannot deallocate the producer before all data has been drained.
>>>> When using files, join/writer can be deallocated once all data has been
>>>> flushed to the files, and allocation of the consumer can wait until that
>>>> has occurred if the space isn't available to have both of them active at
>>>> the same time.
>>>>
>>>> Overall it seems this is not a matter of activating/deactivating streams
>>>> but operators.
>>>>
>>>> Thomas
>>>>
>>>>
>>>>
>>>> On Mon, Apr 10, 2017 at 8:05 AM, Vlad Rozov <v.ro...@datatorrent.com>
>>>> wrote:
>>>>
>>>>> With additional file readers/writers, the pipeline of a single stage
>>>>> becomes the 3-operator use case I described. With the ability to
>>>>> open/close ports, the platform can optimize it by re-allocating
>>>>> resources from readers to writers.
>>>>>
>>>>> Thank you,
>>>>>
>>>>> Vlad
>>>>>
>>>>>
>>>>> On 4/10/17 07:44, Thomas Weise wrote:
>>>>>
>>>>>> In streaming there is a stream (surprise); in a space-constrained batch
>>>>>> case, we can have additional file writers/readers between the
>>>>>> operators.
>>>>>>
>>>>>> Modules can in fact be used to support pipeline reuse, but they must be
>>>>>> added/removed dynamically to support stages with on-demand resource
>>>>>> allocation.
>>>>>> Thomas
>>>>>>
>>>>>>
>>>>>> On Mon, Apr 10, 2017 at 7:37 AM, Vlad Rozov <v.ro...@datatorrent.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Do you suggest that in a streaming use case the join operator also
>>>>>>> passes data downstream using files, or that there are two different
>>>>>>> join operators, one for streaming and one for batch? If not, it means
>>>>>>> that the join operator needs to emit data to a separate file output
>>>>>>> operator, so it still needs to read data from a temporary space before
>>>>>>> emitting; why not emit directly to topN in this case?
>>>>>>>
>>>>>>> Isn't pipeline reuse already supported by Apex modules?
>>>>>>>
>>>>>>> Thank you,
>>>>>>>
>>>>>>> Vlad
>>>>>>>
>>>>>>>
>>>>>>> On 4/10/17 06:59, Thomas Weise wrote:
>>>>>>>
>>>>>>>> I don't think this fully covers the scenario of limited resources.
>>>>>>>> You describe a case of 3 operators, but when you consider just 2
>>>>>>>> operators that both have to hold a large data set in memory, the
>>>>>>>> suggested approach won't work. Let's say the first operator is an
>>>>>>>> outer join and the second operator topN. Both are blocking and cannot
>>>>>>>> emit before all input is seen.
>>>>>>>> To deallocate the outer join, all results need to be drained. It's a
>>>>>>>> resource swap, and you need a temporary space to hold the data. Also,
>>>>>>>> if the requirement is to be able to recover and retry from the
>>>>>>>> results of stage one, then you need a fault-tolerant swap space. If
>>>>>>>> the cluster does not have enough memory, then disk is a good option
>>>>>>>> (SLA vs. memory tradeoff).
>>>>>>>> I would also suggest thinking beyond the single-DAG scenario. Often
>>>>>>>> users need to define pipelines that are composed of multiple smaller
>>>>>>>> flows (which they may also want to reuse in multiple pipelines).
>>>>>>>> APEXCORE-408 gives you an option to compose such flows within a
>>>>>>>> single Apex application, in addition to covering the simplified use
>>>>>>>> case that we discuss there.
>>>>>>>>
>>>>>>>> Thomas
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Apr 6, 2017 at 5:52 PM, Vlad Rozov <v.ro...@datatorrent.com
>>>>>>>> >
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> It is exactly the same use case, with the exception that it is not
>>>>>>>>> necessary to write data to files. Consider 3 operators: an input
>>>>>>>>> operator, an aggregate operator and an output operator. When the
>>>>>>>>> application starts, the output port of the aggregate operator should
>>>>>>>>> be in the closed state, the stream between the second and the third
>>>>>>>>> would be inactive, and the output operator does not need to be
>>>>>>>>> allocated. After the input operator processes all data, it can close
>>>>>>>>> its output port and the input operator may be de-allocated. Once the
>>>>>>>>> aggregator receives EOS on its input port, it should open the output
>>>>>>>>> port and start writing to it. At this point, the output operator
>>>>>>>>> needs to be deployed and the stream between the last two operators
>>>>>>>>> (aggregator and output) becomes active.
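The lifecycle above can be modeled as a small sketch (illustrative names only, not the Apex API) of which operators the application master would keep deployed at each step:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model of the described lifecycle (not the Apex API):
// which operators the application master keeps deployed at each step.
public class StagedDeployment {
    // Output operator is not deployed at start; its input stream is inactive.
    final List<String> deployed = new ArrayList<>(List.of("input", "aggregate"));
    boolean aggregateOutputOpen = false;  // aggregate's output port starts closed

    // Input operator has processed all data: it can be de-allocated.
    void inputFinished() {
        deployed.remove("input");
    }

    // Aggregator received EOS: it opens its output port, so the master
    // deploys the output operator and the last stream becomes active.
    void onEndOfStream() {
        aggregateOutputOpen = true;
        deployed.add("output");
    }

    public static void main(String[] args) {
        StagedDeployment d = new StagedDeployment();
        d.inputFinished();
        d.onEndOfStream();
        System.out.println(d.deployed); // [aggregate, output]
    }
}
```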
>>>>>>>>>
>>>>>>>>> In a real batch use case, it is preferable to have the full
>>>>>>>>> application DAG statically defined and to delegate
>>>>>>>>> activation/de-activation of stages to the platform. It is also
>>>>>>>>> preferable not to write intermediate files to disk/HDFS, but instead
>>>>>>>>> to pass data in-memory.
>>>>>>>>>
>>>>>>>>> Thank you,
>>>>>>>>>
>>>>>>>>> Vlad
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 4/6/17 09:37, Thomas Weise wrote:
>>>>>>>>>
>>>>>>>>>> You would need to provide more specifics of the use case you are
>>>>>>>>>> thinking of addressing to make this a meaningful discussion.
>>>>>>>>>>
>>>>>>>>>> An example for APEXCORE-408 (based on a real batch use case): I
>>>>>>>>>> have two stages; the first stage produces a set of files that the
>>>>>>>>>> second stage needs as input. Stage 1 operators are to be released
>>>>>>>>>> and stage 2 operators deployed when stage 2 starts. These can be
>>>>>>>>>> independent operators; they don't need to be connected through a
>>>>>>>>>> stream.
>>>>>>>>>>
>>>>>>>>>> Thomas
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Apr 6, 2017 at 9:21 AM, Vlad Rozov <
>>>>>>>>>> v.ro...@datatorrent.com
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> It is not about a use case difference. My proposal and
>>>>>>>>>>> APEXCORE-408 address the same use case: how to re-allocate
>>>>>>>>>>> resources for batch applications or applications where processing
>>>>>>>>>>> happens in stages. The difference between APEXCORE-408 and the
>>>>>>>>>>> proposal is a shift in complexity from application logic to the
>>>>>>>>>>> platform. IMO, supporting batch applications using APEXCORE-408
>>>>>>>>>>> will require more coding on the application side.
>>>>>>>>>>>
>>>>>>>>>>> Thank you,
>>>>>>>>>>>
>>>>>>>>>>> Vlad
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On 4/5/17 21:57, Thomas Weise wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I think this needs more input on a use case level. The ability
>>>>>>>>>>>> to dynamically alter the DAG internally will also address
>>>>>>>>>>>> resource allocation for operators:
>>>>>>>>>>>>
>>>>>>>>>>>> https://issues.apache.org/jira/browse/APEXCORE-408
>>>>>>>>>>>>
>>>>>>>>>>>> It can be used to implement stages of a batch pipeline and is
>>>>>>>>>>>> very flexible in general. Considering the likely implementation
>>>>>>>>>>>> complexity of the proposed feature, I would like to understand
>>>>>>>>>>>> what benefits it provides to the user (use cases that cannot be
>>>>>>>>>>>> addressed otherwise)?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Thomas
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Sat, Apr 1, 2017 at 12:23 PM, Vlad Rozov <
>>>>>>>>>>>> v.ro...@datatorrent.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Correct, a stateful downstream operator can only be undeployed
>>>>>>>>>>>>> at a checkpoint window after it consumes all data emitted by the
>>>>>>>>>>>>> upstream operator on the closed port.
>>>>>>>>>>>>>
>>>>>>>>>>>>> It will be necessary to distinguish between a closed port and an
>>>>>>>>>>>>> inactive stream. After a port is closed, the stream may still be
>>>>>>>>>>>>> active, and after a port is open, the stream may still be
>>>>>>>>>>>>> inactive (not yet ready).
>>>>>>>>>>>>>
>>>>>>>>>>>>> The more contributors participate in the discussion and
>>>>>>>>>>>>> implementation,
>>>>>>>>>>>>> the more solid the feature will be.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thank you,
>>>>>>>>>>>>> Vlad
>>>>>>>>>>>>>
>>>>>>>>>>>>> Sent from my iPhone
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Apr 1, 2017, at 11:03, Pramod Immaneni <
>>>>>>>>>>>>> pra...@datatorrent.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Generally a good idea. Care should be taken around fault
>>>>>>>>>>>>>> tolerance and idempotency. Closing a stream would need to stop
>>>>>>>>>>>>>> accepting new data, but we still can't actually close all the
>>>>>>>>>>>>>> streams and un-deploy operators until committed. Idempotency
>>>>>>>>>>>>>> might require the stream close to take effect at the end of the
>>>>>>>>>>>>>> window. What would it then mean to re-open a stream within a
>>>>>>>>>>>>>> window? Also, this looks like a larger undertaking; as Ram
>>>>>>>>>>>>>> suggested, it would be good to understand the use cases, and I
>>>>>>>>>>>>>> also suggest that multiple folks participate in the
>>>>>>>>>>>>>> implementation effort to ensure that we are able to address all
>>>>>>>>>>>>>> the scenarios and minimize chances of regression in existing
>>>>>>>>>>>>>> behavior.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks
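The idempotency concern raised here can be sketched as follows: a close requested mid-window is deferred to the window boundary, so a replayed window sees the close at the same point. This is an illustrative model only, not the Apex API:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch (not the Apex API): a close requested mid-window
// is only applied at the window boundary, so a replayed window emits
// exactly the same tuples before the port actually closes.
public class WindowedPortClose {
    private boolean open = true;
    private boolean closeRequested = false;
    final List<String> emitted = new ArrayList<>();

    void requestClose() { closeRequested = true; }  // takes effect at endWindow

    void emit(String tuple) {
        if (open) {
            emitted.add(tuple);  // accepted until the window actually ends
        }
    }

    void endWindow() {
        if (closeRequested) {
            open = false;  // downstream can be undeployed once committed
        }
    }

    public static void main(String[] args) {
        WindowedPortClose p = new WindowedPortClose();
        p.emit("a");
        p.requestClose();
        p.emit("b");       // still within the window, still emitted
        p.endWindow();
        p.emit("c");       // dropped: port closed at the boundary
        System.out.println(p.emitted); // [a, b]
    }
}
```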
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sat, Apr 1, 2017 at 8:12 AM, Vlad Rozov <
>>>>>>>>>>>>>> v.ro...@datatorrent.com
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>> All,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Currently Apex assumes that an operator can emit on any
>>>>>>>>>>>>>>> defined output port and that all streams defined by a DAG are
>>>>>>>>>>>>>>> active. I'd like to propose an ability for an operator to open
>>>>>>>>>>>>>>> and close output ports. By default, all ports defined by an
>>>>>>>>>>>>>>> operator will be open. If an operator decides, for any reason,
>>>>>>>>>>>>>>> that it will not emit tuples on an output port, it may close
>>>>>>>>>>>>>>> it. This will make the stream inactive, and the application
>>>>>>>>>>>>>>> master may undeploy the downstream (for that input stream)
>>>>>>>>>>>>>>> operators. If this leads to containers that don't have any
>>>>>>>>>>>>>>> active operators, those containers may be undeployed as well,
>>>>>>>>>>>>>>> leading to better cluster resource utilization and better Apex
>>>>>>>>>>>>>>> elasticity. Later, the operator may be in a state where it
>>>>>>>>>>>>>>> needs to emit tuples on the closed port. In this case, it
>>>>>>>>>>>>>>> needs to re-open the port and wait until the stream becomes
>>>>>>>>>>>>>>> active again before emitting tuples on that port. Making an
>>>>>>>>>>>>>>> inactive stream active again requires the application master
>>>>>>>>>>>>>>> to re-allocate containers and re-deploy the downstream
>>>>>>>>>>>>>>> operators.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> It should also be possible for an application designer to mark
>>>>>>>>>>>>>>> streams as inactive when an application starts. This will
>>>>>>>>>>>>>>> allow the application master to avoid reserving all containers
>>>>>>>>>>>>>>> when the application starts. Later, the port can be opened and
>>>>>>>>>>>>>>> the inactive stream becomes active.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thank you,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Vlad
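A hypothetical operator-facing API for the proposal in this thread might look like the sketch below. Names such as ControllablePort are purely illustrative; no such interface exists in Apex today:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the proposed operator-facing API. Names like
// ControllablePort are illustrative; this is not an existing Apex interface.
public class ControllablePort<T> {
    public enum State { OPEN, CLOSED }

    private State state = State.OPEN;  // per the proposal, ports start open
    final List<T> emitted = new ArrayList<>();

    public void close() { state = State.CLOSED; }  // master may undeploy downstream
    public void open()  { state = State.OPEN; }    // master must redeploy downstream

    public void emit(T tuple) {
        if (state == State.CLOSED) {
            throw new IllegalStateException("port is closed; re-open it first");
        }
        emitted.add(tuple);
    }

    public State getState() { return state; }

    public static void main(String[] args) {
        ControllablePort<String> port = new ControllablePort<>();
        port.emit("t1");
        port.close();   // stream becomes inactive; downstream may be released
        port.open();    // stream must become active again before emitting
        port.emit("t2");
        System.out.println(port.emitted); // [t1, t2]
    }
}
```

In a real implementation, open() would presumably be asynchronous: the operator would have to wait until the application master has re-deployed the downstream operators and the stream is active again.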
