It depends on the use case. Consider a join -> aggregate -> topN pipeline. My point was that even for reader -> aggregator -> writer there is a way to handle resources better.

Thank you,

Vlad

On 4/10/17 09:07, Thomas Weise wrote:
Yes, but source is not important for the resource allocation aspect because
it is a reader that does not hold a lot of resources. The big ticket items
are join and topN, and they need to be allocated at the same time if you
don't have a swap space.


On Mon, Apr 10, 2017 at 8:56 AM, Vlad Rozov <v.ro...@datatorrent.com> wrote:

For the second pipeline, source can be de-allocated as soon as join gets
all data and join can be de-allocated as soon as topN gets all data. Note
that topN (and sink) does not need to be allocated before join starts
emitting data.

Thank you,

Vlad


On 4/10/17 08:48, Thomas Weise wrote:

The pipeline depends on the resource availability. It could be:

( source -> join -> writer ) - - -> ( reader -> topN -> sink)

or

(source -> join -> topN -> sink)

The second case does not allow you to deallocate join (join and topN are
active at the same time).
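
A rough sketch of the trade-off between the two layouts above. The memory
figures are made-up numbers chosen only for illustration (nothing in this
thread states actual sizes), and `peak_memory` is a hypothetical helper, not
an Apex API. It assumes every operator in a stage is allocated for the whole
stage and stages run one after another.

```python
# Hypothetical per-operator memory needs in GB (illustrative assumptions).
MEM_GB = {"source": 1, "join": 8, "writer": 1, "reader": 1, "topN": 6, "sink": 1}


def peak_memory(stages, mem=MEM_GB):
    """Peak memory when stages run sequentially and each stage's
    operators are all allocated together."""
    return max(sum(mem[op] for op in stage) for stage in stages)


# ( source -> join -> writer ) - - -> ( reader -> topN -> sink )
two_stages = [["source", "join", "writer"], ["reader", "topN", "sink"]]

# ( source -> join -> topN -> sink )
one_stage = [["source", "join", "topN", "sink"]]

staged_peak = peak_memory(two_stages)
single_peak = peak_memory(one_stage)
```

With these assumed sizes the staged layout peaks at the larger of the two
stages, while the single-stage layout has to hold join and topN together.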


On Mon, Apr 10, 2017 at 8:37 AM, Vlad Rozov <v.ro...@datatorrent.com>
wrote:

It is important. The generic pipeline proposed is (... -> writer) --->
(reader -> join -> writer) ---> (reader -> ...), where
reader -> aggregator -> writer becomes a common pattern for single-stage
processing.

Thank you,

Vlad


On 4/10/17 08:31, Thomas Weise wrote:

Where the data comes from isn't important for this discussion. The
scenario is join -> topN

With intermediate files it is: ( join -> writer ) - - -> ( reader -> topN )


On Mon, Apr 10, 2017 at 8:26 AM, Vlad Rozov <v.ro...@datatorrent.com>
wrote:

In your example join is both consumer and producer, isn't it? Where does
it get data from? Join is not an input operator.

Thank you,

Vlad


On 4/10/17 08:13, Thomas Weise wrote:

In this example join/writer produces the data, reader/topN consumes. You
cannot deallocate the producer before all data has been drained. When
using files, join/writer can be deallocated once all data has been flushed
to the files, and allocation of the consumer can wait until that has
occurred if there isn't enough space to have both of them active at the
same time.

Overall it seems this is not a matter of activating/deactivating streams
but operators.

Thomas



On Mon, Apr 10, 2017 at 8:05 AM, Vlad Rozov <v.ro...@datatorrent.com>
wrote:

With additional file readers/writers the pipeline of a single stage
becomes the 3 operator use case I described. With the ability to
open/close ports, the platform can optimize it by re-allocating resources
from readers to writers.

Thank you,

Vlad


On 4/10/17 07:44, Thomas Weise wrote:

In streaming there is a stream (surprise); in a space-constrained batch
case, we can have additional file writers/readers between the operators.

Modules can in fact be used to support pipeline reuse, but they must be
added/removed dynamically to support stages with on-demand resource
allocation.

Thomas


On Mon, Apr 10, 2017 at 7:37 AM, Vlad Rozov <v.ro...@datatorrent.com>
wrote:

Do you suggest that in a streaming use case the join operator also passes
data downstream using files, or that there are two different join
operators, one for streaming and one for batch? If not, it means that the
join operator needs to emit data to a separate file output operator, so it
still needs to read data from a temporary space before emitting. Why not
emit directly to topN in this case?

Isn't pipeline reuse already supported by Apex modules?

Thank you,

Vlad


On 4/10/17 06:59, Thomas Weise wrote:

I don't think this fully covers the scenario of limited resources. You
describe a case of 3 operators, but when you consider just 2 operators
that both have to hold a large data set in memory, then the suggested
approach won't work. Let's say the first operator is outer join and the
second operator topN. Both are blocking and cannot emit before all input
is seen.

To deallocate the outer join, all results need to be drained. It's a
resource swap and you need a temporary space to hold the data. Also, if
the requirement is to be able to recover and retry from results of stage
one, then you need a fault tolerant swap space. If the cluster does not
have enough memory, then disk is a good option (SLA vs. memory tradeoff).

I would also suggest thinking beyond the single DAG scenario. Often users
need to define pipelines that are composed of multiple smaller flows
(which they may also want to reuse in multiple pipelines). APEXCORE-408
gives you an option to compose such flows within a single Apex
application, in addition to covering the simplified use case that we
discuss there.

Thomas


On Thu, Apr 6, 2017 at 5:52 PM, Vlad Rozov <v.ro...@datatorrent.com>
wrote:

It is exactly the same use case with the exception that it is not
necessary to write data to files. Consider 3 operators: an input operator,
an aggregate operator and an output operator. When the application starts,
the output port of the aggregate operator should be in the closed state,
the stream between the second and the third would be inactive and the
output operator does not need to be allocated. After the input operator
processes all data, it can close the output port and the input operator
may be de-allocated. Once the aggregator receives EOS on its input port,
it should open the output port and start writing to it. At this point, the
output operator needs to be deployed and the stream between the last two
operators (aggregator and output) becomes active.
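
The lifecycle described above can be traced with a small sketch. This is
only an illustration under simplifying assumptions: operators are plain
names in a set, `staged_schedule` is a hypothetical helper rather than
anything in Apex, and each step is treated as instantaneous.

```python
def staged_schedule():
    """Return the set of allocated operators after each step of the
    three-operator example (input -> aggregator -> output)."""
    steps = []

    # Application start: the aggregator's output port is closed, so the
    # output operator does not need to be allocated yet.
    allocated = {"input", "aggregator"}
    steps.append(set(allocated))

    # The input operator has processed all data, closes its output port
    # and may be de-allocated.
    allocated.discard("input")
    steps.append(set(allocated))

    # The aggregator received EOS, opens its output port, and the output
    # operator is deployed so the last stream becomes active.
    allocated.add("output")
    steps.append(set(allocated))

    return steps


steps = staged_schedule()
peak_operators = max(len(step) for step in steps)
```

In this trace at most two of the three operators are allocated at any
step, whereas a DAG with all streams active from the start would hold all
three.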

In a real batch use case, it is preferable to have the full application
DAG statically defined and to delegate activation/de-activation of stages
to the platform. It is also preferable not to write intermediate files to
disk/HDFS, but instead to pass data in memory.

Thank you,

Vlad


On 4/6/17 09:37, Thomas Weise wrote:

You would need to provide more specifics of the use case you are thinking
of addressing to make this a meaningful discussion.

An example for APEXCORE-408 (based on a real batch use case): I have two
stages; the first stage produces a set of files that the second stage
needs as input. Stage 1 operators are to be released and stage 2 operators
deployed when stage 2 starts. These can be independent operators; they
don't need to be connected through a stream.

Thomas


On Thu, Apr 6, 2017 at 9:21 AM, Vlad Rozov <v.ro...@datatorrent.com>
wrote:

It is not about a use case difference. My proposal and APEXCORE-408
address the same use case - how to re-allocate resources for batch
applications or applications where processing happens in stages. The
difference between APEXCORE-408 and the proposal is a shift in complexity
from application logic to the platform. IMO, supporting batch applications
using APEXCORE-408 will require more coding on the application side.

Thank you,

Vlad


On 4/5/17 21:57, Thomas Weise wrote:

I think this needs more input on a use case level. The ability to
dynamically alter the DAG internally will also address the resource
allocation for operators:

https://issues.apache.org/jira/browse/APEXCORE-408

It can be used to implement stages of a batch pipeline and is very
flexible in general. Considering the likely implementation complexity for
the proposed feature I would like to understand what benefits it provides
to the user (use cases that cannot be addressed otherwise)?

Thanks,
Thomas



On Sat, Apr 1, 2017 at 12:23 PM, Vlad Rozov <v.ro...@datatorrent.com>
wrote:

Correct, a stateful downstream operator can only be undeployed at a
checkpoint window after it consumes all data emitted by the upstream
operator on the closed port.
It will be necessary to distinguish between a closed port and an inactive
stream. After a port is closed, the stream may still be active, and after
a port is opened, the stream may still be inactive (not yet ready).
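
One way to picture the distinction is to track port state and stream state
as two separate values. The sketch below is a toy model, not Apex code:
the class, its method names and the rule that deactivation happens at a
checkpoint once the closed port is drained are assumptions made for
illustration.

```python
from enum import Enum


class PortState(Enum):
    OPEN = "open"
    CLOSED = "closed"


class StreamState(Enum):
    ACTIVE = "active"
    INACTIVE = "inactive"


class Stream:
    """Toy model: the operator controls the port, the platform controls
    the stream, and the two states can differ for a while."""

    def __init__(self):
        self.port = PortState.OPEN
        self.stream = StreamState.ACTIVE
        self.pending = 0  # tuples emitted but not yet consumed downstream

    def emit(self, count=1):
        if self.port is not PortState.OPEN or self.stream is not StreamState.ACTIVE:
            raise RuntimeError("port closed or stream not ready")
        self.pending += count

    def close_port(self):
        # The stream stays active until downstream has drained the data.
        self.port = PortState.CLOSED

    def consume_all(self):
        self.pending = 0

    def checkpoint(self):
        # Assumed rule: deactivate only at a checkpoint, once drained.
        if self.port is PortState.CLOSED and self.pending == 0:
            self.stream = StreamState.INACTIVE

    def open_port(self):
        # Re-opening does not make the stream ready by itself.
        self.port = PortState.OPEN

    def downstream_deployed(self):
        if self.port is PortState.OPEN:
            self.stream = StreamState.ACTIVE
```

Keeping the two states separate makes the intermediate situations explicit:
closed port with an active stream while data drains, and open port with an
inactive stream while downstream operators are being re-deployed.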

The more contributors participate in the discussion and
implementation,
the more solid the feature will be.

Thank you,
Vlad

Sent from iPhone

On Apr 1, 2017, at 11:03, Pramod Immaneni <pra...@datatorrent.com> wrote:

Generally a good idea. Care should be taken around fault tolerance and
idempotency. Close stream would need to stop accepting new data but still
can't actually close all the streams and un-deploy operators till
committed. Idempotency might require the close stream to take effect at
the end of the window. What would it then mean for re-opening streams
within a window? Also, this looks like a larger undertaking; as Ram
suggested, it would be good to understand the use cases, and I also
suggest that multiple folks participate in the implementation effort to
ensure that we are able to address all the scenarios and minimize chances
of regression in existing behavior.

Thanks

On Sat, Apr 1, 2017 at 8:12 AM, Vlad Rozov <v.ro...@datatorrent.com>
wrote:

All,

Currently Apex assumes that an operator can emit on any defined output
port and all streams defined by a DAG are active. I'd like to propose an
ability for an operator to open and close output ports. By default all
ports defined by an operator will be open. In the case an operator for any
reason decides that it will not emit tuples on the output port, it may
close it. This will make the stream inactive and the application master
may undeploy the downstream (for that input stream) operators. If this
leads to containers that don't have any active operators, those containers
may be undeployed as well, leading to better cluster resource utilization
and better Apex elasticity. Later, the operator may be in a state where it
needs to emit tuples on the closed port. In this case, it needs to re-open
the port and wait till the stream becomes active again before emitting
tuples on that port. Making an inactive stream active again requires the
application master to re-allocate containers and re-deploy the downstream
operators.

It should also be possible for an application designer to mark streams as
inactive when an application starts. This will allow the application
master to avoid reserving all containers when the application starts.
Later, the port can be opened and the inactive stream becomes active.

Thank you,

Vlad







