Hi,

Thanks for the inputs! I like the idea of having a single set of (anti-)affinity rules at the DAG context level. There could still be conflicts with stream locality or with the node locality attribute set on operators, but, as Sandeep suggested, I also think Dag.validate should fail when constraints contradict each other.
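A minimal sketch of "one rule set plus fail-fast validation", assuming a hypothetical `AffinityRules` type in which each rule names two logical operators and one relation. None of these names exist in Apex; `validate` only stands in for the check Dag.validate would perform. The three `*_LOCAL` values are named after stream locality levels, and anti-affinity is taken to mean "different nodes", as in the original proposal.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch: one rule set for locality (affinity) and anti-affinity. Not an Apex API. */
final class AffinityRules {

  /** Illustrative relations; the three *_LOCAL values are named after stream locality levels. */
  enum Relation { THREAD_LOCAL, CONTAINER_LOCAL, NODE_LOCAL, ANTI_AFFINITY }

  /** A rule between two logical operators, identified by name. */
  static final class Rule {
    final String a;
    final String b;
    final Relation relation;

    Rule(String a, String b, Relation relation) {
      this.a = a;
      this.b = b;
      this.relation = relation;
    }
  }

  /** Order-independent key, so (O1, O2) and (O2, O1) name the same pair. */
  private static List<String> pair(String x, String y) {
    return x.compareTo(y) <= 0 ? List.of(x, y) : List.of(y, x);
  }

  /**
   * Thread, container and node locality all imply that both operators share a node, so any
   * such rule on a pair that also has an ANTI_AFFINITY rule is a contradiction. Returns one
   * message per contradicting locality rule; an empty list means the rules are consistent.
   */
  static List<String> validate(List<Rule> rules) {
    Set<List<String>> separated = new HashSet<>();
    for (Rule r : rules) {
      if (r.relation == Relation.ANTI_AFFINITY) {
        separated.add(pair(r.a, r.b));
      }
    }
    List<String> errors = new ArrayList<>();
    for (Rule r : rules) {
      if (r.relation != Relation.ANTI_AFFINITY && separated.contains(pair(r.a, r.b))) {
        errors.add(r.relation + " between " + r.a + " and " + r.b
            + " contradicts an anti-affinity rule on the same pair");
      }
    }
    return errors;
  }
}
```

Keeping both kinds of rule in one list makes the contradiction check a single pass; with locality on streams and anti-affinity in an attribute, validation would first have to gather both sources.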

We currently do not have affinity among operators that are not connected by a stream, as Bhupesh pointed out. It is somewhat achievable by requesting node locality for operators (assuming node requests work as expected). But should we consider adding support for affinity specifications along with anti-affinity?

Regarding specifying attributes in dt-site.xml, we can go with a JSON-like string or even an XML representation for complex objects. What is our current behavior for setting Java object properties through XML? We can follow the same approach here as well.

As for precedence, or the ability to satisfy constraints: right now, in the normal scenario, if resources are not available for allocating containers, we keep sending requests until all are obtained. Likewise, with a strict anti-affinity policy, we should keep the application in the ACCEPTED state until the anti-affinity constraint is satisfied. For the relaxed policy, we can decide on a timeout after which the anti-affinity rule is relaxed. Please note this applies only when the rules do not contradict each other.

Thanks!
Isha

On Fri, Jan 22, 2016 at 5:30 AM, Bhupesh Chawda wrote:

I agree on having a single set of rules for both affinity and anti-affinity of operators/partitions on containers.

However, I noted the following points:

1. AFAIK, we do not support affinity (locality) in a general sense. Affinity applies only to a stream, not to *any* two operators. So we should also look at the general case and see how it can be supported, if there are valid use cases.
2. Coming to anti-affinity, we cannot typecast it as a kind of affinity rule. Saying "these two operators must be on the same container" is very different from saying "these two operators must not be on the same container". The second is a much more relaxed rule than the first.
3. Once we have this generic set of rules, a "satisfiability" test must run before requesting containers from YARN. If the request is not satisfiable, there is no point asking YARN to allocate containers this way. In that case we must also have a default order in which the rules can be "relaxed" to make the request satisfiable. For example, some very strict rules may be ignored or made less constraining (for example "on the same container" => "on the same node").

Thanks.

-Bhupesh

On Fri, Jan 22, 2016 at 2:54 PM, Aniruddha Thombare wrote:

+1 on Chinmay's suggestion about having a single set of affinity rules.

Thanks,
Aniruddha

On Fri, Jan 22, 2016 at 1:57 PM, Sandeep Deshmukh wrote:

> Between 2 operators, if one configures thread/container local and
> anti-affinity as well, which one will take effect?

The DAG validation step should error out in this case.

+1 on Chinmay's suggestion to name it "Affinity Rules" rather than anti-affinity. We are just extending our container allocation scheme to support containers that must not be allocated together.

Regards,
Sandeep

On Fri, Jan 22, 2016 at 1:43 PM, Chinmay Kolhatkar wrote:

Hi Isha,

A couple of points:

1. About the interface for configuring anti-affinity: as per the suggestion above, there are two different ways to configure locality and anti-affinity, i.e. dag.setAttribute for anti-affinity and dag.addStream(...).setLocality for locality.

Between two operators, if one configures thread/container local and anti-affinity as well, which one will take effect?

2. Considering there could be confusion like the above, would it make sense to have a single API that takes care of both anti-affinity and locality? That way, there is only one thing to configure.

3. This point comes from how VM affinity is configured in vSphere. The rules that place VMs are called "affinity rules", not "anti-affinity rules"; ultimately, the idea is to allocate processing to nodes. Anti-affinity is also configured via "VM-VM affinity rules", so there is a single rule definition for both affinity (similar to locality in our case) and anti-affinity.
Would it be a better approach to configure locality rules and anti-affinity rules as a single kind of rule and call it an "affinity rule"?

Thanks,
Chinmay.

On Fri, Jan 22, 2016 at 12:24 PM, Yogi Devendra wrote:

@Isha

I understand that anti-affinity across applications is not straightforward. It would be OK even if we do not have it in iteration 1.

But for the attribute syntax, I still think Java objects should be avoided, as they will be hard to configure from dt-site.xml or other config files. Another suggestion is a JSON representation of a string array: ["O2", "O3"]. (Operator names with special characters such as " or [ or , remain unambiguous in JSON: quotes are escaped, and brackets and commas inside a quoted string are taken literally.)
Not sure if others agree on this, but the attribute syntax should be finalized in iteration 1 to avoid backward compatibility issues later.

~ Yogi

On 22 January 2016 at 00:43, Thomas Weise wrote:

Node-based requests are the best approach, if they work :-)

Blacklisting will require allocating the containers sequentially. It will work, but it will slow down application startup, especially for larger topologies.
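Bhupesh's point 3 asks for a satisfiability test before any container is requested. Ignoring memory and vcore capacity, asking whether anti-affine containers fit on N nodes is graph coloring (containers are vertices, anti-affinity pairs are edges, nodes are colors), which is NP-complete once three or more nodes are involved. The minimal sketch below, with hypothetical names, does an exhaustive backtracking search that is only reasonable for small plans; a real check would also need node capacities and, when it fails, whatever relaxation order is agreed on.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical satisfiability pre-check: can each container get one of {@code nodes} nodes
 * so that no two anti-affine containers share a node? Capacity is deliberately ignored.
 */
final class AntiAffinityFeasibility {

  /**
   * @param containers number of containers, indexed 0..containers-1
   * @param conflicts  pairs {i, j} of containers that must not share a node
   * @param nodes      number of usable nodes
   * @return a node index for every container, or null if no placement exists
   */
  static int[] place(int containers, List<int[]> conflicts, int nodes) {
    boolean[][] clash = new boolean[containers][containers];
    for (int[] c : conflicts) {
      clash[c[0]][c[1]] = true;
      clash[c[1]][c[0]] = true;
    }
    int[] assignment = new int[containers];
    Arrays.fill(assignment, -1);
    return assign(0, clash, nodes, assignment) ? assignment : null;
  }

  /** Exhaustive backtracking: try every node for container i, undo on failure. */
  private static boolean assign(int i, boolean[][] clash, int nodes, int[] assignment) {
    if (i == assignment.length) {
      return true; // every container has a node
    }
    for (int node = 0; node < nodes; node++) {
      boolean free = true;
      for (int j = 0; j < i; j++) {
        if (clash[i][j] && assignment[j] == node) {
          free = false;
          break;
        }
      }
      if (free) {
        assignment[i] = node;
        if (assign(i + 1, clash, nodes, assignment)) {
          return true;
        }
        assignment[i] = -1;
      }
    }
    return false;
  }
}
```

For example, three mutually anti-affine partitions cannot be placed on two nodes (`place` returns null) but can be placed on three. Containers that end up with the same node index have no conflicts among themselves, so they form the kind of group that approach 2a in Isha's original message could request in one round.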

On Thu, Jan 21, 2016 at 10:42 AM, Isha Arkatkar wrote:

Hi,

Should we consider node-based requests if they work with the Capacity Scheduler, or avoid approach 2b altogether? I checked that node requests do not work with the Fair Scheduler on a CDH cluster: YARN does not return any container if a hostname is given in the container request. I am trying to set up a small virtual Hortonworks cluster to check this behavior there.
YARN-2027 <https://issues.apache.org/jira/browse/YARN-2027> mentions that container requests for specific nodes are not honored by the Capacity Scheduler either, but I am not sure whether that is a distro-dependent issue. Please share insights.

@Vlad, adding support for regular expressions sounds good. We could translate them internally into a list of operator names based on the regex.

@Yogi, I went with a list of strings for the attribute because "O2, O3" could be a valid single operator name too :)
I am not sure how to implement anti-affinity across applications, though it is something to consider for a later iteration.
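Vlad's regular-expression idea, which Isha proposes to translate internally into a list of operator names, could look like the minimal sketch below. Assumptions not stated in the thread: each entry is matched against the whole operator name, an entry that matches nothing is an error rather than a silently dropped constraint, and literal names containing regex metacharacters are wrapped with `Pattern.quote`. The class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;

/** Hypothetical sketch: expand anti-affinity entries (regular expressions) into operator names. */
final class AntiAffinityNames {

  /**
   * Each entry is compiled as a regular expression and must match an entire operator name.
   * Names are returned once each, in the order they are first matched.
   */
  static List<String> expand(List<String> entries, List<String> operatorNames) {
    Set<String> result = new LinkedHashSet<>();
    for (String entry : entries) {
      Pattern pattern = Pattern.compile(entry);
      boolean matched = false;
      for (String name : operatorNames) {
        if (pattern.matcher(name).matches()) {
          result.add(name);
          matched = true;
        }
      }
      if (!matched) {
        // Fail validation instead of quietly ignoring a typo in the attribute.
        throw new IllegalArgumentException("No operator matches '" + entry + "'");
      }
    }
    return new ArrayList<>(result);
  }
}
```

Because every entry stays a separate list element, a name such as "O2, O3" is still a single operator, which is the reason Isha gives for using a list of strings.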

Thanks,
Isha

On Wed, Jan 20, 2016 at 8:59 PM, Thomas Weise wrote:

https://issues.apache.org/jira/browse/SLIDER-82

On Wed, Jan 20, 2016 at 8:56 PM, Thomas Weise wrote:

The point was that containers are taken away from other apps that may have to discard work, etc. It's not good style to claim resources and not use them eventually :-)

For this feature it is necessary to look at the scheduler capabilities/semantics and limitations. For example, don't bet exclusively on node requests if the goal is for it to work with the FairScheduler.

Also look at Slider, which just recently added support for anti-affinity (using node requests). When you run it on the CDH cluster, it probably won't work...

On Wed, Jan 20, 2016 at 3:19 PM, Pramod Immaneni wrote:

Once released, won't the containers be available in the pool again? This would only be optional, not mandatory.

Thanks

On Tue, Jan 19, 2016 at 2:02 PM, Thomas Weise wrote:

> How about also supporting a minor variation of it as an option
> where it greedily gets the total number of containers, discards
> the ones it can't use, and repeats the process for the remaining
> ones till everything has been allocated.

This is problematic: with resource preemption, these containers will potentially be taken away from other applications and then thrown away.

> Also, does it make sense to support anti-cluster affinity?
>
> Thanks

On Tue, Jan 19, 2016 at 1:21 PM, Isha Arkatkar wrote:

Hi all,

We want to add support for anti-affinity in Apex to allow applications to launch specific physical operators on different nodes (APEXCORE-10 <https://issues.apache.org/jira/browse/APEXCORE-10>). I would like to request your suggestions/ideas for the same!

The reasons for using anti-affinity in operators could be: to ensure reliability; for performance (for example, an application may not want two I/O-intensive operators to land on the same node); or for application-specific constraints (for example, two partitions cannot run on the same node because they use the same port number). This is the general rationale for adding anti-affinity support.

Since YARN does not support anti-affinity yet (YARN-1042 <https://issues.apache.org/jira/browse/YARN-1042>), we need to implement the logic in the AM. I wanted to get your views on the following aspects of this implementation:

*1. How to specify anti-affinity for physical operators/partitions in an application:*
One way is to have an attribute for setting anti-affinity at the logical operator context. An operator can set this attribute to a list of names of operators that should not be collocated with it.

Consider a DAG with three operators:

    TestOperator o1 = dag.addOperator("O1", new TestOperator());
    TestOperator o2 = dag.addOperator("O2", new TestOperator());
    TestOperator o3 = dag.addOperator("O3", new TestOperator());

To set anti-affinity for operator O1:

    dag.setAttribute(o1, OperatorContext.ANTI_AFFINITY, new ArrayList<>(Arrays.asList("O2", "O3")));

This means O1 should not be allocated on nodes containing operators O2 and O3. This applies to all allocated partitions of O1, O2 and O3.

Also, if an operator's own name is part of its anti-affinity list, partitions of that operator should not be allocated on the same node. For example:

    dag.setAttribute(o2, OperatorContext.ANTI_AFFINITY, new ArrayList<>(Arrays.asList("O2")));

This indicates anti-affinity between all partitions of O2, i.e. all partitions of O2 should be launched on different nodes.

Based on the anti-affinity attribute specified for a logical operator, we can add this list to each PTContainer during physical plan creation.
This, in turn, will be available to Stram for sending container requests accordingly.

Please suggest if there is a better way to express this intent.

*2. How to implement anti-affinity in the AM*
There are two ways we can implement this:
*a. Blacklisting of nodes:* We can group the physical container requests based on the anti-affinity requirements and send allocation requests for containers in groups. After the first group is allocated, blacklist its nodes before sending the second group of container requests. This ensures that containers with anti-affinity requirements are allocated on different nodes.
*b. Node-specific container requests:* Explore and create a map of the nodes present in the cluster, and send allocation requests for containers on specific nodes, honoring anti-affinity. There are a couple of open YARN JIRAs for node-specific container requests: YARN-1412 <https://issues.apache.org/jira/browse/YARN-1412> and YARN-2027 <https://issues.apache.org/jira/browse/YARN-2027>. So we need to check whether this is a plausible approach.

*3. Strict vs. relaxed anti-affinity*
Depending on the availability of cluster resources, it may not be possible to honor all the anti-affinity requirements specified.
*Strict anti-affinity:* The AM will keep trying to allocate containers according to the anti-affinity requirements indefinitely. This behavior will be similar to how an application stays in the ACCEPTED state until resources are available to launch it in the cluster.
*Relaxed anti-affinity:* The AM will drop the anti-affinity constraint after a certain timeout.

We need a way to set this attribute through the application (either in the operator context, or in the DAG context for an application-wide setting).

*4. How do we unit test this feature*
We could use Mockito to mock YARN behavior and test only the AM implementation, since it may not be easy to simulate some scenarios manually in a cluster. Please suggest if there are better ways to test this.
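Point 3's two policies reduce to one decision per outstanding container request: keep its anti-affinity constraint or drop it. A minimal sketch, assuming the AM records when each request was first issued and using a hypothetical `relaxAfter` timeout (the thread does not fix a value); `AntiAffinityPolicy` is not an Apex class.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical sketch of the strict vs. relaxed policy from point 3. Not an Apex API. */
final class AntiAffinityPolicy {

  enum Mode { STRICT, RELAXED }

  private final Mode mode;
  private final Duration relaxAfter; // only consulted in RELAXED mode

  AntiAffinityPolicy(Mode mode, Duration relaxAfter) {
    this.mode = mode;
    this.relaxAfter = relaxAfter;
  }

  /**
   * Whether a container request first issued at {@code firstRequested} should still carry
   * its anti-affinity constraint at time {@code now}.
   */
  boolean enforce(Instant firstRequested, Instant now) {
    if (mode == Mode.STRICT) {
      return true; // keep asking indefinitely, like an application waiting in ACCEPTED
    }
    // RELAXED: enforce only while the request is younger than the timeout.
    return Duration.between(firstRequested, now).compareTo(relaxAfter) < 0;
  }
}
```

An AM loop would call `enforce` each time it re-issues a pending request and, once it returns false, send that request without node or blacklist restrictions. Passing timestamps in, rather than reading the clock inside, keeps the policy easy to unit test.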

Please suggest improvements or any other ideas on all of the above.

Thanks!
Isha

P.S. Sorry for the long email. Please let me know if I should start separate threads for any of the above points.
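The semantics in point 1 (O1 avoids every partition of O2 and O3; listing O2 in its own list makes its partitions mutually anti-affine) can be made concrete by expanding the logical attribute into physical partition pairs, which is roughly what attaching the list to each PTContainer has to encode. This is a minimal sketch; the `PhysicalAntiAffinity` class, the "<operator>#<index>" partition names and the default of one partition for an unlisted operator are assumptions made for illustration.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch: logical ANTI_AFFINITY lists -> physical partition pairs. */
final class PhysicalAntiAffinity {

  /**
   * @param antiAffinity logical operator name -> its ANTI_AFFINITY list
   * @param partitions   logical operator name -> partition count (missing means 1)
   * @return unordered pairs "x | y" of partitions that must run on different nodes
   */
  static Set<String> pairs(Map<String, List<String>> antiAffinity, Map<String, Integer> partitions) {
    Set<String> result = new LinkedHashSet<>();
    for (Map.Entry<String, List<String>> e : antiAffinity.entrySet()) {
      String op = e.getKey();
      int opCount = partitions.getOrDefault(op, 1);
      for (String other : e.getValue()) {
        int otherCount = partitions.getOrDefault(other, 1);
        for (int i = 0; i < opCount; i++) {
          for (int j = 0; j < otherCount; j++) {
            String a = op + "#" + i;
            String b = other + "#" + j;
            if (a.equals(b)) {
              continue; // a partition never conflicts with itself
            }
            // Store each unordered pair once, lexicographically smaller name first.
            result.add(a.compareTo(b) < 0 ? a + " | " + b : b + " | " + a);
          }
        }
      }
    }
    return result;
  }
}
```

With the example above (O1 listing O2 and O3, O2 listing itself) and O2 split into two partitions, this yields four pairs: O1#0 with O2#0, O2#1 and O3#0, plus O2#0 with O2#1.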
