[ https://issues.apache.org/jira/browse/FLINK-7486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174361#comment-16174361 ]
ASF GitHub Bot commented on FLINK-7486:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4628#discussion_r140032946

    --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java ---
    @@ -675,6 +679,42 @@ private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID taskID)
         }

         /**
    +     * Sets a coTaskGetter callback for evaluating balancing constraint.
    +     */
    +    private void setCoTaskGetter() {
    +        for (MesosTaskManagerParameters.BalancedHostAttrConstraintParams param : taskManagerParameters.balancedConstraintParams()) {
    +            param.setCoTasksGetter(new Func1<String, Set<String>>() {
    +                @Override
    +                public Set<String> call(String s) {
    +                    Map<String, Set<String>> taskToCoTasksMap = new HashMap<>();
    +                    Set <String> taskIds = getTaskIdsSet();
    +                    for (String taskId : taskIds) {
    +                        Set <String> coTaskIds = new HashSet<>(taskIds);
    +                        coTaskIds.remove(taskId);
    +                        taskToCoTasksMap.put(taskId, coTaskIds);
    +                    }
    +                    return taskToCoTasksMap.get(s);
    +                }
    +            });
    +        }
    +    }
    +
    +    /**
    +     * Compiles the set of task IDs in new/launch state.
    +     * @return The unique TaskIDs
    +     */
    +    private Set<String> getTaskIdsSet() {
    +        Set<String> taskIds = new HashSet<String>();
    +        List <MesosWorkerStore.Worker> workers = new ArrayList<MesosWorkerStore.Worker>();
    +        workers.addAll(this.workersInNew.values());
    +        workers.addAll(this.workersInLaunch.values());
    --- End diff --

    I think we don't have to add the `workersInNew.values()` and `workersInLaunch.values()` first to `workers` and then only to `taskIds`. We can directly add them to `taskIds`. Saves us one copy operation.
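The suggestion above can be sketched as follows. This is a minimal, self-contained illustration and not the code from the pull request: `Worker` is a hypothetical stand-in for `MesosWorkerStore.Worker`, its `taskId()` accessor is an assumption (the diff ends before it shows how the ID is read from a worker), and the two maps are passed in as parameters instead of being read from the resource manager's fields.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class TaskIdCollector {

    /** Hypothetical stand-in for MesosWorkerStore.Worker; only the task ID matters here. */
    static final class Worker {
        private final String taskId;

        Worker(String taskId) {
            this.taskId = taskId;
        }

        /** Assumed accessor; the real class may expose the ID differently. */
        String taskId() {
            return taskId;
        }
    }

    /**
     * Collects the task IDs of all workers in new/launch state.
     * The IDs go straight into the result set, so no intermediate
     * list of workers is allocated and filled first.
     */
    static Set<String> getTaskIdsSet(
            Map<String, Worker> workersInNew,
            Map<String, Worker> workersInLaunch) {
        Set<String> taskIds = new HashSet<>();
        for (Worker worker : workersInNew.values()) {
            taskIds.add(worker.taskId());
        }
        for (Worker worker : workersInLaunch.values()) {
            taskIds.add(worker.taskId());
        }
        return taskIds;
    }
}
```

Iterating the two `values()` views directly yields the same set of IDs as the version in the diff while skipping the temporary `ArrayList`.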
> flink-mesos: Support for adding unique attribute / group_by attribute constraints
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-7486
>                 URL: https://issues.apache.org/jira/browse/FLINK-7486
>             Project: Flink
>          Issue Type: Improvement
>          Components: Mesos
>    Affects Versions: 1.3.2
>            Reporter: Bhumika Bayani
>            Assignee: Bhumika Bayani
>
> In our setup, we have multiple mesos-workers. In spite of this, the Flink
> application master most of the time ends up spawning all task-managers on
> the same mesos-worker.
> We intend to ensure HA of task managers. We would like to make sure each
> task-manager runs on a different mesos-worker, and on a mesos-worker that
> does not share the AZ attribute with earlier task-manager instances.
> Netflix-fenzo supports adding UniqueHostAttribute and BalancedHostAttribute
> constraints. Flink-mesos should also enable us to add these kinds of
> constraints.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)