It may be a rather extreme use case, but suppose operator X writes to a distributed in-memory database and operator Z reads from it, and Z is not directly connected to X. In such cases it may be necessary to let an application request container affinity (deployment to the same JVM) for operators X and Z, as writes may be cached and reads from the same JVM will potentially be faster.

Technically the same could apply to thread locality on NUMA boxes, as NUMA-aware deployment of containers is not supported.

Vlad

On 1/23/16 18:01, Yogi Devendra wrote:
@Isha

In my opinion:
THREAD_LOCAL and CONTAINER_LOCAL on a stream are a special case of the generic rules
for operator X and operator Y.

We can say that THREAD_LOCAL and CONTAINER_LOCAL would be applicable only if
operators X and Y are connected by a stream, but the way to express this should
be similar to the other affinity rules.

~ Yogi

On 24 January 2016 at 03:49, Isha Arkatkar <[email protected]> wrote:

Hey Chinmay,

    I certainly agree on a common set of rules for configuring affinity! Well
put with a concrete example. :)
    The only thing I would like to point out is that affinity of operators should
not cover thread and container locality, since those are only applicable when
operators are connected by a stream. So it makes sense to have them on the
Stream rather than in the common configuration.

   And yes, DAG.validate should only check the REQUIRED or STRICT policy. We
can agree on one of the terminologies, STRICT/RELAXED or REQUIRED/PREFERRED.

Thanks!
Isha


On Fri, Jan 22, 2016 at 8:38 PM, Chinmay Kolhatkar <[email protected]> wrote:

Hi Isha, Bhupesh,

When I suggested a single affinity rule, I was mainly talking about the "how to"
of configuration and not the implementation.

I see locality as, in a way, suggesting an affinity of operators. They're
closely related terminologies: by configuring a locality on a stream, we're
also, in a way, defining an affinity of operators.

Until now only locality existed, and hence configuration was straightforward
for the user. Tomorrow, when anti-affinity configuration comes up, one might
get confused about how to best use both locality & anti-affinity.
Hence the suggestion to make both (locality/affinity & anti-affinity) part
of a single configuration.

The suggestion is to have a more commonly adopted configuration which admins
and developers are familiar with.
Again referring to the vSphere hypervisor's affinity rules: I think they have a
single configuration which does both.

Having said that, here is a quick suggestion on how both can be achieved in
a single configuration:

CATEGORY        TYPE       POLICY       ENTITIES
Affinity        THREAD     REQUIRED     O1, O2    // Operator1 & Operator2 should be thread local
Affinity        NODE       PREFERRED    O3, O4    // O3 & O4 are preferred to be node local
AntiAffinity    NODE       REQUIRED     O1, O4    // O1 & O4 should not be on the same node
AntiAffinity    RACK       PREFERRED    O2, O4    // O2 & O4 are preferred not to be on the same rack
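
To make the shape of such a rule concrete, here is a rough Java sketch of how
a single rule object could carry both cases (the class and enum names are only
placeholders for discussion, not an existing Apex API):

    import java.util.Arrays;
    import java.util.List;

    // Placeholder sketch of a unified rule covering affinity and anti-affinity.
    public class AffinityRule {
      public enum Category { AFFINITY, ANTI_AFFINITY }
      public enum Type { THREAD, CONTAINER, NODE, RACK }
      public enum Policy { REQUIRED, PREFERRED }

      private final Category category;
      private final Type type;
      private final Policy policy;
      private final List<String> entities;  // logical operator names the rule applies to

      public AffinityRule(Category category, Type type, Policy policy, String... entities) {
        this.category = category;
        this.type = type;
        this.policy = policy;
        this.entities = Arrays.asList(entities);
      }
      // getters omitted
    }

    // The rows of the table above would then read, for example:
    //   new AffinityRule(Category.AFFINITY, Type.THREAD, Policy.REQUIRED, "O1", "O2");
    //   new AffinityRule(Category.ANTI_AFFINITY, Type.NODE, Policy.REQUIRED, "O1", "O4");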


The way Linux sets CPU affinity for threads is another configuration style
we can take a look at.

Learning from these commonly adopted configuration patterns, we should come
up with the configuration best suited to a distributed environment.
The idea here is to not invent our own configuration and give something
entirely new to the users; otherwise such an important concept might quickly
get lost.

Regarding DAG.validate, I think we would need to add some new logic to take
care of anti-affinity.
Also, affinity/anti-affinity should be validated in DAG.validate only for
the rules which are required.
For preferred policies, validation in the logical plan might be an early
check.
Thanks,
Chinmay.



On Sat, Jan 23, 2016 at 3:15 AM, Isha Arkatkar <[email protected]> wrote:

Hi,

   Thanks for the inputs! I like the idea of having a single set of
(anti-)affinity rules at the DAG context level.
   There could still be conflicts with the Locality set on streams or the node
locality attribute set on operators. But as Sandeep suggested, I also think
DAG.validate should fail in case of contradicting constraints.

  We currently do not have 'affinity' among non-stream operators, as Bhupesh
pointed out. It is somewhat achievable by requesting node locality for
operators (assuming node requests work as expected). But should we consider
adding support for affinity specifications as well, along with anti-affinity?
  Regarding specification of attributes from dt-site.xml: we can go with a
JSON-like string or even an XML representation for complex objects. What is
our current behavior for setting Java object properties through XML? We can
follow the same approach for this as well.
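
For example, assuming the attribute value ends up being a JSON string array
(as Yogi suggested below), an entry in dt-site.xml could look roughly like the
following; the property key simply mimics the usual
dt.operator.<name>.attr.<ATTRIBUTE> pattern and the ANTI_AFFINITY attribute
itself is still only the proposal being discussed here:

    <property>
      <name>dt.operator.O1.attr.ANTI_AFFINITY</name>
      <value>["O2", "O3"]</value>
    </property>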

   As for precedence or the ability to satisfy constraints: right now, in the
normal scenario, if resources are not available for allocating containers, we
keep sending the requests till all are obtained. Likewise, in case of a strict
anti-affinity policy, we should keep the application in the ACCEPTED state
till the anti-affinity constraint is satisfied. For the relaxed policy, we can
decide on a timeout for relaxing the anti-affinity rule. Please note this
applies only when we have non-contradicting rules.

Thanks!
Isha

On Fri, Jan 22, 2016 at 5:30 AM, Bhupesh Chawda <[email protected]> wrote:

I agree on having a single set of rules for the affinity as well as
anti-affinity of operators / partitions on containers.

However, I noted the following points:

    1. AFAIK, we do not support affinity (locality) in a general sense. The
    affinity is only for a stream, not for *any* two operators. So, we
    should also look at the general case and see how it can be supported, if
    there are valid use cases.
    2. Coming to anti-affinity, we cannot type cast it as a type of affinity
    rule. Saying "two operators must be on the same container" is very
    different from saying "these two operators must not be on the same
    container". In this case, the second one is a much more relaxed rule as
    compared to the first one.
    3. Once we have this generic set of rules, there must be a
    "satisfiability" test run before requesting YARN for containers. If the
    request is not satisfiable, then there is no point asking YARN to
    allocate containers in this manner. In that case, we must also have a
    default order in which the rules can be "relaxed" and the request made
    satisfiable. For example, some very strict rules may be ignored, or made
    less constraining (for example "on the same container" => "on the same
    node"); a rough sketch of such a relaxation order follows below.

Thanks.

-Bhupesh

On Fri, Jan 22, 2016 at 2:54 PM, Aniruddha Thombare <[email protected]> wrote:

+1 on Chinmay's suggestion about having a single set of affinity rules.

Thanks,


Aniruddha

On Fri, Jan 22, 2016 at 1:57 PM, Sandeep Deshmukh <[email protected]> wrote:

Between 2 operators, if one configures thread/container locality and
anti-affinity as well, which one will take effect?
The DAG validation step should error out in this case.

+1 on the suggestion by Chinmay to name it "Affinity Rules" rather than
anti-affinity. We are just extending our container allocation scheme to
support containers not being allocated together.

Regards,
Sandeep

On Fri, Jan 22, 2016 at 1:43 PM, Chinmay Kolhatkar <[email protected]> wrote:

Hi Isha,

A couple of points:
1. About the interface for configuring anti-affinity: as per the suggestion
above, there are 2 different ways to configure locality and anti-affinity,
i.e. dag.setAttribute for anti-affinity &
  dag.addStream(...).setLocality for locality.

Between 2 operators, if one configures thread/container locality and
anti-affinity as well, which one will take effect? (A concrete example of
such a conflicting configuration is sketched at the end of this mail.)

2. Considering there could be such confusion as above, would it make sense to
have a single API which takes care of both anti-affinity and locality? This
way, there is only one thing to configure.

3. This point comes from how VM affinity is configured in vSphere.
The rules by which VM affinity is configured are called "affinity rules" and
not "anti-affinity rules"; ultimately the idea is to allocate processing to
nodes. Via "VM-VM affinity rules", anti-affinity is also configured, but
there is a single set of rule definitions for both affinity (similar to
locality in our case) and anti-affinity.
Would it be a better approach to configure locality rules and anti-affinity
rules as a single rule and call it an "affinity rule"?
Thanks,
Chinmay.


On Fri, Jan 22, 2016 at 12:24 PM, Yogi Devendra <[email protected]> wrote:

@Isha

I understand that anti-affinity across applications is not straightforward.
It would be OK even if we do not have it in iteration 1.

But for the attribute syntax, I still think that Java objects should be
avoided, as they will be hard to configure from dt-site.xml or other config
files. Another suggestion for this could be a JSON representation of a String
array: ["O2", "O3"]. (If operator names have special characters like " or [
or , those will be escaped in the JSON representation.)
Not sure if others agree on this, but the attribute syntax should be
finalized in iteration 1 to avoid backward compatibility issues later.

~ Yogi

On 22 January 2016 at 00:43, Thomas Weise <[email protected]> wrote:
Node-based requests are the best approach - if it works :-)

Blacklisting will require allocating the containers sequentially. It will
work, but slow down application startup, especially for larger topologies.
On Thu, Jan 21, 2016 at 10:42 AM, Isha Arkatkar <[email protected]> wrote:

Hi,

   Should we consider node-based requests if they work with the Capacity
Scheduler, or avoid the 2b approach altogether? I checked that node requests
do not work with the fair scheduler on a CDH cluster: YARN does not return
any container if a hostname is given in the container request. I am trying to
set up a small virtual Hortonworks cluster to check this behavior there.
YARN-2027 <https://issues.apache.org/jira/browse/YARN-2027> mentioned that
container requests are not honored in the capacity scheduler either, but I am
not sure if that is a distro-dependent issue. Please share insights.

@Vlad, adding support for regular expressions sounds good. We could translate
to a list of operator names internally based on the regex.

@Yogi, I went with a list of strings for the attribute because "O2, O3" could
be a valid single operator name too :)

I am not sure of ways to implement anti-affinity across applications, though
that is something to consider for a later iteration.

Thanks,
Isha

On Wed, Jan 20, 2016 at 8:59 PM, Thomas Weise <[email protected]> wrote:

https://issues.apache.org/jira/browse/SLIDER-82


On Wed, Jan 20, 2016 at 8:56 PM, Thomas Weise <[email protected]> wrote:

The point was that containers are taken away from other apps that may have
to discard work etc. It's not good style to claim resources and not use them
eventually :-)

For this feature it is necessary to look at the scheduler
capabilities/semantics and limitations. For example, don't bet exclusively on
node requests if the goal is for it to work with FairScheduler.
Also look at Slider, which just recently added support for anti-affinity
(using node requests). When you run it on the CDH cluster, it probably won't
work...


On Wed, Jan 20, 2016 at 3:19 PM, Pramod Immaneni <[email protected]> wrote:

Once released, won't the containers be available again in the pool?
This would only be optional and not mandatory.

Thanks

On Tue, Jan 19, 2016 at 2:02 PM, Thomas Weise <[email protected]> wrote:

How about also supporting a minor variation of it as an option where it
greedily gets the total number of containers and discards ones it can't use
and repeats the process for the remaining till everything has been allocated.

This is problematic as with resource preemption these containers will be
potentially taken away from other applications and then thrown away.

Also does it make sense to support anti-cluster affinity?
Thanks

On Tue, Jan 19, 2016 at 1:21 PM, Isha Arkatkar <[email protected]> wrote:

Hi all,

    We want to add support for anti-affinity in Apex, to allow applications
to launch specific physical operators on different nodes (APEXCORE-10
<https://issues.apache.org/jira/browse/APEXCORE-10>). I want to request your
suggestions/ideas for the same!

   The reasons for using anti-affinity between operators could be: to ensure
reliability, for performance reasons (such as when an application does not
want 2 I/O-intensive operators to land on the same node), or for some
application-specific constraints (for example, 2 partitions cannot run on the
same node since they use the same port number). This is the general rationale
for adding anti-affinity support.
Since YARN does not support anti-affinity yet (YARN-1042
<https://issues.apache.org/jira/browse/YARN-1042>), we need to implement the
logic in the AM. I wanted to get your views on the following aspects of this
implementation:

*1. How to specify anti-affinity for physical operators/partitions in the
application:*
     One way is to have an attribute for setting anti-affinity in the logical
operator context, where an operator can set this attribute with a list of
operator names which should not be collocated.
      Consider a DAG with 3 operators:

      TestOperator o1 = dag.addOperator("O1", new TestOperator());
      TestOperator o2 = dag.addOperator("O2", new TestOperator());
      TestOperator o3 = dag.addOperator("O3", new TestOperator());

      To set anti-affinity for the O1 operator:

      dag.setAttribute(o1, OperatorContext.ANTI_AFFINITY,
          new ArrayList<String>(Arrays.asList("O2", "O3")));

      This would mean O1 should not be allocated on nodes containing
operators O2 and O3. It applies to all allocated partitions of O1, O2 and O3.

     Also, if the same operator's name is part of its anti-affinity list, it
means the partitions of that operator should not be allocated on the same
node. For example:

      dag.setAttribute(o2, OperatorContext.ANTI_AFFINITY,
          new ArrayList<String>(Arrays.asList("O2")));

      This indicates anti-affinity between all partitions of O2, i.e. all
partitions of O2 should be launched on different nodes.

     Based on the anti-affinity attribute specified for a logical operator,
during physical plan creation we can add this list to each PTContainer. This
in turn will be available to Stram for sending container requests
accordingly.

     Please suggest if there is a better way to express this intent.

*2. How to implement anti-affinity in the AM*
    There are 2 ways we can implement this:
    *a. Blacklisting of nodes:* We can group the physical container requests
based on the anti-affinity requirements and send allocation requests for
containers in groups. After the first group is done, blacklist the nodes
before sending the second group of container requests. This will ensure that
the containers with anti-affinity requirements are allocated on different
nodes.
    *b. Node-specific container requests:* Explore and create a map of the
nodes present in the cluster and send the allocation request for a container
on a specific node, honoring anti-affinity. There are a couple of open YARN
JIRAs for node-specific container requests: YARN-1412
<https://issues.apache.org/jira/browse/YARN-1412>, YARN-2027
<https://issues.apache.org/jira/browse/YARN-2027>. So we need to check if
this is a plausible approach.
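
To make the two options concrete, here is a rough sketch of the YARN client
calls involved (plain AMRMClient usage with placeholder resource sizes and
hostnames, not actual Apex/Stram code):

    AMRMClient<AMRMClient.ContainerRequest> amRmClient = AMRMClient.createAMRMClient();

    // Option (a): blacklist nodes already used by the previous group before
    // asking for the next group of containers.
    amRmClient.updateBlacklist(Arrays.asList("node1.example.com"), null);
    amRmClient.addContainerRequest(new AMRMClient.ContainerRequest(
        Resource.newInstance(1024, 1), null, null, Priority.newInstance(0)));

    // Option (b): request a container on a specific node and disable locality
    // relaxation so YARN does not silently place it elsewhere.
    amRmClient.addContainerRequest(new AMRMClient.ContainerRequest(
        Resource.newInstance(1024, 1), new String[] { "node2.example.com" }, null,
        Priority.newInstance(0), false /* relaxLocality */));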

*3. Strict vs. Relaxed anti-affinity*
   Depending on cluster resource availability, it may not be possible to
honor all of the anti-affinity requirements specified.
*Strict anti-affinity:* the AM will keep trying to allocate containers as per
the anti-affinity requirements indefinitely. This behavior will be similar to
how an application stays in the ACCEPTED state till resources are available
to launch it in the cluster.
*Relaxed anti-affinity:* the AM will drop the anti-affinity constraint after
a certain timeout.

We need a way to set this attribute through the application (either in the
operator context, or in DAGContext for an application-wide setting).

*4. How do we unit test this feature*
   We could use Mockito for mocking YARN behavior and test only the AM
implementation, since it may not be easy to simulate some scenarios manually
on a cluster. Please suggest if there are better ways to test this.
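
A minimal sketch of that style of test (the ContainerRequestor class and its
method are purely hypothetical placeholders for whatever AM-side component
ends up building the requests):

    @SuppressWarnings("unchecked")
    @Test
    public void testAntiAffinityContainerRequests() {
      // Mock the YARN client so no real cluster is needed.
      AMRMClient<AMRMClient.ContainerRequest> mockClient = Mockito.mock(AMRMClient.class);

      // Hypothetical AM-side component that turns anti-affinity rules into requests.
      ContainerRequestor requestor = new ContainerRequestor(mockClient);
      requestor.requestContainers();

      // Verify that requests were issued against the mocked client.
      Mockito.verify(mockClient, Mockito.atLeastOnce())
          .addContainerRequest(Mockito.any(AMRMClient.ContainerRequest.class));
    }
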
Please suggest improvements or any other ideas on all of the above.
Thanks!
Isha

P.S. Sorry for the long email. Please let me know if I should start separate
threads for any of the above points.


