Repository: helix
Updated Branches:
  refs/heads/master e44b29e03 -> 4c3ad2aec


[HELIX-718] implement ThreadCountBasedTaskAssigner


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/4c3ad2ae
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/4c3ad2ae
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/4c3ad2ae

Branch: refs/heads/master
Commit: 4c3ad2aecc07de97d5f1976a61858ddbe2f836ed
Parents: e44b29e
Author: Harry Zhang <[email protected]>
Authored: Mon Jul 9 16:04:19 2018 -0700
Committer: Harry Zhang <[email protected]>
Committed: Mon Jul 9 16:36:55 2018 -0700

----------------------------------------------------------------------
 .../helix/task/assigner/TaskAssignResult.java   |   2 +-
 .../assigner/ThreadCountBasedTaskAssigner.java  | 174 ++++++++++++++++
 .../helix/task/assigner/AssignerTestBase.java   |  69 +++++++
 .../task/assigner/TestAssignableInstance.java   |  38 +---
 .../TestThreadCountBasedTaskAssigner.java       | 206 +++++++++++++++++++
 5 files changed, 451 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/4c3ad2ae/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssignResult.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssignResult.java 
b/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssignResult.java
index 00d7db1..f81749c 100644
--- 
a/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssignResult.java
+++ 
b/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssignResult.java
@@ -77,7 +77,7 @@ public class TaskAssignResult implements 
Comparable<TaskAssignResult> {
    * @return instance name. Null if assignment was not successful
    */
   public String getInstanceName() {
-    return _node.getInstanceName();
+    return _node == null ? null : _node.getInstanceName();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/4c3ad2ae/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java
 
b/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java
new file mode 100644
index 0000000..ece7290
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java
@@ -0,0 +1,174 @@
+package org.apache.helix.task.assigner;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Random;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.task.TaskConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ThreadCountBasedTaskAssigner implements TaskAssigner {
+  private static final Logger logger =
+      LoggerFactory.getLogger(ThreadCountBasedTaskAssigner.class);
+
+  private static final int SCHED_QUEUE_INIT_CAPACITY = 200;
+
+  /**
+   * This is a simple task assigning algorithm that uses the following 
assumptions to achieve
+   * efficiency in assigning tasks:
+   *    1. All tasks have same quota type
+   *    2. All tasks only need 1 thread for assignment, no other things to 
consider
+   *
+   * The algorithm ensures the spread-out of tasks with same quota type or 
tasks from same job, with
+   * best effort.
+   * NOTE: once we have more things to consider during scheduling, we will 
need to come up with
+   * a more generic task assignment algorithm
+   * @param assignableInstances String -> AssignableInstanceMapping
+   * @param tasks String -> TaskConfig
+   * @return taskID -> TaskAssignmentResult mapping per task
+   */
+  public Map<String, TaskAssignResult> 
assignTasks(Iterable<AssignableInstance> assignableInstances,
+      Iterable<TaskConfig> tasks) {
+    if (tasks == null || !tasks.iterator().hasNext()) {
+      logger.warn("No task to assign!");
+      return Collections.emptyMap();
+    }
+    if (assignableInstances == null || 
!assignableInstances.iterator().hasNext()) {
+      logger.warn("No instance to assign!");
+      return buildNoInstanceAssignment(tasks);
+    }
+
+    // get quota type
+    String quotaType = tasks.iterator().next().getQuotaType();
+    logger.info("Assigning tasks with quota type {}", quotaType);
+
+    // Build a sched queue
+    PriorityQueue<AssignableInstance> queue = buildSchedQueue(quotaType, 
assignableInstances);
+
+    // Assign
+    Map<String, TaskAssignResult> assignResults = new HashMap<>();
+    TaskAssignResult lastFailure = null;
+    for (TaskConfig task : tasks) {
+
+      // Dedup
+      if (assignResults.containsKey(task.getId())) {
+        logger.warn("Duplicated task assignment {}", task);
+        continue;
+      }
+
+      // Every time we try to assign the task to the least-used instance, if 
that fails,
+      // we assume all subsequent tasks will fail with same reason
+      if (lastFailure != null) {
+        assignResults.put(task.getId(),
+            new TaskAssignResult(task, null, false, 
lastFailure.getFitnessScore(),
+                lastFailure.getFailureReason(), 
lastFailure.getFailureDescription()));
+        continue;
+      }
+
+      // Try to assign the task to least used instance
+      AssignableInstance instance = queue.poll();
+      TaskAssignResult result = instance.tryAssign(task);
+      assignResults.put(task.getId(), result);
+
+      if (!result.isSuccessful()){
+        // For all failure reasons other than duplicated assignment, we can 
fail
+        // subsequent tasks
+        lastFailure = result;
+      } else {
+        // If the task is successfully accepted by the instance, assign it to 
the instance
+        instance.assign(result);
+
+        // requeue the instance to rank again
+        queue.offer(instance);
+      }
+    }
+    logger.info("Finished assigning tasks with quota type {}", quotaType);
+    return assignResults;
+  }
+
+  private PriorityQueue<AssignableInstance> buildSchedQueue(String quotaType,
+      Iterable<AssignableInstance> instances) {
+    AssignableInstanceComparator comparator = new 
AssignableInstanceComparator(quotaType);
+    PriorityQueue<AssignableInstance> queue =
+        new PriorityQueue<>(SCHED_QUEUE_INIT_CAPACITY, comparator);
+    for (AssignableInstance assignableInstance : instances) {
+      queue.offer(assignableInstance);
+    }
+    return queue;
+  }
+
+  private Map<String, TaskAssignResult> 
buildNoInstanceAssignment(Iterable<TaskConfig> tasks) {
+    Map<String, TaskAssignResult> result = new HashMap<>();
+    for (TaskConfig taskConfig : tasks) {
+      result.put(taskConfig.getId(), new TaskAssignResult(taskConfig, null, 
false, 0,
+          TaskAssignResult.FailureReason.INSUFFICIENT_QUOTA, "No assignable 
instance to assign"));
+    }
+    return result;
+  }
+
+  private class AssignableInstanceComparator implements 
Comparator<AssignableInstance> {
+
+    /**
+     * Resource type this comparator needs to compare
+     */
+    private final String RESOURCE_TYPE = 
LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name();
+
+    /**
+     * Resource quota type this comparator needs to compare
+     */
+    private final String _quotaType;
+
+    public AssignableInstanceComparator(String quotaType) {
+      _quotaType = quotaType;
+    }
+
+    /**
+     * Using this comparator, AssignableInstance will be sorted based on 
availability of
+     * quota given job type in the priority queue. Top of the queue will be 
the one with
+     * highest priority
+     *
+     * @return a negative integer, zero, or a positive integer as the
+     *         first argument is less than, equal to, or greater than the
+     *         second
+     */
+    @Override
+    public int compare(AssignableInstance o1, AssignableInstance o2) {
+      Integer o1RemainingCapacity = getRemainingUsage(o1.getTotalCapacity(), 
o1.getUsedCapacity());
+      Integer o2RemainingCapacity = getRemainingUsage(o2.getTotalCapacity(), 
o2.getUsedCapacity());
+      return o2RemainingCapacity - o1RemainingCapacity;
+    }
+
+    private Integer getRemainingUsage(Map<String, Map<String, Integer>> 
capacity,
+        Map<String, Map<String, Integer>> used) {
+      if (capacity.containsKey(RESOURCE_TYPE) && capacity.get(RESOURCE_TYPE)
+          .containsKey(_quotaType)) {
+        return capacity.get(RESOURCE_TYPE).get(_quotaType) - 
used.get(RESOURCE_TYPE)
+            .get(_quotaType);
+      }
+      return 0;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/4c3ad2ae/helix-core/src/test/java/org/apache/helix/task/assigner/AssignerTestBase.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/task/assigner/AssignerTestBase.java 
b/helix-core/src/test/java/org/apache/helix/task/assigner/AssignerTestBase.java
new file mode 100644
index 0000000..4030df7
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/task/assigner/AssignerTestBase.java
@@ -0,0 +1,69 @@
+package org.apache.helix.task.assigner;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.task.TaskConfig;
+
+/* package */ class AssignerTestBase {
+
+  private static final String testClusterName = "testCluster";
+  static final String testInstanceName = "testInstance";
+
+  static final String[] testResourceTypes = new String[] {"Resource1", 
"Resource2", "Resource3"};
+  static final String[] testResourceCapacity = new String[] {"20", "50", 
"100"};
+
+  static final String[] testQuotaTypes = new String[] {"Type1", "Type2", 
"Type3"};
+  static final String[] testQuotaRatio = new String[] {"50", "30", "20"};
+  private static final String defaultQuotaRatio = "100";
+
+  /* package */ LiveInstance createLiveInstance(String[] resourceTypes, 
String[] resourceCapacity) {
+    return createLiveInstance(resourceTypes, resourceCapacity, 
testInstanceName);
+  }
+
+  /* package */ LiveInstance createLiveInstance(String[] resourceTypes, 
String[] resourceCapacity, String instancename) {
+    LiveInstance li = new LiveInstance(instancename);
+    if (resourceCapacity != null && resourceTypes != null) {
+      Map<String, String> resMap = new HashMap<>();
+      for (int i = 0; i < resourceCapacity.length; i++) {
+        resMap.put(resourceTypes[i], resourceCapacity[i]);
+      }
+      li.setResourceCapacityMap(resMap);
+    }
+    return li;
+  }
+
+  /* package */ ClusterConfig createClusterConfig(String[] quotaTypes, 
String[] quotaRatio,
+      boolean addDefaultQuota) {
+    ClusterConfig clusterConfig = new ClusterConfig(testClusterName);
+    if (quotaTypes != null && quotaRatio != null) {
+      for (int i = 0; i < quotaTypes.length; i++) {
+        clusterConfig.setTaskQuotaRatio(quotaTypes[i], quotaRatio[i]);
+      }
+    }
+    if (addDefaultQuota) {
+      clusterConfig.setTaskQuotaRatio(TaskConfig.DEFAULT_QUOTA_TYPE, 
defaultQuotaRatio);
+    }
+    return clusterConfig;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/4c3ad2ae/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
 
b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
index 9b5974a..f1c92e3 100644
--- 
a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
+++ 
b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
@@ -29,17 +29,7 @@ import org.apache.helix.task.TaskStateModelFactory;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-public class TestAssignableInstance {
-  private static final String testClusterName = "testCluster";
-  private static final String testInstanceName = "testInstance";
-
-  private static final String[] testResourceTypes = new String[] {"Resource1", 
"Resource2", "Resource3"};
-  private static final String[] testResourceCapacity = new String[] {"20", 
"50", "100"};
-
-  private static final String[] testQuotaTypes = new String[] {"Type1", 
"Type2", "Type3"};
-  private static final String[] testQuotaRatio = new String[] {"50", "30", 
"20"};
-  private static final String defaultQuotaRatio = "100";
-
+public class TestAssignableInstance extends AssignerTestBase {
 
   @Test
   public void testInvalidInitialization() {
@@ -330,30 +320,4 @@ public class TestAssignableInstance {
     }
     return expectedQuotaPerType;
   }
-
-  private LiveInstance createLiveInstance(String[] resourceTypes, String[] 
resourceCapacity) {
-    LiveInstance li = new LiveInstance(testInstanceName);
-    if (resourceCapacity != null && resourceTypes != null) {
-      Map<String, String> resMap = new HashMap<>();
-      for (int i = 0; i < resourceCapacity.length; i++) {
-        resMap.put(resourceTypes[i], resourceCapacity[i]);
-      }
-      li.setResourceCapacityMap(resMap);
-    }
-    return li;
-  }
-
-  private ClusterConfig createClusterConfig(String[] quotaTypes, String[] 
quotaRatio,
-      boolean addDefaultQuota) {
-    ClusterConfig clusterConfig = new ClusterConfig(testClusterName);
-    if (quotaTypes != null && quotaRatio != null) {
-      for (int i = 0; i < quotaTypes.length; i++) {
-        clusterConfig.setTaskQuotaRatio(quotaTypes[i], quotaRatio[i]);
-      }
-    }
-    if (addDefaultQuota) {
-      clusterConfig.setTaskQuotaRatio(TaskConfig.DEFAULT_QUOTA_TYPE, 
defaultQuotaRatio);
-    }
-    return clusterConfig;
-  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/4c3ad2ae/helix-core/src/test/java/org/apache/helix/task/assigner/TestThreadCountBasedTaskAssigner.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/task/assigner/TestThreadCountBasedTaskAssigner.java
 
b/helix-core/src/test/java/org/apache/helix/task/assigner/TestThreadCountBasedTaskAssigner.java
new file mode 100644
index 0000000..ec8753c
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/task/assigner/TestThreadCountBasedTaskAssigner.java
@@ -0,0 +1,206 @@
+package org.apache.helix.task.assigner;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.task.TaskConfig;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestThreadCountBasedTaskAssigner extends AssignerTestBase {
+
+  @Test
+  public void testSuccessfulAssignment() {
+    TaskAssigner assigner = new ThreadCountBasedTaskAssigner();
+    int taskCountPerType = 150;
+    int instanceCount = 20;
+    int threadCount = 50;
+    List<AssignableInstance> instances = 
createAssignableInstances(instanceCount, threadCount);
+
+    for (String quotaType : testQuotaTypes) {
+      // Create tasks
+      List<TaskConfig> tasks = createTaskConfigs(taskCountPerType, quotaType);
+
+      // Assign
+      Map<String, TaskAssignResult> results = assigner.assignTasks(instances, 
tasks);
+
+      // Check success
+      assertAssignmentResults(results.values(), true);
+
+      // Check evenness
+      for (AssignableInstance instance : instances) {
+        int assignedCount = instance.getUsedCapacity()
+            
.get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name()).get(quotaType);
+        Assert.assertTrue(assignedCount <= taskCountPerType / instanceCount + 1
+            && assignedCount >= taskCountPerType / instanceCount);
+      }
+    }
+  }
+
+  @Test
+  public void testAssignmentFailureNoInstance() {
+    TaskAssigner assigner = new ThreadCountBasedTaskAssigner();
+    int taskCount = 10;
+    List<TaskConfig> tasks = createTaskConfigs(taskCount, "Dummy");
+    Map<String, TaskAssignResult> results =
+        assigner.assignTasks(Collections.<AssignableInstance>emptyList(), 
tasks);
+    Assert.assertEquals(results.size(), taskCount);
+    for (TaskAssignResult result : results.values()) {
+      Assert.assertFalse(result.isSuccessful());
+      Assert.assertNull(result.getAssignableInstance());
+      Assert.assertEquals(result.getFailureReason(),
+          TaskAssignResult.FailureReason.INSUFFICIENT_QUOTA);
+    }
+  }
+
+  @Test
+  public void testAssignmentFailureNoTask() {
+    TaskAssigner assigner = new ThreadCountBasedTaskAssigner();
+    List<AssignableInstance> instances = createAssignableInstances(1, 10);
+    Map<String, TaskAssignResult> results =
+        assigner.assignTasks(instances, Collections.<TaskConfig>emptyList());
+    Assert.assertTrue(results.isEmpty());
+  }
+
+  @Test
+  public void testAssignmentFailureInsufficientQuota() {
+    TaskAssigner assigner = new ThreadCountBasedTaskAssigner();
+
+    // 10 * Type1 quota
+    List<AssignableInstance> instances = createAssignableInstances(2, 10);
+    List<TaskConfig> tasks = createTaskConfigs(20, testQuotaTypes[0]);
+
+    Map<String, TaskAssignResult> results = assigner.assignTasks(instances, 
tasks);
+    int successCnt = 0;
+    int failCnt = 0;
+    for (TaskAssignResult rst : results.values()) {
+      if (rst.isSuccessful()) {
+        successCnt += 1;
+      } else {
+        failCnt += 1;
+        Assert.assertEquals(rst.getFailureReason(),
+            TaskAssignResult.FailureReason.INSUFFICIENT_QUOTA);
+      }
+    }
+    Assert.assertEquals(successCnt, 10);
+    Assert.assertEquals(failCnt, 10);
+  }
+
+  @Test
+  public void testAssignmentFailureDuplicatedTask() {
+    TaskAssigner assigner = new ThreadCountBasedTaskAssigner();
+    List<AssignableInstance> instances = createAssignableInstances(1, 20);
+    List<TaskConfig> tasks = createTaskConfigs(10, testQuotaTypes[0], false);
+
+    // Duplicate all tasks
+    tasks.addAll(createTaskConfigs(10, testQuotaTypes[0], false));
+    Collections.shuffle(tasks);
+
+    Map<String, TaskAssignResult> results = assigner.assignTasks(instances, 
tasks);
+    Assert.assertEquals(results.size(), 10);
+    assertAssignmentResults(results.values(), true);
+  }
+
+  @Test(enabled = false, description = "Not enabling profiling tests")
+  public void testAssignerProfiling() {
+    int instanceCount = 1000;
+    int taskCount = 50000;
+    for (int batchSize : new int[] {10000, 5000, 2000, 1000, 500, 100}) {
+      System.out.println("testing batch size: " + batchSize);
+      profileAssigner(batchSize, instanceCount, taskCount);
+    }
+  }
+
+  private void profileAssigner(int assignBatchSize, int instanceCount, int 
taskCount) {
+    int trail = 100;
+    long totalTime = 0;
+    for (int i = 0; i < trail; i++) {
+      TaskAssigner assigner = new ThreadCountBasedTaskAssigner();
+
+      // 50 * instanceCount number of tasks
+      List<AssignableInstance> instances = 
createAssignableInstances(instanceCount, 100);
+      List<TaskConfig> tasks = createTaskConfigs(taskCount, testQuotaTypes[0]);
+      List<Map<String, TaskAssignResult>> allResults = new ArrayList<>();
+
+      // Assign
+      long start = System.currentTimeMillis();
+      for (int j = 0; j < taskCount / assignBatchSize; j++) {
+        allResults.add(assigner
+            .assignTasks(instances, tasks.subList(j * assignBatchSize, (j + 1) 
* assignBatchSize)));
+      }
+      long duration = System.currentTimeMillis() - start;
+      totalTime += duration;
+
+      // Validate
+      for (Map<String, TaskAssignResult> results : allResults) {
+        for (TaskAssignResult rst : results.values()) {
+          Assert.assertTrue(rst.isSuccessful());
+        }
+      }
+    }
+    System.out.println("Average time: " + totalTime / trail + "ms");
+  }
+
+  private void assertAssignmentResults(Iterable<TaskAssignResult> results, 
boolean expected) {
+    for (TaskAssignResult rst : results) {
+      Assert.assertEquals(rst.isSuccessful(), expected);
+    }
+  }
+
+  private List<TaskConfig> createTaskConfigs(int count, String quotaType) {
+    return createTaskConfigs(count, quotaType, true);
+  }
+
+  private List<TaskConfig> createTaskConfigs(int count, String quotaType, 
boolean randomID) {
+    List<TaskConfig> tasks = new ArrayList<>();
+    for (int i = 0; i < count; i++) {
+      TaskConfig task =
+          new TaskConfig(null, null, randomID ? UUID.randomUUID().toString() : 
"task-" + i, null);
+      task.setQuotaType(quotaType);
+      tasks.add(task);
+    }
+    return tasks;
+  }
+
+  private List<AssignableInstance> createAssignableInstances(int count, int 
threadCount) {
+    List<AssignableInstance> instances = new ArrayList<>();
+    String instanceNameFormat = "instance-%s";
+    for (int i = 0; i < count; i++) {
+      String instanceName = String.format(instanceNameFormat, i);
+      instances.add(
+          new AssignableInstance(
+              createClusterConfig(testQuotaTypes, testQuotaRatio, false),
+              new InstanceConfig(instanceName),
+              createLiveInstance(
+                  new String[] { 
LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name() },
+                  new String[] { Integer.toString(threadCount) },
+                  instanceName)
+          )
+      );
+    }
+    return instances;
+  }
+}

Reply via email to