We do get the list of nodes and available resources initially at the beginning 
of the AM.

Look at line 693 in StramAppMasterService.java class 
https://github.com/apache/incubator-apex-core/blob/devel-3/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java

Thanks
- Gaurav

> On Jan 19, 2016, at 2:19 PM, Isha Arkatkar <[email protected]> wrote:
> 
> Hi Gaurav,
> 
>   I think Pramod agreed with approach 2a.
>   For 2b approach, I think initially we will not have set of nodes
> available in cluster. So we need to try greedy approach to get containers
> on different nodes to get a mapping. Once we have mapping though, we can
> send request on specific nodes. I think 2b can be more applicable for
> container re-allocation scenario, if it node locality request is honored by
> Yarn. I can test this out.
> 
> Pramod, could you please elaborate on anti-cluster affinity?
> 
> Thanks,
> Isha
> 
> On Tue, Jan 19, 2016 at 2:14 PM, Thomas Weise <[email protected]>
> wrote:
> 
>> Gaurav,
>> 
>> Does request for specific node work on FairScheduler now?
>> 
>> 
>> 
>> On Tue, Jan 19, 2016 at 2:06 PM, Gaurav Gupta <[email protected]>
>> wrote:
>> 
>>> I agree with Pramod that we should go with 2b and we are already doing
>>> node locality so you can use that feature.
>>> Regarding 3, do we need to support relaxed anti_affinity. Anti_affinity
>>> will mostly be used where user wants such segregation of operators on
>>> different nodes for his/her App.
>>> 
>>> Thanks
>>> - Gaurav
>>> 
>>>> On Jan 19, 2016, at 1:57 PM, Pramod Immaneni <[email protected]>
>>> wrote:
>>>> 
>>>> Sorry I meant distro agnostic (without the not) in the first sentence.
>>>> 
>>>> On Tue, Jan 19, 2016 at 1:57 PM, Pramod Immaneni <
>> [email protected]
>>>> 
>>>> wrote:
>>>> 
>>>>> Isha this sounds great. 2 a. sounds like a good approach that is not
>>>>> distro agnostic. How about also supporting a minor variation of it as
>> an
>>>>> option where it greedily gets the total number of containers and
>>> discards
>>>>> ones it can't use and repeats the process for the remaining till
>>> everything
>>>>> has been allocated. Also does it make sense to support anti-cluster
>>>>> affinity?
>>>>> 
>>>>> Thanks
>>>>> 
>>>>> On Tue, Jan 19, 2016 at 1:21 PM, Isha Arkatkar <[email protected]>
>>>>> wrote:
>>>>> 
>>>>>> Hi all,
>>>>>> 
>>>>>>  We want add support for Anti-affinity in Apex to allow applications
>>> to
>>>>>> launch specific physical operators on different nodes(APEXCORE-10
>>>>>> <https://issues.apache.org/jira/browse/APEXCORE-10>). Want to
>> request
>>>>>> your
>>>>>> suggestions/ideas for the same!
>>>>>> 
>>>>>> The reasons for using anti-affinity in operators could be: to ensure
>>>>>> reliability, for performance reasons (such as application may not
>> want
>>> 2
>>>>>> i/o intensive operators to land on the same node to improve
>>> performance)
>>>>>> or
>>>>>> for some application specific constraints(for example,  2 partitions
>>>>>> cannot
>>>>>> be run on the same node since they use same port number). This is the
>>>>>> general rationale for adding Anti-affinity support.
>>>>>> 
>>>>>> Since, Yarn does not support anti-affinity yet (YARN-1042
>>>>>> <https://issues.apache.org/jira/browse/YARN-1042>), we need to
>>> implement
>>>>>> the logic in AM. Wanted to get your views on following aspects for
>> this
>>>>>> implementation:
>>>>>> 
>>>>>> *1. How to specify anti-affinity for physical operators/partitions in
>>>>>> application:*
>>>>>>   One way for this is to have an attribute for setting anti-affinity
>>> at
>>>>>> the logical operator context. And an operator can set this attribute
>>> with
>>>>>> list of operator names which should not be collocated.
>>>>>>    Consider dag with 3 operators:
>>>>>>    TestOperator o1 = dag.addOperator("O1", new TestOperator());
>>>>>>    TestOperator o2 = dag.addOperator("O2", new TestOperator());
>>>>>>    TestOperator o3 = dag.addOperator("O3", new TestOperator());
>>>>>> 
>>>>>> To set anti-affinity for O1 operator:
>>>>>>   dag.setAttribute(o1, OperatorContext.ANTI_AFFINITY, new
>>>>>> ArrayList<String>(Arrays.asList("O2", "O3")));
>>>>>>    This would mean O1 should not be allocated on nodes containing
>>>>>> operators O2 and O3. This applies to all allocated partitions of O1,
>>> O2,
>>>>>> O3.
>>>>>> 
>>>>>>  Also, if same operator name is part of anti-affinity list, it means
>>>>>> partitions of the operator should not be allocated on the same node.
>>>>>> example:
>>>>>>   dag.setAttribute(o2, OperatorContext.ANTI_AFFINITY, new
>>>>>> ArrayList<String>(Arrays.asList("O2")));
>>>>>>   This indicates anti-affinity between all partitions of O2. i.e.
>> all
>>>>>> partitions of O2 should be launched on different nodes.
>>>>>> 
>>>>>>  Based on the anti-affinity attribute specified for logical
>> operator,
>>>>>> during physical plan creation, we can add this list to each
>>> PTContainer.
>>>>>> This in turn will be available for Stram for sending container
>> requests
>>>>>> accordingly.
>>>>>> 
>>>>>>  Please suggest if there is a better way to express this intent.
>>>>>> 
>>>>>> *2. How to implement anti-affinity in AM*
>>>>>>  There are 2 ways we can implement this:
>>>>>> * a. Blacklisting of nodes: *We can group the physical container
>>>>>> requests
>>>>>> based on anti-affinity requirements and send allocation requests for
>>>>>> containers in groups. After first group is done, blacklist the nodes
>>>>>> before
>>>>>> sending second group of container requests. This will ensure that the
>>>>>> containers with anti-affinity requirements  will be allocated on
>>> different
>>>>>> nodes.
>>>>>> *   b. Node specific container request: *Explore and create a map of
>>> nodes
>>>>>> present in the cluster and send allocation request for container on a
>>>>>> specific node, honoring anti-affinity. There are couple of open Yarn
>>> Jiras
>>>>>> for node specific container requests: YARN-1412
>>>>>> <https://issues.apache.org/jira/browse/YARN-1412>, YARN-2027
>>>>>> <https://issues.apache.org/jira/browse/YARN-2027>. So, need to check
>>> if
>>>>>> this is a plausible approach.
>>>>>> 
>>>>>> *3. Strict Vs Relaxed anti-affinity*
>>>>>> Depending on cluster resources availability, it may not be possible
>> to
>>>>>> honor all anti-affinity requirements specified.
>>>>>> *Strict Anti-affinity:* AM will keep trying to allocate containers as
>>> per
>>>>>> anti-affinity requirements indefinitely. This behavior will be
>> similar
>>> to
>>>>>> how an application shows in ACCEPTED state, till resources are
>>> available
>>>>>> to
>>>>>> launch in cluster.
>>>>>> *Relaxed Anti-affinity:* AM will drop the anti-affinity constraint
>>> after a
>>>>>> certain timeout.
>>>>>> 
>>>>>> We need a way to set this attribute through application. (Either in
>>>>>> operator context or in DAGContext for application wide setting.)
>>>>>> 
>>>>>> *4. How do we unit test this feature*
>>>>>> We could use Mockito for mocking Yarn behaviors and test only AM
>>>>>> implementation, since it may not be easy to simulate some scenarios
>>>>>> manually in cluster. Please suggest if there are better ways to test
>>> this.
>>>>>> 
>>>>>> Please suggest improvements or any other ideas on all of the above.
>>>>>> 
>>>>>> Thanks!
>>>>>> Isha
>>>>>> 
>>>>>> P.S. Sorry for long email. Please let me know if I should start
>>> separate
>>>>>> threads for any of the above points.
>>>>>> 
>>>>> 
>>>>> 
>>> 
>>> 
>> 

Reply via email to