Hi Vlad,

You can avoid writing data files with APEXCORE-408 if the intermediate data
can fit in memory. The API allows extending the DAG by adding streams and
operators to an existing DAG. In the example you provided, the user could
extend the DAG after the aggregate operator by adding a new output operator
and connecting it to the aggregator.
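To make the idea concrete, here is a toy Java model of a DAG that starts as input -> aggregate and is later extended with an output operator connected to the aggregator. It shows only the structural change. All class and method names are hypothetical; this is not the APEXCORE-408 API or the Apex DAG API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy model of a DAG that can be extended while "running".
 * Hypothetical names only; NOT the Apex DAG API or the APEXCORE-408 API.
 */
public class DagExtensionSketch {

    record Stream(String name, String source, String sink) {}

    static final class Dag {
        private final Map<String, String> operators = new LinkedHashMap<>(); // name -> description
        private final List<Stream> streams = new ArrayList<>();

        void addOperator(String name, String description) {
            if (operators.putIfAbsent(name, description) != null) {
                throw new IllegalArgumentException("duplicate operator: " + name);
            }
        }

        void addStream(String name, String source, String sink) {
            if (!operators.containsKey(source) || !operators.containsKey(sink)) {
                throw new IllegalArgumentException("unknown endpoint in stream " + name);
            }
            streams.add(new Stream(name, source, sink));
        }

        List<String> operatorNames() { return new ArrayList<>(operators.keySet()); }

        List<Stream> streams() { return List.copyOf(streams); }
    }

    public static void main(String[] args) {
        // Stage 1: the DAG initially contains only input -> aggregate.
        Dag dag = new Dag();
        dag.addOperator("input", "reads source data");
        dag.addOperator("aggregate", "aggregates in memory");
        dag.addStream("s1", "input", "aggregate");

        // Stage 2: once aggregation is done, extend the same DAG with an output
        // operator connected to the aggregator instead of writing intermediate files.
        dag.addOperator("output", "writes final results");
        dag.addStream("s2", "aggregate", "output");

        System.out.println(dag.operatorNames()); // [input, aggregate, output]
        System.out.println(dag.streams().size()); // 2
    }
}
```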

- Tushar.


On Fri, Apr 7, 2017 at 6:22 AM, Vlad Rozov <v.ro...@datatorrent.com> wrote:

> It is exactly the same use case, with the exception that it is not
> necessary to write data to files. Consider 3 operators: an input operator,
> an aggregate operator and an output operator. When the application starts,
> the output port of the aggregate operator should be in the closed state,
> the stream between the second and the third operators would be inactive,
> and the output operator does not need to be allocated. After the input
> operator processes all data, it can close its output port, and the input
> operator may be de-allocated. Once the aggregator receives EOS on its input
> port, it should open its output port and start writing to it. At this
> point, the output operator needs to be deployed and the stream between the
> last two operators (aggregator and output) becomes active.
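A minimal single-threaded simulation of the three-operator scenario, assuming a hypothetical `LazyPort` that creates ("deploys") its downstream consumer only when the port is first opened. Nothing here is existing Apex API: deployment is reduced to creating a `Consumer`, and the aggregator state lives in a local array.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
 * Hypothetical simulation of the staged input -> aggregate -> output scenario.
 * "Deploying" an operator is modeled as creating its Consumer; not an Apex API.
 */
public class StagedPipelineSketch {

    /** Output port whose downstream consumer is created only when the port is first opened. */
    static final class LazyPort<T> {
        private final Supplier<Consumer<T>> deployDownstream;
        private Consumer<T> downstream; // null until the stream has been activated
        private boolean open;

        LazyPort(Supplier<Consumer<T>> deployDownstream, boolean openAtStart) {
            this.deployDownstream = deployDownstream;
            if (openAtStart) {
                open();
            }
        }

        void open() {
            open = true;
            if (downstream == null) {
                downstream = deployDownstream.get(); // deploy downstream; stream becomes active
            }
        }

        void close() {
            open = false; // no more tuples will be emitted on this port
        }

        void emit(T tuple) {
            if (!open) {
                throw new IllegalStateException("port is closed");
            }
            downstream.accept(tuple);
        }
    }

    /** Runs the scenario, collecting the output operator's results; returns lifecycle events. */
    static List<String> runScenario(List<Long> results) {
        List<String> events = new ArrayList<>();
        long[] sum = {0}; // aggregator state

        // The aggregator's output port starts closed: the output operator is not deployed yet.
        LazyPort<Long> aggregateOut = new LazyPort<>(() -> {
            events.add("deploy output");
            return results::add;
        }, false);

        // The input -> aggregate stream is active from the start.
        LazyPort<Long> inputOut = new LazyPort<>(() -> {
            events.add("deploy aggregate");
            return v -> sum[0] += v;
        }, true);

        for (long v = 1; v <= 4; v++) {
            inputOut.emit(v);
        }
        inputOut.close();            // the input operator has processed all data...
        events.add("release input"); // ...so it could be de-allocated at this point

        // EOS on the aggregator's input: open its output port, then emit the result.
        aggregateOut.open();
        aggregateOut.emit(sum[0]);
        return events;
    }

    public static void main(String[] args) {
        List<Long> results = new ArrayList<>();
        System.out.println(runScenario(results)); // [deploy aggregate, release input, deploy output]
        System.out.println(results);              // [10]
    }
}
```

The output operator is only created after the aggregator opens its port, which mirrors the "deploy on open" behavior described above without writing anything to disk.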
>
> In a real batch use case, it is preferable to have the full application DAG
> statically defined and to delegate the activation/deactivation of stages to
> the platform. It is also preferable not to write intermediate files to
> disk/HDFS, but instead to pass data in memory.
>
> Thank you,
>
> Vlad
>
>
> On 4/6/17 09:37, Thomas Weise wrote:
>
>> You would need to provide more specifics of the use case you are thinking
>> of addressing to make this a meaningful discussion.
>>
>> An example for APEXCORE-408 (based on a real batch use case): I have two
>> stages; the first stage produces a set of files that the second stage needs
>> as input. Stage 1 operators are to be released and stage 2 operators
>> deployed when stage 2 starts. These can be independent operators; they
>> don't need to be connected through a stream.
>>
>> Thomas
>>
>>
>> On Thu, Apr 6, 2017 at 9:21 AM, Vlad Rozov <v.ro...@datatorrent.com>
>> wrote:
>>
>>> It is not about a use case difference. My proposal and APEXCORE-408
>>> address the same use case: how to re-allocate resources for batch
>>> applications or applications where processing happens in stages. The
>>> difference between APEXCORE-408 and the proposal is a shift in complexity
>>> from application logic to the platform. IMO, supporting batch applications
>>> using APEXCORE-408 will require more coding on the application side.
>>>
>>> Thank you,
>>>
>>> Vlad
>>>
>>>
>>> On 4/5/17 21:57, Thomas Weise wrote:
>>>
>>>> I think this needs more input on a use case level. The ability to
>>>> dynamically alter the DAG internally will also address the resource
>>>> allocation for operators:
>>>>
>>>> https://issues.apache.org/jira/browse/APEXCORE-408
>>>>
>>>> It can be used to implement stages of a batch pipeline and is very
>>>> flexible in general. Considering the likely implementation complexity of
>>>> the proposed feature, I would like to understand what benefits it provides
>>>> to the user (use cases that cannot be addressed otherwise).
>>>>
>>>> Thanks,
>>>> Thomas
>>>>
>>>>
>>>>
>>>> On Sat, Apr 1, 2017 at 12:23 PM, Vlad Rozov <v.ro...@datatorrent.com>
>>>> wrote:
>>>>
>>>>> Correct, a stateful downstream operator can only be undeployed at a
>>>>> checkpoint window after it consumes all data emitted by the upstream
>>>>> operator on the closed port.
>>>>>
>>>>> It will be necessary to distinguish between a closed port and an
>>>>> inactive stream. After a port is closed, the stream may still be active,
>>>>> and after a port is opened, the stream may still be inactive (not yet
>>>>> ready).
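One way to keep port state and stream state separate is to track them independently, as in this hypothetical Java sketch. It assumes window ids are plain `long`s and that a downstream checkpoint at window `w` means every tuple emitted up to `w` has been consumed; the class and method names are illustrative and are not Apex API.

```java
/**
 * Hypothetical per-stream bookkeeping for the proposal: port state and stream
 * state are tracked separately. Window ids are plain longs; not an Apex API.
 */
public class PortStreamStateSketch {

    enum PortState { OPEN, CLOSED }

    enum StreamState { ACTIVE, INACTIVE }

    static final class StreamTracker {
        private PortState port = PortState.OPEN;
        private boolean downstreamDeployed = true;
        private long lastWindowEmitted = -1;      // last window with tuples on this port
        private long lastWindowCheckpointed = -1; // latest downstream checkpoint window

        void emit(long windowId) {
            if (port != PortState.OPEN || !downstreamDeployed) {
                throw new IllegalStateException("port closed or stream not ready");
            }
            lastWindowEmitted = Math.max(lastWindowEmitted, windowId);
        }

        void closePort() {
            port = PortState.CLOSED; // stream stays ACTIVE while downstream drains
        }

        void openPort() {
            port = PortState.OPEN; // stream stays INACTIVE until downstream is redeployed
        }

        void markDownstreamDeployed() {
            downstreamDeployed = true;
        }

        void markDownstreamCheckpointed(long windowId) {
            lastWindowCheckpointed = Math.max(lastWindowCheckpointed, windowId);
        }

        /** Assumes a checkpoint at window w means all tuples up to w were consumed. */
        boolean canUndeployDownstream() {
            return port == PortState.CLOSED
                    && downstreamDeployed
                    && lastWindowCheckpointed >= lastWindowEmitted;
        }

        void undeployDownstream() {
            if (!canUndeployDownstream()) {
                throw new IllegalStateException("downstream has not checkpointed past the close");
            }
            downstreamDeployed = false;
        }

        PortState portState() { return port; }

        StreamState streamState() {
            return downstreamDeployed ? StreamState.ACTIVE : StreamState.INACTIVE;
        }
    }

    public static void main(String[] args) {
        StreamTracker t = new StreamTracker();
        t.emit(1);
        t.emit(2);
        t.closePort();
        System.out.println(t.portState() + " " + t.streamState()); // CLOSED ACTIVE
        t.markDownstreamCheckpointed(1);
        System.out.println(t.canUndeployDownstream());             // false
        t.markDownstreamCheckpointed(2);
        t.undeployDownstream();
        t.openPort();
        System.out.println(t.portState() + " " + t.streamState()); // OPEN INACTIVE
        t.markDownstreamDeployed();
        t.emit(3); // allowed again once the stream is active
    }
}
```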
>>>>>
>>>>> The more contributors participate in the discussion and implementation,
>>>>> the more solid the feature will be.
>>>>>
>>>>> Thank you,
>>>>> Vlad
>>>>>
>>>>> Sent from iPhone
>>>>>
>>>>> On Apr 1, 2017, at 11:03, Pramod Immaneni <pra...@datatorrent.com>
>>>>> wrote:
>>>>>
>>>>>> Generally a good idea. Care should be taken around fault tolerance and
>>>>>> idempotency. Closing a stream would need to stop accepting new data, but
>>>>>> it still can't actually close all the streams and undeploy operators
>>>>>> until committed. Idempotency might require the close to take effect at
>>>>>> the end of the window. What would it then mean to re-open streams
>>>>>> within a window? Also, this looks like a larger undertaking. As Ram
>>>>>> suggested, it would be good to understand the use cases, and I also
>>>>>> suggest that multiple folks participate in the implementation effort to
>>>>>> ensure that we are able to address all the scenarios and minimize the
>>>>>> chances of regression in existing behavior.
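A hypothetical sketch of one reading of the end-of-window suggestion: a close requested mid-window is recorded but only applied in `endWindow()`, so a replay of that window produces the same tuples. The `WindowedPort` class is illustrative only and does not correspond to an existing Apex class.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch: a close requested mid-window only takes effect at the
 * end of that window, so the window's contents are deterministic on replay.
 * Not an Apex API.
 */
public class WindowAlignedCloseSketch {

    static final class WindowedPort {
        private final List<String> emitted = new ArrayList<>();
        private boolean open = true;
        private boolean closeRequested;
        private Long currentWindow; // null between windows

        void beginWindow(long windowId) {
            currentWindow = windowId;
        }

        void requestClose() {
            closeRequested = true; // deferred until endWindow()
        }

        void emit(String tuple) {
            if (!open || currentWindow == null) {
                throw new IllegalStateException("port closed or outside a window");
            }
            emitted.add(currentWindow + ":" + tuple);
        }

        void endWindow() {
            if (closeRequested) {
                open = false; // the close lands exactly on a window boundary
                closeRequested = false;
            }
            currentWindow = null;
        }

        boolean isOpen() { return open; }

        List<String> emitted() { return List.copyOf(emitted); }
    }

    public static void main(String[] args) {
        WindowedPort port = new WindowedPort();
        port.beginWindow(7);
        port.emit("a");
        port.requestClose();
        port.emit("b"); // still allowed: the close has not taken effect yet
        port.endWindow();
        System.out.println(port.isOpen());  // false
        System.out.println(port.emitted()); // [7:a, 7:b]
    }
}
```

Under this reading, re-opening within the same window is simply not possible, because the closed state only exists between windows.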
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> On Sat, Apr 1, 2017 at 8:12 AM, Vlad Rozov <v.ro...@datatorrent.com>
>>>>>> wrote:
>>>>>>> All,
>>>>>>>
>>>>>>> Currently Apex assumes that an operator can emit on any defined output
>>>>>>> port and that all streams defined by a DAG are active. I'd like to
>>>>>>> propose the ability for an operator to open and close output ports. By
>>>>>>> default, all ports defined by an operator will be open. If an operator
>>>>>>> for any reason decides that it will not emit tuples on an output port,
>>>>>>> it may close it. This will make the stream inactive, and the application
>>>>>>> master may undeploy the downstream (for that input stream) operators. If
>>>>>>> this leads to containers that don't have any active operators, those
>>>>>>> containers may be undeployed as well, leading to better cluster resource
>>>>>>> utilization and better Apex elasticity. Later, the operator may be in a
>>>>>>> state where it needs to emit tuples on the closed port. In this case, it
>>>>>>> needs to re-open the port and wait until the stream becomes active again
>>>>>>> before emitting tuples on that port. Making an inactive stream active
>>>>>>> again requires the application master to re-allocate containers and
>>>>>>> re-deploy the downstream operators.
>>>>>>>
>>>>>>> It should also be possible for an application designer to mark streams
>>>>>>> as inactive when an application starts. This will allow the application
>>>>>>> master to avoid reserving all containers when the application starts.
>>>>>>> Later, the port can be opened and the inactive stream can become active.
>>>>>>>
>>>>>>> Thank you,
>>>>>>>
>>>>>>> Vlad
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>
