[ 
https://issues.apache.org/jira/browse/FLINK-7486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174361#comment-16174361
 ] 

ASF GitHub Bot commented on FLINK-7486:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4628#discussion_r140032946
  
    --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java ---
    @@ -675,6 +679,42 @@ private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID taskID)
        }
     
        /**
    +    * Sets a coTaskGetter callback for evaluating balancing constraint.
    +    */
    +   private void setCoTaskGetter() {
    +           for (MesosTaskManagerParameters.BalancedHostAttrConstraintParams param : taskManagerParameters.balancedConstraintParams()) {
    +                   param.setCoTasksGetter(new Func1<String, Set<String>>() {
    +                           @Override
    +                           public Set<String> call(String s) {
    +                                   Map<String, Set<String>> taskToCoTasksMap = new HashMap<>();
    +                                   Set <String> taskIds = getTaskIdsSet();
    +                                   for (String taskId : taskIds) {
    +                                           Set <String> coTaskIds = new HashSet<>(taskIds);
    +                                           coTaskIds.remove(taskId);
    +                                           taskToCoTasksMap.put(taskId, coTaskIds);
    +                                   }
    +                                   return taskToCoTasksMap.get(s);
    +                           }
    +                   });
    +           }
    +   }
    +
    +   /**
    +    * Compiles the set of task IDs in new/launch state.
    +    * @return The unique TaskIDs
    +    */
    +   private Set<String> getTaskIdsSet() {
    +           Set<String> taskIds = new HashSet<String>();
    +           List <MesosWorkerStore.Worker> workers = new ArrayList<MesosWorkerStore.Worker>();
    +           workers.addAll(this.workersInNew.values());
    +           workers.addAll(this.workersInLaunch.values());
    --- End diff --
    
    I think we don't need to copy `workersInNew.values()` and
    `workersInLaunch.values()` into `workers` first and only then into `taskIds`.
    We can add them to `taskIds` directly, which saves one copy operation.
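
A minimal sketch of what that could look like. The rest of `getTaskIdsSet()` is cut off in the diff above, so this assumes it simply reads a task ID from each worker. The `Worker` class and its `taskId()` accessor here are hypothetical stand-ins for `MesosWorkerStore.Worker`, and the two maps are passed in as parameters only to keep the example self-contained:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class TaskIdCollector {

    /** Hypothetical stand-in for MesosWorkerStore.Worker; only the task ID matters here. */
    static final class Worker {
        private final String taskId;

        Worker(String taskId) {
            this.taskId = taskId;
        }

        String taskId() {
            return taskId;
        }
    }

    /**
     * Compiles the set of task IDs in new/launch state, adding each ID to the
     * result set directly instead of copying the workers into a list first.
     */
    static Set<String> getTaskIdsSet(
            Map<String, Worker> workersInNew,
            Map<String, Worker> workersInLaunch) {
        Set<String> taskIds = new HashSet<>();
        for (Worker worker : workersInNew.values()) {
            taskIds.add(worker.taskId());
        }
        for (Worker worker : workersInLaunch.values()) {
            taskIds.add(worker.taskId());
        }
        return taskIds;
    }

    public static void main(String[] args) {
        Map<String, Worker> inNew = new HashMap<>();
        inNew.put("w1", new Worker("task-1"));
        Map<String, Worker> inLaunch = new HashMap<>();
        inLaunch.put("w2", new Worker("task-2"));
        System.out.println(getTaskIdsSet(inNew, inLaunch).size());
    }
}
```

The result is the same set as before; only the intermediate `ArrayList` and its two `addAll` copies go away.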


> flink-mesos: Support for adding unique attribute / group_by attribute 
> constraints
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-7486
>                 URL: https://issues.apache.org/jira/browse/FLINK-7486
>             Project: Flink
>          Issue Type: Improvement
>          Components: Mesos
>    Affects Versions: 1.3.2
>            Reporter: Bhumika Bayani
>            Assignee: Bhumika Bayani
>
> In our setup, we have multiple mesos-workers. In spite of this, the Flink
> application master usually ends up spawning all task-managers on the
> same mesos-worker.
> We intend to ensure HA of task managers. We would like each task-manager
> to run on a different mesos-worker, and on a mesos-worker that does not
> share the AZ attribute with earlier task manager instances.
> Netflix-fenzo supports adding UniqueHostAttribute and BalancedHostAttribute
> constraints. Flink-mesos should also enable us to add these kinds of
> constraints.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
