Repository: aurora Updated Branches: refs/heads/master 14e7b84f4 -> 4b9c759cf
Refine types used in QuotaManager, share more functions/predicates. Reviewed at https://reviews.apache.org/r/32371/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/4b9c759c Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/4b9c759c Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/4b9c759c Branch: refs/heads/master Commit: 4b9c759cf3868b1b89e5411fd7ed782d2e5f81e0 Parents: 14e7b84 Author: Bill Farner <[email protected]> Authored: Wed Apr 1 10:22:18 2015 -0700 Committer: Bill Farner <[email protected]> Committed: Wed Apr 1 10:22:18 2015 -0700 ---------------------------------------------------------------------- .../aurora/scheduler/quota/QuotaManager.java | 306 ++++++++++--------- .../aurora/scheduler/updater/UpdateFactory.java | 10 +- .../aurora/scheduler/updater/Updates.java | 22 ++ 3 files changed, 183 insertions(+), 155 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/4b9c759c/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java index 39e930c..7453680 100644 --- a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java +++ b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java @@ -13,7 +13,7 @@ */ package org.apache.aurora.scheduler.quota; -import java.util.List; +import java.util.Arrays; import java.util.Map; import java.util.Set; @@ -24,13 +24,11 @@ import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.base.Predicates; import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableRangeSet; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; -import com.google.common.collect.Range; import com.google.common.collect.RangeSet; -import com.google.common.collect.Sets; import org.apache.aurora.gen.JobUpdateQuery; import org.apache.aurora.gen.ResourceAggregate; @@ -40,6 +38,7 @@ import org.apache.aurora.scheduler.base.ResourceAggregates; import org.apache.aurora.scheduler.storage.JobUpdateStore; import org.apache.aurora.scheduler.storage.QuotaStore; import org.apache.aurora.scheduler.storage.Storage.StoreProvider; +import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.aurora.scheduler.storage.entities.IInstanceTaskConfig; import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; import org.apache.aurora.scheduler.storage.entities.IJobKey; @@ -49,17 +48,19 @@ import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery; import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary; import org.apache.aurora.scheduler.storage.entities.IRange; import org.apache.aurora.scheduler.storage.entities.IResourceAggregate; -import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; import org.apache.aurora.scheduler.updater.Updates; import static java.util.Objects.requireNonNull; import static org.apache.aurora.scheduler.base.ResourceAggregates.EMPTY; +import static org.apache.aurora.scheduler.base.Tasks.ASSIGNED_TO_INFO; +import static org.apache.aurora.scheduler.base.Tasks.ASSIGNED_TO_JOB_KEY; import static org.apache.aurora.scheduler.base.Tasks.INFO_TO_JOB_KEY; import static org.apache.aurora.scheduler.base.Tasks.IS_PRODUCTION; -import static org.apache.aurora.scheduler.base.Tasks.SCHEDULED_TO_INFO; +import static org.apache.aurora.scheduler.base.Tasks.SCHEDULED_TO_ASSIGNED; import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.SUFFICIENT_QUOTA; +import static org.apache.aurora.scheduler.updater.Updates.getInstanceIds; /** * Allows access to resource quotas, and tracks quota consumption. @@ -81,7 +82,7 @@ public interface QuotaManager { * * @param role Quota owner. * @param storeProvider A store provider to access quota data. - * @return {@code QuotaInfo} instance. + * @return quota usage information for the given role. */ QuotaInfo getQuotaInfo(String role, StoreProvider storeProvider); @@ -93,7 +94,7 @@ public interface QuotaManager { * @param template Task resource requirement. * @param instances Number of additional instances requested. * @param storeProvider A store provider to access quota data. - * @return {@code QuotaComparisonResult} instance with quota check result details. + * @return quota check result details. */ QuotaCheckResult checkInstanceAddition( ITaskConfig template, @@ -106,7 +107,7 @@ public interface QuotaManager { * * @param jobUpdate Job update to check quota for. * @param storeProvider A store provider to access quota data. - * @return {@code QuotaComparisonResult} instance with quota check result details. + * @return quota check result details. */ QuotaCheckResult checkJobUpdate(IJobUpdate jobUpdate, StoreProvider storeProvider); @@ -116,7 +117,7 @@ public interface QuotaManager { * * @param cronConfig Cron job configuration. * @param storeProvider A store provider to access quota data. - * @return{@code QuotaComparisonResult} instance with quota check result details. + * @return quota check result details. */ QuotaCheckResult checkCronUpdate(IJobConfiguration cronConfig, StoreProvider storeProvider); @@ -233,18 +234,20 @@ public interface QuotaManager { Optional<IJobUpdate> requestedUpdate, StoreProvider storeProvider) { - FluentIterable<IScheduledTask> tasks = FluentIterable.from( - storeProvider.getTaskStore().fetchTasks(Query.roleScoped(role).active())); + FluentIterable<IAssignedTask> tasks = FluentIterable + .from(storeProvider.getTaskStore().fetchTasks(Query.roleScoped(role).active())) + .transform(SCHEDULED_TO_ASSIGNED); - Map<IJobKey, IJobUpdate> updates = Maps.newHashMap( - fetchActiveJobUpdates(storeProvider.getJobUpdateStore(), role) - .uniqueIndex(UPDATE_TO_JOB_KEY)); + Map<IJobKey, IJobUpdateInstructions> updates = Maps.newHashMap( + fetchActiveJobUpdates(storeProvider.getJobUpdateStore(), role)); // Mix in a requested job update (if present) to correctly calculate consumption. // This would be an update that is not saved in the store yet (i.e. the one quota is // checked for). if (requestedUpdate.isPresent()) { - updates.put(requestedUpdate.get().getSummary().getKey().getJob(), requestedUpdate.get()); + updates.put( + requestedUpdate.get().getSummary().getKey().getJob(), + requestedUpdate.get().getInstructions()); } Map<IJobKey, IJobConfiguration> cronTemplates = @@ -262,32 +265,47 @@ public interface QuotaManager { return new QuotaInfo(quota, prodConsumed, nonProdConsumed); } + private static final Function<IJobConfiguration, ITaskConfig> JOB_TO_TASK = + new Function<IJobConfiguration, ITaskConfig>() { + @Override + public ITaskConfig apply(IJobConfiguration job) { + return job.getTaskConfig(); + } + }; + private IResourceAggregate getConsumption( - FluentIterable<IScheduledTask> tasks, - Map<IJobKey, IJobUpdate> updatesByKey, + FluentIterable<IAssignedTask> tasks, + Map<IJobKey, IJobUpdateInstructions> updatesByKey, Map<IJobKey, IJobConfiguration> cronTemplatesByKey, boolean isProd) { Predicate<ITaskConfig> prodFilter = isProd ? IS_PRODUCTION : Predicates.not(IS_PRODUCTION); - FluentIterable<IScheduledTask> filteredTasks = - tasks.filter(Predicates.compose(prodFilter, SCHEDULED_TO_INFO)); + FluentIterable<IAssignedTask> filteredTasks = + tasks.filter(Predicates.compose(prodFilter, ASSIGNED_TO_INFO)); + + Predicate<IAssignedTask> excludeCron = Predicates.compose( + Predicates.not(Predicates.in(cronTemplatesByKey.keySet())), + ASSIGNED_TO_JOB_KEY); IResourceAggregate nonCronConsumption = getNonCronConsumption( updatesByKey, - excludeCronTasks(filteredTasks, cronTemplatesByKey), - isProd); + filteredTasks.filter(excludeCron), + prodFilter); - IResourceAggregate cronConsumption = - getCronConsumption(cronTemplatesByKey, filteredTasks, isProd); + IResourceAggregate cronConsumption = getCronConsumption( + Iterables.filter( + cronTemplatesByKey.values(), + Predicates.compose(prodFilter, JOB_TO_TASK)), + filteredTasks.transform(ASSIGNED_TO_INFO)); return add(nonCronConsumption, cronConsumption); } private static IResourceAggregate getNonCronConsumption( - Map<IJobKey, IJobUpdate> updatesByKey, - FluentIterable<IScheduledTask> tasks, - boolean isProd) { + Map<IJobKey, IJobUpdateInstructions> updatesByKey, + FluentIterable<IAssignedTask> tasks, + final Predicate<ITaskConfig> configFilter) { // 1. Get all active tasks that belong to jobs without active updates OR unaffected by an // active update working set. An example of the latter would be instances not updated by @@ -302,21 +320,20 @@ public interface QuotaManager { IResourceAggregate nonUpdateConsumption = fromTasks(tasks .filter(buildNonUpdatingTasksFilter(updatesByKey)) - .transform(SCHEDULED_TO_INFO)); + .transform(ASSIGNED_TO_INFO)); - IResourceAggregate updateConsumption = EMPTY; - for (IJobUpdate update : updatesByKey.values()) { - updateConsumption = - add(updateConsumption, instructionsToResources(update.getInstructions(), isProd)); - } + final Predicate<IInstanceTaskConfig> instanceFilter = + Predicates.compose(configFilter, INSTANCE_CONFIG); + + IResourceAggregate updateConsumption = + addAll(Iterables.transform(updatesByKey.values(), updateResources(instanceFilter))); return add(nonUpdateConsumption, updateConsumption); } private static IResourceAggregate getCronConsumption( - Map<IJobKey, IJobConfiguration> cronTemplates, - FluentIterable<IScheduledTask> tasks, - boolean isProd) { + Iterable<IJobConfiguration> cronTemplates, + FluentIterable<ITaskConfig> tasks) { // Calculate the overall cron consumption as MAX between cron template resources and active // cron tasks. This is required to account for a case when a running cron task has higher @@ -326,52 +343,36 @@ public interface QuotaManager { // cron scheduling, it's the simplest approach possible given the system constraints (e.g.: // lack of enforcement on a cron job run duration). - Multimap<IJobKey, ITaskConfig> taskConfigsByKey = - tasks.transform(SCHEDULED_TO_INFO).index(INFO_TO_JOB_KEY); - - IResourceAggregate totalConsumption = EMPTY; - for (IJobConfiguration config : cronTemplates.values()) { - if (isProd == config.getTaskConfig().isProduction()) { - IResourceAggregate templateConsumption = - scale(config.getTaskConfig(), config.getInstanceCount()); - - IResourceAggregate taskConsumption = fromTasks(taskConfigsByKey.get(config.getKey())); - - totalConsumption = add(totalConsumption, max(templateConsumption, taskConsumption)); - } - } - return totalConsumption; - } - - private static FluentIterable<IScheduledTask> excludeCronTasks( - FluentIterable<IScheduledTask> tasks, - final Map<IJobKey, IJobConfiguration> cronJobs) { - - return tasks.filter(new Predicate<IScheduledTask>() { - @Override - public boolean apply(IScheduledTask input) { - return !cronJobs.containsKey(input.getAssignedTask().getTask().getJob()); - } - }); + final Multimap<IJobKey, ITaskConfig> taskConfigsByKey = tasks.index(INFO_TO_JOB_KEY); + return addAll(Iterables.transform( + cronTemplates, + new Function<IJobConfiguration, IResourceAggregate>() { + @Override + public IResourceAggregate apply(IJobConfiguration config) { + return max( + scale(config.getTaskConfig(), config.getInstanceCount()), + fromTasks(taskConfigsByKey.get(config.getKey()))); + } + })); } - private static Predicate<IScheduledTask> buildNonUpdatingTasksFilter( - final Map<IJobKey, IJobUpdate> roleJobUpdates) { + private static Predicate<IAssignedTask> buildNonUpdatingTasksFilter( + final Map<IJobKey, IJobUpdateInstructions> roleJobUpdates) { - return new Predicate<IScheduledTask>() { + return new Predicate<IAssignedTask>() { @Override - public boolean apply(IScheduledTask input) { - Optional<IJobUpdate> update = Optional.fromNullable( - roleJobUpdates.get(input.getAssignedTask().getTask().getJob())); + public boolean apply(IAssignedTask task) { + Optional<IJobUpdateInstructions> update = Optional.fromNullable( + roleJobUpdates.get(task.getTask().getJob())); if (update.isPresent()) { - IJobUpdateInstructions instructions = update.get().getInstructions(); - RangeSet<Integer> initialInstances = instanceRangeSet(instructions.getInitialState()); - RangeSet<Integer> desiredInstances = instanceRangeSet(instructions.isSetDesiredState() + IJobUpdateInstructions instructions = update.get(); + RangeSet<Integer> initialInstances = getInstanceIds(instructions.getInitialState()); + RangeSet<Integer> desiredInstances = getInstanceIds(instructions.isSetDesiredState() ? ImmutableSet.of(instructions.getDesiredState()) : ImmutableSet.<IInstanceTaskConfig>of()); - int instanceId = input.getAssignedTask().getInstanceId(); + int instanceId = task.getInstanceId(); return !initialInstances.contains(instanceId) && !desiredInstances.contains(instanceId); } return true; @@ -379,18 +380,31 @@ public interface QuotaManager { }; } - private static FluentIterable<IJobUpdate> fetchActiveJobUpdates( - JobUpdateStore jobUpdateStore, - String role) { + private static final Function<IJobUpdate, IJobUpdateInstructions> UPDATE_TO_INSTRUCTIONS = + new Function<IJobUpdate, IJobUpdateInstructions>() { + @Override + public IJobUpdateInstructions apply(IJobUpdate update) { + return update.getInstructions(); + } + }; - List<IJobUpdateSummary> summaries = jobUpdateStore.fetchJobUpdateSummaries(updateQuery(role)); + private static Map<IJobKey, IJobUpdateInstructions> fetchActiveJobUpdates( + final JobUpdateStore jobUpdateStore, + String role) { - Set<IJobUpdate> updates = Sets.newHashSet(); - for (IJobUpdateSummary summary : summaries) { - updates.add(jobUpdateStore.fetchJobUpdate(summary.getKey()).get()); - } + Function<IJobUpdateSummary, IJobUpdate> fetchUpdate = + new Function<IJobUpdateSummary, IJobUpdate>() { + @Override + public IJobUpdate apply(IJobUpdateSummary summary) { + return jobUpdateStore.fetchJobUpdate(summary.getKey()).get(); + } + }; - return FluentIterable.from(updates); + return Maps.transformValues( + FluentIterable.from(jobUpdateStore.fetchJobUpdateSummaries(updateQuery(role))) + .transform(fetchUpdate) + .uniqueIndex(UPDATE_TO_JOB_KEY), + UPDATE_TO_INSTRUCTIONS); } @VisibleForTesting @@ -400,68 +414,84 @@ public interface QuotaManager { .setUpdateStatuses(Updates.ACTIVE_JOB_UPDATE_STATES)); } - private static RangeSet<Integer> instanceRangeSet(Set<IInstanceTaskConfig> configs) { - ImmutableRangeSet.Builder<Integer> builder = ImmutableRangeSet.builder(); - for (IInstanceTaskConfig config : configs) { - for (IRange range : config.getInstances()) { - builder.add(Range.closed(range.getFirst(), range.getLast())); - } - } + private static final Function<IInstanceTaskConfig, ITaskConfig> INSTANCE_CONFIG = + new Function<IInstanceTaskConfig, ITaskConfig>() { + @Override + public ITaskConfig apply(IInstanceTaskConfig config) { + return config.getTask(); + } + }; + + private static final Function<ITaskConfig, IResourceAggregate> CONFIG_RESOURCES = + new Function<ITaskConfig, IResourceAggregate>() { + @Override + public IResourceAggregate apply(ITaskConfig config) { + return IResourceAggregate.build(new ResourceAggregate() + .setNumCpus(config.getNumCpus()) + .setRamMb(config.getRamMb()) + .setDiskMb(config.getDiskMb())); + } + }; + + private static final Function<IInstanceTaskConfig, IResourceAggregate> INSTANCE_RESOURCES = + new Function<IInstanceTaskConfig, IResourceAggregate>() { + @Override + public IResourceAggregate apply(IInstanceTaskConfig config) { + return scale(config.getTask(), getUpdateInstanceCount(config.getInstances())); + } + }; + + private static IResourceAggregate instructionsToResources( + Iterable<IInstanceTaskConfig> instructions) { - return builder.build(); + return addAll(FluentIterable.from(instructions).transform(INSTANCE_RESOURCES)); } /** - * This function calculates max aggregate resources consumed by the job update + * Calculates max aggregate resources consumed by the job update * {@code instructions}. The max is calculated between existing and desired task configs on per * resource basis. This means max CPU, RAM and DISK values are computed individually and may * come from different task configurations. While it may not be the most accurate * representation of job update resources during the update, it does guarantee none of the * individual resource values is exceeded during the forward/back roll. - * + * <p/> * NOTE: In case of a job update converting the job production bit (i.e. prod -> non-prod or * non-prod -> prod), only the matching state is counted towards consumption. For example, * prod -> non-prod AND {@code prodConsumption=True}: only the initial state is accounted. - * - * @param instructions Update instructions with resource definitions. - * @param isProd Flag indicating whether the prod or non-prod calculation requested. - * @return Resources consumed by the update. */ - private static IResourceAggregate instructionsToResources( - IJobUpdateInstructions instructions, - final boolean isProd) { - - // Calculate initial state consumption. - IResourceAggregate initial = EMPTY; - for (IInstanceTaskConfig group : instructions.getInitialState()) { - ITaskConfig task = group.getTask(); - if (isProd == task.isProduction()) { - for (IRange range : group.getInstances()) { - initial = add(initial, scale(task, instanceCountFromRange(range))); - } - } - } - - // Calculate desired state consumption. - IResourceAggregate desired = Optional.fromNullable(instructions.getDesiredState()) - .transform(new Function<IInstanceTaskConfig, IResourceAggregate>() { - @Override - public IResourceAggregate apply(IInstanceTaskConfig input) { - return isProd == input.getTask().isProduction() - ? scale(input.getTask(), getUpdateInstanceCount(input.getInstances())) - : EMPTY; - } - }).or(EMPTY); + private static Function<IJobUpdateInstructions, IResourceAggregate> updateResources( + final Predicate<IInstanceTaskConfig> instanceFilter) { - // Calculate result as max(existing, desired) per resource type. - return max(initial, desired); + return new Function<IJobUpdateInstructions, IResourceAggregate>() { + @Override + public IResourceAggregate apply(IJobUpdateInstructions instructions) { + Iterable<IInstanceTaskConfig> initialState = + Iterables.filter(instructions.getInitialState(), instanceFilter); + Iterable<IInstanceTaskConfig> desiredState = Iterables.filter( + Optional.fromNullable(instructions.getDesiredState()).asSet(), + instanceFilter); + + // Calculate result as max(existing, desired) per resource type. + return max( + instructionsToResources(initialState), + instructionsToResources(desiredState)); + } + }; } private static IResourceAggregate add(IResourceAggregate a, IResourceAggregate b) { - return IResourceAggregate.build(new ResourceAggregate() - .setNumCpus(a.getNumCpus() + b.getNumCpus()) - .setRamMb(a.getRamMb() + b.getRamMb()) - .setDiskMb(a.getDiskMb() + b.getDiskMb())); + return addAll(Arrays.asList(a, b)); + } + + private static IResourceAggregate addAll(Iterable<IResourceAggregate> aggregates) { + IResourceAggregate total = EMPTY; + for (IResourceAggregate aggregate : aggregates) { + total = IResourceAggregate.build(new ResourceAggregate() + .setNumCpus(total.getNumCpus() + aggregate.getNumCpus()) + .setRamMb(total.getRamMb() + aggregate.getRamMb()) + .setDiskMb(total.getDiskMb() + aggregate.getDiskMb())); + } + return total; } private static IResourceAggregate subtract(IResourceAggregate a, IResourceAggregate b) { @@ -479,7 +509,7 @@ public interface QuotaManager { } private static IResourceAggregate scale(ITaskConfig taskConfig, int instanceCount) { - return ResourceAggregates.scale(fromTasks(ImmutableSet.of(taskConfig)), instanceCount); + return ResourceAggregates.scale(CONFIG_RESOURCES.apply(taskConfig), instanceCount); } private static IResourceAggregate scale(IJobConfiguration jobConfiguration) { @@ -487,19 +517,7 @@ public interface QuotaManager { } private static IResourceAggregate fromTasks(Iterable<ITaskConfig> tasks) { - double cpu = 0; - int ramMb = 0; - int diskMb = 0; - for (ITaskConfig task : tasks) { - cpu += task.getNumCpus(); - ramMb += task.getRamMb(); - diskMb += task.getDiskMb(); - } - - return IResourceAggregate.build(new ResourceAggregate() - .setNumCpus(cpu) - .setRamMb(ramMb) - .setDiskMb(diskMb)); + return addAll(Iterables.transform(tasks, CONFIG_RESOURCES)); } private static final Function<IJobUpdate, IJobKey> UPDATE_TO_JOB_KEY = @@ -513,14 +531,10 @@ public interface QuotaManager { private static int getUpdateInstanceCount(Set<IRange> ranges) { int instanceCount = 0; for (IRange range : ranges) { - instanceCount += instanceCountFromRange(range); + instanceCount += range.getLast() - range.getFirst() + 1; } return instanceCount; } - - private static int instanceCountFromRange(IRange range) { - return range.getLast() - range.getFirst() + 1; - } } } http://git-wip-us.apache.org/repos/asf/aurora/blob/4b9c759c/src/main/java/org/apache/aurora/scheduler/updater/UpdateFactory.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/updater/UpdateFactory.java b/src/main/java/org/apache/aurora/scheduler/updater/UpdateFactory.java index b530861..b87ae4e 100644 --- a/src/main/java/org/apache/aurora/scheduler/updater/UpdateFactory.java +++ b/src/main/java/org/apache/aurora/scheduler/updater/UpdateFactory.java @@ -19,7 +19,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.collect.DiscreteDomain; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableRangeSet; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; @@ -147,14 +146,7 @@ interface UpdateFactory { @VisibleForTesting static Set<Integer> expandInstanceIds(Set<IInstanceTaskConfig> instanceGroups) { - ImmutableRangeSet.Builder<Integer> instanceIds = ImmutableRangeSet.builder(); - for (IInstanceTaskConfig group : instanceGroups) { - for (IRange range : group.getInstances()) { - instanceIds.add(toRange(range)); - } - } - - return instanceIds.build().asSet(DiscreteDomain.integers()); + return Updates.getInstanceIds(instanceGroups).asSet(DiscreteDomain.integers()); } private static Optional<ITaskConfig> getConfig( http://git-wip-us.apache.org/repos/asf/aurora/blob/4b9c759c/src/main/java/org/apache/aurora/scheduler/updater/Updates.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/updater/Updates.java b/src/main/java/org/apache/aurora/scheduler/updater/Updates.java index 776278c..6466473 100644 --- a/src/main/java/org/apache/aurora/scheduler/updater/Updates.java +++ b/src/main/java/org/apache/aurora/scheduler/updater/Updates.java @@ -15,13 +15,17 @@ package org.apache.aurora.scheduler.updater; import java.util.Set; +import com.google.common.collect.ImmutableRangeSet; +import com.google.common.collect.Range; import com.google.common.collect.Sets; import org.apache.aurora.gen.JobUpdateKey; import org.apache.aurora.gen.JobUpdateStatus; import org.apache.aurora.gen.JobUpdateSummary; import org.apache.aurora.gen.apiConstants; +import org.apache.aurora.scheduler.storage.entities.IInstanceTaskConfig; import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary; +import org.apache.aurora.scheduler.storage.entities.IRange; /** * Utility functions for job updates. @@ -53,4 +57,22 @@ public final class Updates { return IJobUpdateSummary.build(mutableSummary); } } + + /** + * Creates a range set representing all instance IDs represented by a set of instance + * configurations included in a job update. + * + * @param configs Job update components. + * @return A range set representing the instance IDs mentioned in instance groupings. + */ + public static ImmutableRangeSet<Integer> getInstanceIds(Set<IInstanceTaskConfig> configs) { + ImmutableRangeSet.Builder<Integer> builder = ImmutableRangeSet.builder(); + for (IInstanceTaskConfig config : configs) { + for (IRange range : config.getInstances()) { + builder.add(Range.closed(range.getFirst(), range.getLast())); + } + } + + return builder.build(); + } }
