Where the data comes from isn't important for this discussion. The scenario
is join -> topN

With intermediate files it is: ( join -> writer ) - - -> ( reader -> topN )
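
To make the staged variant concrete, here is a minimal sketch of the two stages with a file in between. It is plain Python rather than Apex code, and every name in it (join_stage, top_n_stage, run_pipeline, the JSON-lines spill file) is hypothetical and chosen only for illustration. It uses an inner join for brevity. The point is only that stage 2 starts after stage 1 has flushed and released its state.

```python
import heapq
import json
import os
import tempfile


def join_stage(left, right, out_path):
    """Stage 1 (join -> writer): join two keyed inputs and spill results to a file."""
    right_by_key = {}
    for key, value in right:
        right_by_key.setdefault(key, []).append(value)
    with open(out_path, "w") as out:
        for key, lvalue in left:
            for rvalue in right_by_key.get(key, []):
                out.write(json.dumps([key, lvalue, rvalue]) + "\n")
    # Once this returns, all data is flushed and the join's memory can be released.


def top_n_stage(in_path, n):
    """Stage 2 (reader -> topN): read the spilled rows and keep the n largest."""
    with open(in_path) as src:
        rows = (json.loads(line) for line in src)
        # Rank by the right-hand value; the heap holds at most n rows at a time.
        return heapq.nlargest(n, rows, key=lambda row: row[2])


def run_pipeline(left, right, n):
    """Run the stages one after the other so they never hold memory together."""
    fd, path = tempfile.mkstemp(suffix=".jsonl")
    os.close(fd)
    try:
        join_stage(left, right, path)   # producer runs to completion first
        return top_n_stage(path, n)     # consumer is only started afterwards
    finally:
        os.remove(path)
```

For example, run_pipeline([("a", 1), ("b", 2)], [("a", 10), ("b", 30)], 1) ranks the joined rows by the right-hand value and keeps the largest one.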


On Mon, Apr 10, 2017 at 8:26 AM, Vlad Rozov <v.ro...@datatorrent.com> wrote:

> In your example join is both consumer and producer, isn't it? Where does
> it get data from? Join is not an input operator.
>
> Thank you,
>
> Vlad
>
>
> On 4/10/17 08:13, Thomas Weise wrote:
>
>> In this example join/writer produces the data, reader/topN consumes. You
>> cannot deallocate the producer before all data has been drained. When using
>> files, join/writer can be deallocated once all data has been flushed to the
>> files, and allocation of the consumer can wait until then if there isn't
>> enough space to have both of them active at the same time.
>>
>> Overall it seems this is not a matter of activating/deactivating streams
>> but operators.
>>
>> Thomas
>>
>>
>>
>> On Mon, Apr 10, 2017 at 8:05 AM, Vlad Rozov <v.ro...@datatorrent.com>
>> wrote:
>>
>>> With additional file readers/writers the pipeline of a single stage
>>> becomes the 3 operator use case I described. With the ability to open/close
>>> ports, the platform can optimize it by re-allocating resources from readers
>>> to writers.
>>>
>>> Thank you,
>>>
>>> Vlad
>>>
>>>
>>> On 4/10/17 07:44, Thomas Weise wrote:
>>>
>>>> In streaming there is a stream (surprise); in a space-constrained batch
>>>> case, we can have additional file writers/readers between the operators.
>>>>
>>>> Modules can in fact be used to support pipeline reuse, but they must be
>>>> added/removed dynamically to support stages with on-demand resource
>>>> allocation.
>>>>
>>>> Thomas
>>>>
>>>>
>>>> On Mon, Apr 10, 2017 at 7:37 AM, Vlad Rozov <v.ro...@datatorrent.com>
>>>> wrote:
>>>>
>>>>> Do you suggest that in a streaming use case the join operator also
>>>>> passes data downstream using files, or that there are two different join
>>>>> operators, one for streaming and one for batch? If not, it means that the
>>>>> join operator needs to emit data to a separate file output operator, so
>>>>> it still needs to read data from a temporary space before emitting. Why
>>>>> not emit directly to topN in this case?
>>>>>
>>>>> Isn't pipeline reuse already supported by Apex modules?
>>>>>
>>>>> Thank you,
>>>>>
>>>>> Vlad
>>>>>
>>>>>
>>>>> On 4/10/17 06:59, Thomas Weise wrote:
>>>>>
>>>>>> I don't think this fully covers the scenario of limited resources. You
>>>>>> describe a case of 3 operators, but when you consider just 2 operators
>>>>>> that both have to hold a large data set in memory, then the suggested
>>>>>> approach won't work. Let's say the first operator is outer join and the
>>>>>> second operator topN. Both are blocking and cannot emit before all
>>>>>> input is seen.
>>>>>>
>>>>>> To deallocate the outer join, all results need to be drained. It's a
>>>>>> resource swap and you need a temporary space to hold the data. Also, if
>>>>>> the requirement is to be able to recover and retry from results of
>>>>>> stage one, then you need a fault tolerant swap space. If the cluster
>>>>>> does not have enough memory, then disk is a good option (SLA vs. memory
>>>>>> tradeoff).
>>>>>>
>>>>>> I would also suggest thinking beyond the single DAG scenario. Often
>>>>>> users need to define pipelines that are composed of multiple smaller
>>>>>> flows (which they may also want to reuse in multiple pipelines).
>>>>>> APEXCORE-408 gives you an option to compose such flows within a single
>>>>>> Apex application, in addition to covering the simplified use case that
>>>>>> we discuss there.
>>>>>>
>>>>>> Thomas
>>>>>>
>>>>>>
>>>>>> On Thu, Apr 6, 2017 at 5:52 PM, Vlad Rozov <v.ro...@datatorrent.com>
>>>>>> wrote:
>>>>>>
>>>>>>> It is exactly the same use case with the exception that it is not
>>>>>>> necessary to write data to files. Consider 3 operators: an input
>>>>>>> operator, an aggregate operator and an output operator. When the
>>>>>>> application starts, the output port of the aggregate operator should
>>>>>>> be in the closed state, the stream between the second and the third
>>>>>>> would be inactive and the output operator does not need to be
>>>>>>> allocated. After the input operator processes all data, it can close
>>>>>>> the output port and the input operator may be de-allocated. Once the
>>>>>>> aggregator receives EOS on its input port, it should open the output
>>>>>>> port and start writing to it. At this point, the output operator
>>>>>>> needs to be deployed and the stream between the last two operators
>>>>>>> (aggregator and output) becomes active.
>>>>>>>
>>>>>>> In a real batch use case, it is preferable to have the full
>>>>>>> application DAG statically defined and to delegate
>>>>>>> activation/de-activation of stages to the platform. It is also
>>>>>>> preferable not to write intermediate files to disk/HDFS, but instead
>>>>>>> pass data in-memory.
>>>>>>>
>>>>>>> Thank you,
>>>>>>>
>>>>>>> Vlad
>>>>>>>
>>>>>>>
>>>>>>> On 4/6/17 09:37, Thomas Weise wrote:
>>>>>>>
>>>>>>>> You would need to provide more specifics of the use case you are
>>>>>>>> thinking to address to make this a meaningful discussion.
>>>>>>>>
>>>>>>>> An example for APEXCORE-408 (based on a real batch use case): I have
>>>>>>>> two stages, and the first stage produces a set of files that the
>>>>>>>> second stage needs as input. Stage 1 operators are to be released
>>>>>>>> and stage 2 operators deployed when stage 2 starts. These can be
>>>>>>>> independent operators; they don't need to be connected through a
>>>>>>>> stream.
>>>>>>>>
>>>>>>>> Thomas
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Apr 6, 2017 at 9:21 AM, Vlad Rozov <v.ro...@datatorrent.com
>>>>>>>> >
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> It is not about a use case difference. My proposal and
>>>>>>>>> APEXCORE-408 address the same use case: how to re-allocate
>>>>>>>>> resources for batch applications or applications where processing
>>>>>>>>> happens in stages. The difference between APEXCORE-408 and the
>>>>>>>>> proposal is a shift in complexity from application logic to the
>>>>>>>>> platform. IMO, supporting batch applications using APEXCORE-408
>>>>>>>>> will require more coding on the application side.
>>>>>>>>>
>>>>>>>>> Thank you,
>>>>>>>>>
>>>>>>>>> Vlad
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 4/5/17 21:57, Thomas Weise wrote:
>>>>>>>>>
>>>>>>>>>> I think this needs more input on a use case level. The ability to
>>>>>>>>>> dynamically alter the DAG internally will also address the
>>>>>>>>>> resource allocation for operators:
>>>>>>>>>>
>>>>>>>>>> https://issues.apache.org/jira/browse/APEXCORE-408
>>>>>>>>>>
>>>>>>>>>> It can be used to implement stages of a batch pipeline and is very
>>>>>>>>>> flexible in general. Considering the likely implementation
>>>>>>>>>> complexity for the proposed feature I would like to understand
>>>>>>>>>> what benefits it provides to the user (use cases that cannot be
>>>>>>>>>> addressed otherwise)?
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Thomas
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Sat, Apr 1, 2017 at 12:23 PM, Vlad Rozov <
>>>>>>>>>> v.ro...@datatorrent.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Correct, a stateful downstream operator can only be undeployed at
>>>>>>>>>>> a checkpoint window after it consumes all data emitted by the
>>>>>>>>>>> upstream operator on the closed port.
>>>>>>>>>>>
>>>>>>>>>>> It will be necessary to distinguish between a closed port and an
>>>>>>>>>>> inactive stream. After a port is closed, the stream may still be
>>>>>>>>>>> active, and after a port is opened, the stream may still be
>>>>>>>>>>> inactive (not yet ready).
>>>>>>>>>>>
>>>>>>>>>>> The more contributors participate in the discussion and
>>>>>>>>>>> implementation,
>>>>>>>>>>> the more solid the feature will be.
>>>>>>>>>>>
>>>>>>>>>>> Thank you,
>>>>>>>>>>> Vlad
>>>>>>>>>>>
>>>>>>>>>>> Sent from iPhone
>>>>>>>>>>>
>>>>>>>>>>> On Apr 1, 2017, at 11:03, Pramod Immaneni <
>>>>>>>>>>> pra...@datatorrent.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Generally a good idea. Care should be taken around fault
>>>>>>>>>>>> tolerance and idempotency. Close stream would need to stop
>>>>>>>>>>>> accepting new data but still can't actually close all the
>>>>>>>>>>>> streams and un-deploy operators till committed. Idempotency
>>>>>>>>>>>> might require the close stream to take effect at the end of the
>>>>>>>>>>>> window. What would it then mean for re-opening streams within a
>>>>>>>>>>>> window? Also, this looks like a larger undertaking. As Ram
>>>>>>>>>>>> suggested, it would be good to understand the use cases, and I
>>>>>>>>>>>> also suggest that multiple folks participate in the
>>>>>>>>>>>> implementation effort to ensure that we are able to address all
>>>>>>>>>>>> the scenarios and minimize chances of regression in existing
>>>>>>>>>>>> behavior.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>>
>>>>>>>>>>>> On Sat, Apr 1, 2017 at 8:12 AM, Vlad Rozov <v.ro...@datatorrent.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> All,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Currently Apex assumes that an operator can emit on any defined
>>>>>>>>>>>>> output port and all streams defined by a DAG are active. I'd
>>>>>>>>>>>>> like to propose an ability for an operator to open and close
>>>>>>>>>>>>> output ports. By default all ports defined by an operator will
>>>>>>>>>>>>> be open. In the case an operator for any reason decides that it
>>>>>>>>>>>>> will not emit tuples on the output port, it may close it. This
>>>>>>>>>>>>> will make the stream inactive and the application master may
>>>>>>>>>>>>> undeploy the downstream (for that input stream) operators. If
>>>>>>>>>>>>> this leads to containers that don't have any active operators,
>>>>>>>>>>>>> those containers may be undeployed as well, leading to better
>>>>>>>>>>>>> cluster resource utilization and better Apex elasticity. Later,
>>>>>>>>>>>>> the operator may be in a state where it needs to emit tuples on
>>>>>>>>>>>>> the closed port. In this case, it needs to re-open the port and
>>>>>>>>>>>>> wait till the stream becomes active again before emitting
>>>>>>>>>>>>> tuples on that port. Making an inactive stream active again
>>>>>>>>>>>>> requires the application master to re-allocate containers and
>>>>>>>>>>>>> re-deploy the downstream operators.
>>>>>>>>>>>>>
>>>>>>>>>>>>> It should also be possible for an application designer to mark
>>>>>>>>>>>>> streams as inactive when an application starts. This will allow
>>>>>>>>>>>>> the application master to avoid reserving all containers when
>>>>>>>>>>>>> the application starts. Later, the port can be opened and the
>>>>>>>>>>>>> inactive stream becomes active.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thank you,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Vlad
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>
