Add a task store implementation that uses a relational database. Bugs closed: AURORA-556
Reviewed at https://reviews.apache.org/r/33612/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/bf7f9b7f Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/bf7f9b7f Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/bf7f9b7f Branch: refs/heads/master Commit: bf7f9b7f93637f78ec9f029b70a7c4bdac2f206a Parents: be75c36 Author: Bill Farner <[email protected]> Authored: Tue May 12 16:51:07 2015 -0700 Committer: Bill Farner <[email protected]> Committed: Tue May 12 16:51:07 2015 -0700 ---------------------------------------------------------------------- .../thrift/org/apache/aurora/gen/api.thrift | 7 +- examples/vagrant/upstart/aurora-scheduler.conf | 1 + .../aurora/benchmark/ThriftApiBenchmarks.java | 2 +- .../aurora/scheduler/app/SchedulerMain.java | 2 +- .../scheduler/storage/db/DBJobUpdateStore.java | 276 -------------- .../scheduler/storage/db/DbJobUpdateStore.java | 276 ++++++++++++++ .../aurora/scheduler/storage/db/DbModule.java | 120 ++++-- .../aurora/scheduler/storage/db/DbStorage.java | 10 + .../scheduler/storage/db/DbTaskStore.java | 361 +++++++++++++++++++ .../aurora/scheduler/storage/db/DbUtil.java | 3 + .../scheduler/storage/db/TaskConfigManager.java | 138 +++++++ .../scheduler/storage/db/TaskConfigMapper.java | 167 +++++++++ .../aurora/scheduler/storage/db/TaskMapper.java | 85 +++++ .../storage/db/shims/ContainerShim.java | 42 +++ .../storage/db/shims/TaskConstraintShim.java | 42 +++ .../typehandlers/ScheduleStatusTypeHandler.java | 26 ++ .../storage/db/typehandlers/TypeHandlers.java | 1 + .../storage/db/views/AssignedPort.java | 40 ++ .../storage/db/views/ScheduledTaskWrapper.java | 48 +++ .../storage/db/views/TaskConfigRow.java | 60 +++ .../scheduler/storage/db/views/TaskLink.java | 40 ++ .../storage/mem/InMemStoresModule.java | 27 +- .../storage/db/JobUpdateDetailsMapper.xml | 12 +- .../scheduler/storage/db/TaskConfigMapper.xml | 323 +++++++++++++++++ 
.../aurora/scheduler/storage/db/TaskMapper.xml | 228 ++++++++++++ .../scheduler/state/StateManagerImplTest.java | 1 + .../storage/AbstractTaskStoreTest.java | 19 +- .../storage/db/DbJobUpdateStoreTest.java | 2 +- .../scheduler/storage/db/DbTaskStoreTest.java | 66 ++++ .../storage/mem/InMemTaskStoreTest.java | 3 +- .../storage/mem/StorageTransactionTest.java | 42 +-- 31 files changed, 2125 insertions(+), 345 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/api/src/main/thrift/org/apache/aurora/gen/api.thrift ---------------------------------------------------------------------- diff --git a/api/src/main/thrift/org/apache/aurora/gen/api.thrift b/api/src/main/thrift/org/apache/aurora/gen/api.thrift index 0182ecb..dd54e5b 100644 --- a/api/src/main/thrift/org/apache/aurora/gen/api.thrift +++ b/api/src/main/thrift/org/apache/aurora/gen/api.thrift @@ -245,8 +245,6 @@ struct TaskConfig { 20: set<Constraint> constraints /** a list of named ports this task requests */ 21: set<string> requestedPorts - /** the container the task should use to execute */ - 29: optional Container container = { "mesos": {} } /** * Custom links to include when displaying this task on the scheduler dashboard. Keys are anchor * text, values are URLs. Wildcards are supported for dynamic link crafting based on host, ports, @@ -258,6 +256,11 @@ struct TaskConfig { 25: optional ExecutorConfig executorConfig /** Used to display additional details in the UI. */ 27: optional set<Metadata> metadata + + // This field is deliberately placed at the end to work around a bug in the immutable wrapper + // code generator. See AURORA-1185 for details. + /** the container the task should use to execute */ + 29: optional Container container = { "mesos": {} } } /** Defines the policy for launching a new cron job when one is already running. 
*/ http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/examples/vagrant/upstart/aurora-scheduler.conf ---------------------------------------------------------------------- diff --git a/examples/vagrant/upstart/aurora-scheduler.conf b/examples/vagrant/upstart/aurora-scheduler.conf index cc4864c..f4b867c 100644 --- a/examples/vagrant/upstart/aurora-scheduler.conf +++ b/examples/vagrant/upstart/aurora-scheduler.conf @@ -42,4 +42,5 @@ exec bin/aurora-scheduler \ -logtostderr \ -allowed_container_types=MESOS,DOCKER \ -http_authentication_mechanism=BASIC \ + -use_beta_db_task_store=true \ -shiro_ini_path=etc/shiro.example.ini http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java index 88d27dd..c3f8b25 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java +++ b/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java @@ -151,7 +151,7 @@ public class ThriftApiBenchmarks { bind(StatsProvider.class).toInstance(new FakeStatsProvider()); } }, - new DbModule(Bindings.KeyFactory.PLAIN), + DbModule.productionModule(Bindings.KeyFactory.PLAIN), new ThriftModule.ReadOnly()); } http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java index 239c616..c31446c 100644 --- a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java +++ b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java @@ -155,7 +155,7 @@ public class SchedulerMain extends AbstractApplication { 
.addAll(getExtraModules()) .add(getPersistentStorageModule()) .add(new CronModule()) - .add(new DbModule(Bindings.annotatedKeyFactory(Storage.Volatile.class))) + .add(DbModule.productionModule(Bindings.annotatedKeyFactory(Storage.Volatile.class))) .build(); } http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/src/main/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStore.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStore.java deleted file mode 100644 index ea56007..0000000 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStore.java +++ /dev/null @@ -1,276 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.aurora.scheduler.storage.db; - -import java.util.List; -import java.util.Set; - -import javax.inject.Inject; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Optional; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableSet; -import com.twitter.common.base.MorePreconditions; - -import org.apache.aurora.gen.JobUpdate; -import org.apache.aurora.gen.JobUpdateInstructions; -import org.apache.aurora.gen.JobUpdateKey; -import org.apache.aurora.gen.JobUpdateStatus; -import org.apache.aurora.gen.storage.StoredJobUpdateDetails; -import org.apache.aurora.scheduler.stats.CachedCounters; -import org.apache.aurora.scheduler.storage.JobUpdateStore; -import org.apache.aurora.scheduler.storage.entities.IInstanceTaskConfig; -import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent; -import org.apache.aurora.scheduler.storage.entities.IJobUpdate; -import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails; -import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent; -import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions; -import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; -import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery; -import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary; -import org.apache.aurora.scheduler.storage.entities.IRange; - -import static java.util.Objects.requireNonNull; - -import static com.twitter.common.inject.TimedInterceptor.Timed; - -/** - * A relational database-backed job update store. 
- */ -public class DBJobUpdateStore implements JobUpdateStore.Mutable { - - private final JobKeyMapper jobKeyMapper; - private final JobUpdateDetailsMapper detailsMapper; - private final JobUpdateEventMapper jobEventMapper; - private final JobInstanceUpdateEventMapper instanceEventMapper; - private final CachedCounters stats; - - @Inject - DBJobUpdateStore( - JobKeyMapper jobKeyMapper, - JobUpdateDetailsMapper detailsMapper, - JobUpdateEventMapper jobEventMapper, - JobInstanceUpdateEventMapper instanceEventMapper, - CachedCounters stats) { - - this.jobKeyMapper = requireNonNull(jobKeyMapper); - this.detailsMapper = requireNonNull(detailsMapper); - this.jobEventMapper = requireNonNull(jobEventMapper); - this.instanceEventMapper = requireNonNull(instanceEventMapper); - this.stats = requireNonNull(stats); - } - - @Timed("job_update_store_save_update") - @Override - public void saveJobUpdate(IJobUpdate update, Optional<String> lockToken) { - requireNonNull(update); - if (!update.getInstructions().isSetDesiredState() - && update.getInstructions().getInitialState().isEmpty()) { - throw new IllegalArgumentException( - "Missing both initial and desired states. At least one is required."); - } - - IJobUpdateSummary summary = update.getSummary(); - jobKeyMapper.merge(summary.getJobKey().newBuilder()); - detailsMapper.insert(update.newBuilder()); - - IJobUpdateKey key = IJobUpdateKey.build( - new JobUpdateKey(summary.getJobKey().newBuilder(), summary.getUpdateId())); - if (lockToken.isPresent()) { - detailsMapper.insertLockToken(key, lockToken.get()); - } - - // Insert optional instance update overrides. - Set<IRange> instanceOverrides = - update.getInstructions().getSettings().getUpdateOnlyTheseInstances(); - - if (!instanceOverrides.isEmpty()) { - detailsMapper.insertInstanceOverrides(key, IRange.toBuildersSet(instanceOverrides)); - } - - // Insert desired state task config and instance mappings. 
- if (update.getInstructions().isSetDesiredState()) { - IInstanceTaskConfig desired = update.getInstructions().getDesiredState(); - detailsMapper.insertTaskConfig( - key, - desired.getTask().newBuilder(), - true, - new InsertResult()); - - detailsMapper.insertDesiredInstances( - key, - IRange.toBuildersSet(MorePreconditions.checkNotBlank(desired.getInstances()))); - } - - // Insert initial state task configs and instance mappings. - if (!update.getInstructions().getInitialState().isEmpty()) { - for (IInstanceTaskConfig config : update.getInstructions().getInitialState()) { - InsertResult result = new InsertResult(); - detailsMapper.insertTaskConfig(key, config.getTask().newBuilder(), false, result); - - detailsMapper.insertTaskConfigInstances( - result.getId(), - IRange.toBuildersSet(MorePreconditions.checkNotBlank(config.getInstances()))); - } - } - } - - @VisibleForTesting - static String statName(JobUpdateStatus status) { - return "update_transition_" + status; - } - - @Timed("job_update_store_save_event") - @Override - public void saveJobUpdateEvent(IJobUpdateKey key, IJobUpdateEvent event) { - stats.get(statName(event.getStatus())).incrementAndGet(); - jobEventMapper.insert(key, event.newBuilder()); - } - - @Timed("job_update_store_save_instance_event") - @Override - public void saveJobInstanceUpdateEvent(IJobUpdateKey key, IJobInstanceUpdateEvent event) { - instanceEventMapper.insert(key, event.newBuilder()); - } - - @Timed("job_update_store_delete_all") - @Override - public void deleteAllUpdatesAndEvents() { - detailsMapper.truncate(); - } - - private static final Function<PruneVictim, Long> GET_ROW_ID = new Function<PruneVictim, Long>() { - @Override - public Long apply(PruneVictim victim) { - return victim.getRowId(); - } - }; - - private static final Function<PruneVictim, IJobUpdateKey> GET_UPDATE_KEY = - new Function<PruneVictim, IJobUpdateKey>() { - @Override - public IJobUpdateKey apply(PruneVictim victim) { - return 
IJobUpdateKey.build(victim.getUpdate()); - } - }; - - @Timed("job_update_store_prune_history") - @Override - public Set<IJobUpdateKey> pruneHistory(int perJobRetainCount, long historyPruneThresholdMs) { - ImmutableSet.Builder<IJobUpdateKey> pruned = ImmutableSet.builder(); - - Set<Long> jobKeyIdsToPrune = detailsMapper.selectJobKeysForPruning( - perJobRetainCount, - historyPruneThresholdMs); - - for (Long jobKeyId : jobKeyIdsToPrune) { - Set<PruneVictim> pruneVictims = detailsMapper.selectPruneVictims( - jobKeyId, - perJobRetainCount, - historyPruneThresholdMs); - - detailsMapper.deleteCompletedUpdates( - FluentIterable.from(pruneVictims).transform(GET_ROW_ID).toSet()); - pruned.addAll(FluentIterable.from(pruneVictims).transform(GET_UPDATE_KEY)); - } - - return pruned.build(); - } - - @Timed("job_update_store_fetch_summaries") - @Override - public List<IJobUpdateSummary> fetchJobUpdateSummaries(IJobUpdateQuery query) { - return IJobUpdateSummary.listFromBuilders(detailsMapper.selectSummaries(query.newBuilder())); - } - - @Timed("job_update_store_fetch_details_list") - @Override - public List<IJobUpdateDetails> fetchJobUpdateDetails(IJobUpdateQuery query) { - return FluentIterable - .from(detailsMapper.selectDetailsList(query.newBuilder())) - .transform(new Function<StoredJobUpdateDetails, IJobUpdateDetails>() { - @Override - public IJobUpdateDetails apply(StoredJobUpdateDetails input) { - return IJobUpdateDetails.build(input.getDetails()); - } - }).toList(); - } - - @Timed("job_update_store_fetch_details") - @Override - public Optional<IJobUpdateDetails> fetchJobUpdateDetails(final IJobUpdateKey key) { - return Optional.fromNullable(detailsMapper.selectDetails(key)) - .transform(new Function<StoredJobUpdateDetails, IJobUpdateDetails>() { - @Override - public IJobUpdateDetails apply(StoredJobUpdateDetails input) { - return IJobUpdateDetails.build(input.getDetails()); - } - }); - } - - @Timed("job_update_store_fetch_update") - @Override - public Optional<IJobUpdate> 
fetchJobUpdate(IJobUpdateKey key) { - return Optional.fromNullable(detailsMapper.selectUpdate(key)) - .transform(new Function<JobUpdate, IJobUpdate>() { - @Override - public IJobUpdate apply(JobUpdate input) { - return IJobUpdate.build(input); - } - }); - } - - @Timed("job_update_store_fetch_instructions") - @Override - public Optional<IJobUpdateInstructions> fetchJobUpdateInstructions(IJobUpdateKey key) { - return Optional.fromNullable(detailsMapper.selectInstructions(key)) - .transform(new Function<JobUpdateInstructions, IJobUpdateInstructions>() { - @Override - public IJobUpdateInstructions apply(JobUpdateInstructions input) { - return IJobUpdateInstructions.build(input); - } - }); - } - - @Timed("job_update_store_fetch_all_details") - @Override - public Set<StoredJobUpdateDetails> fetchAllJobUpdateDetails() { - return ImmutableSet.copyOf(detailsMapper.selectAllDetails()); - } - - @Timed("job_update_store_fetch_update_key") - @Override - public Optional<IJobUpdateKey> fetchUpdateKey(String updateId) { - return Optional.fromNullable(detailsMapper.selectUpdateKey(updateId)) - .transform(IJobUpdateKey.FROM_BUILDER); - } - - @Timed("job_update_store_get_lock_token") - @Override - public Optional<String> getLockToken(IJobUpdateKey key) { - // We assume here that cascading deletes will cause a lock-update associative row to disappear - // when the lock is invalidated. This further assumes that a lock row is deleted when a lock - // is no longer valid. 
- return Optional.fromNullable(detailsMapper.selectLockToken(key)); - } - - @Timed("job_update_store_fetch_instance_events") - @Override - public List<IJobInstanceUpdateEvent> fetchInstanceEvents(IJobUpdateKey key, int instanceId) { - return IJobInstanceUpdateEvent.listFromBuilders( - detailsMapper.selectInstanceUpdateEvents(key, instanceId)); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/src/main/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStore.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStore.java new file mode 100644 index 0000000..4b9d7f5 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStore.java @@ -0,0 +1,276 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.storage.db; + +import java.util.List; +import java.util.Set; + +import javax.inject.Inject; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Optional; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableSet; +import com.twitter.common.base.MorePreconditions; + +import org.apache.aurora.gen.JobUpdate; +import org.apache.aurora.gen.JobUpdateInstructions; +import org.apache.aurora.gen.JobUpdateKey; +import org.apache.aurora.gen.JobUpdateStatus; +import org.apache.aurora.gen.storage.StoredJobUpdateDetails; +import org.apache.aurora.scheduler.stats.CachedCounters; +import org.apache.aurora.scheduler.storage.JobUpdateStore; +import org.apache.aurora.scheduler.storage.entities.IInstanceTaskConfig; +import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent; +import org.apache.aurora.scheduler.storage.entities.IJobUpdate; +import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails; +import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent; +import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions; +import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; +import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery; +import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary; +import org.apache.aurora.scheduler.storage.entities.IRange; + +import static java.util.Objects.requireNonNull; + +import static com.twitter.common.inject.TimedInterceptor.Timed; + +/** + * A relational database-backed job update store. 
+ */ +public class DbJobUpdateStore implements JobUpdateStore.Mutable { + + private final JobKeyMapper jobKeyMapper; + private final JobUpdateDetailsMapper detailsMapper; + private final JobUpdateEventMapper jobEventMapper; + private final JobInstanceUpdateEventMapper instanceEventMapper; + private final CachedCounters stats; + + @Inject + DbJobUpdateStore( + JobKeyMapper jobKeyMapper, + JobUpdateDetailsMapper detailsMapper, + JobUpdateEventMapper jobEventMapper, + JobInstanceUpdateEventMapper instanceEventMapper, + CachedCounters stats) { + + this.jobKeyMapper = requireNonNull(jobKeyMapper); + this.detailsMapper = requireNonNull(detailsMapper); + this.jobEventMapper = requireNonNull(jobEventMapper); + this.instanceEventMapper = requireNonNull(instanceEventMapper); + this.stats = requireNonNull(stats); + } + + @Timed("job_update_store_save_update") + @Override + public void saveJobUpdate(IJobUpdate update, Optional<String> lockToken) { + requireNonNull(update); + if (!update.getInstructions().isSetDesiredState() + && update.getInstructions().getInitialState().isEmpty()) { + throw new IllegalArgumentException( + "Missing both initial and desired states. At least one is required."); + } + + IJobUpdateSummary summary = update.getSummary(); + jobKeyMapper.merge(summary.getJobKey().newBuilder()); + detailsMapper.insert(update.newBuilder()); + + IJobUpdateKey key = IJobUpdateKey.build( + new JobUpdateKey(summary.getJobKey().newBuilder(), summary.getUpdateId())); + if (lockToken.isPresent()) { + detailsMapper.insertLockToken(key, lockToken.get()); + } + + // Insert optional instance update overrides. + Set<IRange> instanceOverrides = + update.getInstructions().getSettings().getUpdateOnlyTheseInstances(); + + if (!instanceOverrides.isEmpty()) { + detailsMapper.insertInstanceOverrides(key, IRange.toBuildersSet(instanceOverrides)); + } + + // Insert desired state task config and instance mappings. 
+ if (update.getInstructions().isSetDesiredState()) { + IInstanceTaskConfig desired = update.getInstructions().getDesiredState(); + detailsMapper.insertTaskConfig( + key, + desired.getTask().newBuilder(), + true, + new InsertResult()); + + detailsMapper.insertDesiredInstances( + key, + IRange.toBuildersSet(MorePreconditions.checkNotBlank(desired.getInstances()))); + } + + // Insert initial state task configs and instance mappings. + if (!update.getInstructions().getInitialState().isEmpty()) { + for (IInstanceTaskConfig config : update.getInstructions().getInitialState()) { + InsertResult result = new InsertResult(); + detailsMapper.insertTaskConfig(key, config.getTask().newBuilder(), false, result); + + detailsMapper.insertTaskConfigInstances( + result.getId(), + IRange.toBuildersSet(MorePreconditions.checkNotBlank(config.getInstances()))); + } + } + } + + @VisibleForTesting + static String statName(JobUpdateStatus status) { + return "update_transition_" + status; + } + + @Timed("job_update_store_save_event") + @Override + public void saveJobUpdateEvent(IJobUpdateKey key, IJobUpdateEvent event) { + stats.get(statName(event.getStatus())).incrementAndGet(); + jobEventMapper.insert(key, event.newBuilder()); + } + + @Timed("job_update_store_save_instance_event") + @Override + public void saveJobInstanceUpdateEvent(IJobUpdateKey key, IJobInstanceUpdateEvent event) { + instanceEventMapper.insert(key, event.newBuilder()); + } + + @Timed("job_update_store_delete_all") + @Override + public void deleteAllUpdatesAndEvents() { + detailsMapper.truncate(); + } + + private static final Function<PruneVictim, Long> GET_ROW_ID = new Function<PruneVictim, Long>() { + @Override + public Long apply(PruneVictim victim) { + return victim.getRowId(); + } + }; + + private static final Function<PruneVictim, IJobUpdateKey> GET_UPDATE_KEY = + new Function<PruneVictim, IJobUpdateKey>() { + @Override + public IJobUpdateKey apply(PruneVictim victim) { + return 
IJobUpdateKey.build(victim.getUpdate()); + } + }; + + @Timed("job_update_store_prune_history") + @Override + public Set<IJobUpdateKey> pruneHistory(int perJobRetainCount, long historyPruneThresholdMs) { + ImmutableSet.Builder<IJobUpdateKey> pruned = ImmutableSet.builder(); + + Set<Long> jobKeyIdsToPrune = detailsMapper.selectJobKeysForPruning( + perJobRetainCount, + historyPruneThresholdMs); + + for (long jobKeyId : jobKeyIdsToPrune) { + Set<PruneVictim> pruneVictims = detailsMapper.selectPruneVictims( + jobKeyId, + perJobRetainCount, + historyPruneThresholdMs); + + detailsMapper.deleteCompletedUpdates( + FluentIterable.from(pruneVictims).transform(GET_ROW_ID).toSet()); + pruned.addAll(FluentIterable.from(pruneVictims).transform(GET_UPDATE_KEY)); + } + + return pruned.build(); + } + + @Timed("job_update_store_fetch_summaries") + @Override + public List<IJobUpdateSummary> fetchJobUpdateSummaries(IJobUpdateQuery query) { + return IJobUpdateSummary.listFromBuilders(detailsMapper.selectSummaries(query.newBuilder())); + } + + @Timed("job_update_store_fetch_details_list") + @Override + public List<IJobUpdateDetails> fetchJobUpdateDetails(IJobUpdateQuery query) { + return FluentIterable + .from(detailsMapper.selectDetailsList(query.newBuilder())) + .transform(new Function<StoredJobUpdateDetails, IJobUpdateDetails>() { + @Override + public IJobUpdateDetails apply(StoredJobUpdateDetails input) { + return IJobUpdateDetails.build(input.getDetails()); + } + }).toList(); + } + + @Timed("job_update_store_fetch_details") + @Override + public Optional<IJobUpdateDetails> fetchJobUpdateDetails(final IJobUpdateKey key) { + return Optional.fromNullable(detailsMapper.selectDetails(key)) + .transform(new Function<StoredJobUpdateDetails, IJobUpdateDetails>() { + @Override + public IJobUpdateDetails apply(StoredJobUpdateDetails input) { + return IJobUpdateDetails.build(input.getDetails()); + } + }); + } + + @Timed("job_update_store_fetch_update") + @Override + public Optional<IJobUpdate> 
fetchJobUpdate(IJobUpdateKey key) { + return Optional.fromNullable(detailsMapper.selectUpdate(key)) + .transform(new Function<JobUpdate, IJobUpdate>() { + @Override + public IJobUpdate apply(JobUpdate input) { + return IJobUpdate.build(input); + } + }); + } + + @Timed("job_update_store_fetch_instructions") + @Override + public Optional<IJobUpdateInstructions> fetchJobUpdateInstructions(IJobUpdateKey key) { + return Optional.fromNullable(detailsMapper.selectInstructions(key)) + .transform(new Function<JobUpdateInstructions, IJobUpdateInstructions>() { + @Override + public IJobUpdateInstructions apply(JobUpdateInstructions input) { + return IJobUpdateInstructions.build(input); + } + }); + } + + @Timed("job_update_store_fetch_all_details") + @Override + public Set<StoredJobUpdateDetails> fetchAllJobUpdateDetails() { + return ImmutableSet.copyOf(detailsMapper.selectAllDetails()); + } + + @Timed("job_update_store_fetch_update_key") + @Override + public Optional<IJobUpdateKey> fetchUpdateKey(String updateId) { + return Optional.fromNullable(detailsMapper.selectUpdateKey(updateId)) + .transform(IJobUpdateKey.FROM_BUILDER); + } + + @Timed("job_update_store_get_lock_token") + @Override + public Optional<String> getLockToken(IJobUpdateKey key) { + // We assume here that cascading deletes will cause a lock-update associative row to disappear + // when the lock is invalidated. This further assumes that a lock row is deleted when a lock + // is no longer valid. 
+ return Optional.fromNullable(detailsMapper.selectLockToken(key)); + } + + @Timed("job_update_store_fetch_instance_events") + @Override + public List<IJobInstanceUpdateEvent> fetchInstanceEvents(IJobUpdateKey key, int instanceId) { + return IJobInstanceUpdateEvent.listFromBuilders( + detailsMapper.selectInstanceUpdateEvents(key, instanceId)); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java index 8859ca4..e351588 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java @@ -14,6 +14,7 @@ package org.apache.aurora.scheduler.storage.db; import java.util.Set; +import java.util.UUID; import javax.inject.Singleton; @@ -21,8 +22,14 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.inject.Key; +import com.google.inject.Module; import com.google.inject.PrivateModule; +import com.google.inject.TypeLiteral; +import com.twitter.common.args.Arg; +import com.twitter.common.args.CmdLine; import com.twitter.common.inject.Bindings.KeyFactory; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; import org.apache.aurora.scheduler.storage.AttributeStore; import org.apache.aurora.scheduler.storage.CronJobStore; @@ -47,22 +54,17 @@ import static com.google.inject.name.Names.bindProperties; /** * Binding module for a relational database storage system. 
- * <p> - * Currently only exposes bindings for: - * <ul> - * <li>{@link org.apache.aurora.scheduler.storage.db.DbStorage}</li> - * <li>{@link org.apache.ibatis.session.SqlSessionFactory}</li> - * <li>Keys provided by the provided{@code keyFactory} for: - * <ul> - * <li>{@link LockStore.Mutable}</li> - * <li>{@link QuotaStore.Mutable}</li> - * <li>{@link SchedulerStore.Mutable}</li> - * </ul> - * </li> - * </ul> - * </p> */ -public class DbModule extends PrivateModule { +public final class DbModule extends PrivateModule { + + @CmdLine(name = "use_beta_db_task_store", + help = "Whether to use the experimental database-backed task store.") + private static final Arg<Boolean> USE_DB_TASK_STORE = Arg.create(false); + + @CmdLine(name = "slow_query_log_threshold", + help = "Log all queries that take at least this long to execute.") + private static final Arg<Amount<Long, Time>> SLOW_QUERY_LOG_THRESHOLD = + Arg.create(Amount.of(25L, Time.MILLISECONDS)); private static final Set<Class<?>> MAPPER_CLASSES = ImmutableSet.<Class<?>>builder() .add(AttributeMapper.class) @@ -74,19 +76,54 @@ public class DbModule extends PrivateModule { .add(JobUpdateDetailsMapper.class) .add(LockMapper.class) .add(QuotaMapper.class) + .add(TaskConfigMapper.class) + .add(TaskMapper.class) .build(); private final KeyFactory keyFactory; + private final Module taskStoreModule; private final String jdbcSchema; - private DbModule(KeyFactory keyFactory, String jdbcSchema) { + private DbModule(KeyFactory keyFactory, Module taskStoreModule, String jdbcSchema) { this.keyFactory = requireNonNull(keyFactory); + this.taskStoreModule = requireNonNull(taskStoreModule); // We always disable the MvStore, as it is in beta as of this writing. this.jdbcSchema = jdbcSchema + ";MV_STORE=false"; } - public DbModule(KeyFactory keyFactory) { - this(keyFactory, "aurora;DB_CLOSE_DELAY=-1"); + /** + * Creates a module that will prepare a volatile storage system suitable for use in a production + * environment. 
+ * + * @param keyFactory Binding scope for the storage system. + * @return A new database module for production. + */ + public static Module productionModule(KeyFactory keyFactory) { + return new DbModule( + keyFactory, + USE_DB_TASK_STORE.get() + ? new TaskStoreModule(keyFactory) + : new InMemStoresModule.TaskStoreModule(keyFactory), + "aurora;DB_CLOSE_DELAY=-1"); + } + + /** + * Creates a module that will prepare a private in-memory database, using a specific task store + * implementation bound within the provided module. + * + * @param taskStoreModule Module providing task store bindings. + * @return A new database module for testing. + */ + @VisibleForTesting + public static Module testModule(Module taskStoreModule) { + return new DbModule( + KeyFactory.PLAIN, + taskStoreModule, + // A non-zero close delay is used here to avoid eager database cleanup in tests that + // make use of multiple threads. Since all test databases are separately scoped by the + // included UUID, multiple DB instances will overlap in time but they should be distinct + // in content. + "testdb-" + UUID.randomUUID().toString() + ";DB_CLOSE_DELAY=5"); } /** @@ -95,10 +132,8 @@ public class DbModule extends PrivateModule { * @return A new database module for testing. */ @VisibleForTesting - public static DbModule testModule() { - // This creates a private in-memory database. New connections will have a _new_ database, - // and closing the database will expunge its data. - return new DbModule(KeyFactory.PLAIN, ""); + public static Module testModule() { + return testModule(new DbModule.TaskStoreModule(KeyFactory.PLAIN)); } private <T> void bindStore(Class<T> binding, Class<? extends T> impl) { @@ -131,9 +166,21 @@ public class DbModule extends PrivateModule { autoMappingBehavior(AutoMappingBehavior.FULL); addTypeHandlersClasses(TypeHandlers.getAll()); + + bind(new TypeLiteral<Amount<Long, Time>>() { }).toInstance(SLOW_QUERY_LOG_THRESHOLD.get()); + + // Exposed for unit tests. 
+ bind(TaskConfigManager.class); + expose(TaskConfigManager.class); + + // TODO(wfarner): Don't expose these bindings once the task store is directly bound here. + expose(TaskMapper.class); + expose(TaskConfigManager.class); + expose(JobKeyMapper.class); } }); install(new InMemStoresModule(keyFactory)); + install(taskStoreModule); expose(keyFactory.create(CronJobStore.Mutable.class)); expose(keyFactory.create(TaskStore.Mutable.class)); @@ -141,7 +188,7 @@ public class DbModule extends PrivateModule { bindStore(LockStore.Mutable.class, DbLockStore.class); bindStore(QuotaStore.Mutable.class, DbQuotaStore.class); bindStore(SchedulerStore.Mutable.class, DbSchedulerStore.class); - bindStore(JobUpdateStore.Mutable.class, DBJobUpdateStore.class); + bindStore(JobUpdateStore.Mutable.class, DbJobUpdateStore.class); Key<Storage> storageKey = keyFactory.create(Storage.class); bind(storageKey).to(DbStorage.class); @@ -151,4 +198,31 @@ public class DbModule extends PrivateModule { expose(DbStorage.class); expose(SqlSessionFactory.class); } + + /** + * A module that binds a database task store. + * <p/> + * TODO(wfarner): Inline these bindings once there is only one task store implementation. + */ + public static class TaskStoreModule extends PrivateModule { + private final KeyFactory keyFactory; + + public TaskStoreModule(KeyFactory keyFactory) { + this.keyFactory = requireNonNull(keyFactory); + } + + private <T> void bindStore(Class<T> binding, Class<? 
extends T> impl) { + bind(binding).to(impl); + bind(impl).in(Singleton.class); + Key<T> key = keyFactory.create(binding); + bind(key).to(impl); + expose(key); + } + + @Override + protected void configure() { + bindStore(TaskStore.Mutable.class, DbTaskStore.class); + expose(TaskStore.Mutable.class); + } + } } http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java index 1a6c3f2..bb61542 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java @@ -26,6 +26,7 @@ import com.twitter.common.inject.TimedInterceptor.Timed; import org.apache.aurora.gen.JobUpdateAction; import org.apache.aurora.gen.JobUpdateStatus; import org.apache.aurora.gen.MaintenanceMode; +import org.apache.aurora.gen.ScheduleStatus; import org.apache.aurora.scheduler.storage.AttributeStore; import org.apache.aurora.scheduler.storage.CronJobStore; import org.apache.aurora.scheduler.storage.JobUpdateStore; @@ -38,6 +39,7 @@ import org.apache.ibatis.builder.StaticSqlSource; import org.apache.ibatis.exceptions.PersistenceException; import org.apache.ibatis.mapping.MappedStatement.Builder; import org.apache.ibatis.session.Configuration; +import org.apache.ibatis.session.ExecutorType; import org.apache.ibatis.session.SqlSession; import org.apache.ibatis.session.SqlSessionFactory; import org.mybatis.guice.transactional.Transactional; @@ -190,6 +192,10 @@ class DbStorage extends AbstractIdleService implements Storage { String createStatementName = "create_tables"; configuration.setMapUnderscoreToCamelCase(true); + // The ReuseExecutor will cache jdbc Statements with equivalent SQL, improving performance + // slightly when 
redundant queries are made. + configuration.setDefaultExecutorType(ExecutorType.REUSE); + addMappedStatement( configuration, createStatementName, @@ -214,6 +220,10 @@ class DbStorage extends AbstractIdleService implements Storage { for (JobUpdateAction action : JobUpdateAction.values()) { enumValueMapper.addEnumValue("job_instance_update_actions", action.getValue(), action.name()); } + + for (ScheduleStatus status : ScheduleStatus.values()) { + enumValueMapper.addEnumValue("task_states", status.getValue(), status.name()); + } } @Override http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/src/main/java/org/apache/aurora/scheduler/storage/db/DbTaskStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbTaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbTaskStore.java new file mode 100644 index 0000000..76f65da --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbTaskStore.java @@ -0,0 +1,361 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.storage.db; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.google.common.base.Function; +import com.google.common.base.Functions; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.google.inject.Inject; +import com.twitter.common.inject.TimedInterceptor.Timed; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.util.Clock; + +import org.apache.aurora.gen.Constraint; +import org.apache.aurora.gen.Container; +import org.apache.aurora.gen.ScheduledTask; +import org.apache.aurora.gen.TaskConfig; +import org.apache.aurora.gen.TaskConstraint; +import org.apache.aurora.scheduler.base.Query; +import org.apache.aurora.scheduler.base.Query.Builder; +import org.apache.aurora.scheduler.base.Tasks; +import org.apache.aurora.scheduler.storage.TaskStore; +import org.apache.aurora.scheduler.storage.db.views.AssignedPort; +import org.apache.aurora.scheduler.storage.db.views.ScheduledTaskWrapper; +import org.apache.aurora.scheduler.storage.db.views.TaskConfigRow; +import org.apache.aurora.scheduler.storage.entities.IJobKey; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.aurora.scheduler.storage.entities.ITaskConfig; + +import static java.util.Objects.requireNonNull; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * A task store implementation based on a 
relational database. + * <p> + * TODO(wfarner): Consider modifying code generator to support directly producing ITaskConfig, etc + * from myBatis (it will set private final fields just fine). This would reduce memory and time + * spent translating and copying objects. + */ +class DbTaskStore implements TaskStore.Mutable { + + private static final Logger LOG = Logger.getLogger(DbTaskStore.class.getName()); + + private final TaskMapper taskMapper; + private final TaskConfigManager configManager; + private final JobKeyMapper jobKeyMapper; + private final Clock clock; + private final long slowQueryThresholdNanos; + + @Inject + DbTaskStore( + TaskMapper taskMapper, + TaskConfigManager configManager, + JobKeyMapper jobKeyMapper, + Clock clock, + Amount<Long, Time> slowQueryThreshold) { + + LOG.warning("DbTaskStore is experimental, and should not be used in production clusters!"); + this.taskMapper = requireNonNull(taskMapper); + this.configManager = requireNonNull(configManager); + this.jobKeyMapper = requireNonNull(jobKeyMapper); + this.clock = requireNonNull(clock); + this.slowQueryThresholdNanos = slowQueryThreshold.as(Time.NANOSECONDS); + } + + @Timed("db_storage_fetch_tasks") + @Override + public ImmutableSet<IScheduledTask> fetchTasks(Builder query) { + requireNonNull(query); + + // TODO(wfarner): Consider making slow query logging more reusable, or pushing it down into the + // database. + long start = clock.nowNanos(); + ImmutableSet<IScheduledTask> result = matches(query).toSet(); + long durationNanos = clock.nowNanos() - start; + Level level = durationNanos >= slowQueryThresholdNanos ? 
Level.INFO : Level.FINE; + if (LOG.isLoggable(level)) { + Long time = Amount.of(durationNanos, Time.NANOSECONDS).as(Time.MILLISECONDS); + LOG.log(level, "Query took " + time + " ms: " + query.get()); + } + + return result; + } + + private static final Function<TaskConfigRow, Long> CONFIG_ID = + new Function<TaskConfigRow, Long>() { + @Override + public Long apply(TaskConfigRow row) { + return row.getId(); + } + }; + + /** + * Computes an association between config table row ID and {@link ITaskConfig} object for all + * configs in the provided jobs. + * + * @param jobs Jobs to fetch task configs for. + * @return A map from each task config to the ID of its config table row. + */ + private Map<ITaskConfig, Long> getTaskConfigRows(Set<IJobKey> jobs) { + Function<IJobKey, Iterable<TaskConfigRow>> getRows = + new Function<IJobKey, Iterable<TaskConfigRow>>() { + @Override + public Iterable<TaskConfigRow> apply(IJobKey job) { + return configManager.getConfigs(job); + } + }; + + Map<ITaskConfig, TaskConfigRow> rowsToIds = + FluentIterable.from(jobs) + .transformAndConcat(getRows) + .uniqueIndex(Functions.compose(ITaskConfig.FROM_BUILDER, getConfigSaturator())); + + return Maps.transformValues(rowsToIds, CONFIG_ID); + } + + @Timed("db_storage_save_tasks") + @Override + public void saveTasks(Set<IScheduledTask> tasks) { + if (tasks.isEmpty()) { + return; + } + + // TODO(wfarner): Restrict the TaskStore.Mutable methods to more specific mutations. It would + // simplify this code if we did not have to handle full object tree mutations. + + deleteTasks(Tasks.ids(tasks)); + + // Maintain a cache of all task configs that exist for a job key so that identical entities + // reuse the existing config row instead of being inserted again. + LoadingCache<ITaskConfig, Long> configCache = CacheBuilder.newBuilder() + .build(new CacheLoader<ITaskConfig, Long>() { + @Override + public Long load(ITaskConfig config) { + return configManager.insert(config); + } + }); + + // Seed the cache with known configs in the jobs being updated. 
+ configCache.putAll(getTaskConfigRows( + FluentIterable.from(tasks) + .transform(Tasks.SCHEDULED_TO_JOB_KEY) + .toSet())); + + for (IScheduledTask task : tasks) { + jobKeyMapper.merge(task.getAssignedTask().getTask().getJob().newBuilder()); + long configId = configCache.getUnchecked(task.getAssignedTask().getTask()); + + ScheduledTaskWrapper wrappedTask = new ScheduledTaskWrapper(-1, configId, task.newBuilder()); + taskMapper.insertScheduledTask(wrappedTask); + Preconditions.checkState( + wrappedTask.getTaskRowId() != -1, + "Row ID should have been populated during insert."); + if (!task.getTaskEvents().isEmpty()) { + taskMapper.insertTaskEvents(wrappedTask.getTaskRowId(), task.getTaskEvents()); + } + if (!task.getAssignedTask().getAssignedPorts().isEmpty()) { + taskMapper.insertPorts( + wrappedTask.getTaskRowId(), + toAssignedPorts(task.getAssignedTask().getAssignedPorts())); + } + } + } + + private static List<AssignedPort> toAssignedPorts(Map<String, Integer> ports) { + // Mybatis does not seem to support inserting maps where the keys are not known in advance (it + // treats them as bags of properties, presumably like a cheap bean object). + // See https://github.com/mybatis/mybatis-3/pull/208, and seemingly-relevant code in + // https://github.com/mybatis/mybatis-3/blob/4cfc129938fd6b5cb20c4b741392e8b3fa41b529/src + // main/java/org/apache/ibatis/scripting/xmltags/ForEachSqlNode.java#L73-L77. + ImmutableList.Builder<AssignedPort> list = ImmutableList.builder(); + for (Map.Entry<String, Integer> entry : ports.entrySet()) { + list.add(new AssignedPort(entry.getKey(), entry.getValue())); + } + return list.build(); + } + + @Timed("db_storage_delete_all_tasks") + @Override + public void deleteAllTasks() { + // TODO(wfarner): Need to re-evaluate all task configs after deleting tasks. 
+ taskMapper.truncate(); + } + + @Timed("db_storage_delete_tasks") + @Override + public void deleteTasks(Set<String> taskIds) { + if (!taskIds.isEmpty()) { + // First fetch task configs referenced by these task IDs. + List<Long> configIds = configManager.getTaskConfigIds(taskIds); + + taskMapper.deleteTasks(taskIds); + + if (!configIds.isEmpty()) { + configManager.maybeExpungeConfigs(ImmutableSet.copyOf(configIds)); + } + } + } + + @Timed("db_storage_mutate_tasks") + @Override + public ImmutableSet<IScheduledTask> mutateTasks( + Builder query, + Function<IScheduledTask, IScheduledTask> mutator) { + + requireNonNull(query); + requireNonNull(mutator); + + ImmutableSet.Builder<IScheduledTask> mutated = ImmutableSet.builder(); + for (IScheduledTask original : fetchTasks(query)) { + IScheduledTask maybeMutated = mutator.apply(original); + if (!original.equals(maybeMutated)) { + Preconditions.checkState( + Tasks.id(original).equals(Tasks.id(maybeMutated)), + "A task's ID may not be mutated."); + saveTasks(ImmutableSet.of(maybeMutated)); + mutated.add(maybeMutated); + } + } + + return mutated.build(); + } + + @Timed("db_storage_unsafe_modify_in_place") + @Override + public boolean unsafeModifyInPlace(String taskId, ITaskConfig taskConfiguration) { + checkNotNull(taskId); + checkNotNull(taskConfiguration); + Optional<IScheduledTask> task = + Optional.fromNullable(Iterables.getOnlyElement(fetchTasks(Query.taskScoped(taskId)), null)); + if (task.isPresent()) { + deleteTasks(ImmutableSet.of(taskId)); + ScheduledTask builder = task.get().newBuilder(); + builder.getAssignedTask().setTask(taskConfiguration.newBuilder()); + saveTasks(ImmutableSet.of(IScheduledTask.build(builder))); + return true; + } + return false; + } + + private Function<TaskConfigRow, TaskConfig> getConfigSaturator() { + // It appears that there is no way in mybatis to populate a field of type Map. To work around + // this, we need to manually perform the query and associate the elements. 
+ final LoadingCache<Long, Map<String, String>> taskLinkCache = CacheBuilder.newBuilder() + .build(new CacheLoader<Long, Map<String, String>>() { + @Override + public Map<String, String> load(Long configId) { + return configManager.getTaskLinks(configId); + } + }); + Function<TaskConfigRow, TaskConfig> linkPopulator = new Function<TaskConfigRow, TaskConfig>() { + @Override + public TaskConfig apply(TaskConfigRow row) { + return row.getConfig().setTaskLinks(taskLinkCache.getUnchecked(row.getId())); + } + }; + + return Functions.compose(REPLACE_UNION_TYPES, linkPopulator); + } + + private FluentIterable<ScheduledTaskWrapper> fetchRows(Query.Builder query) { + final Function<TaskConfigRow, TaskConfig> configSaturator = getConfigSaturator(); + return FluentIterable.from(taskMapper.select(query.get())) + .transform(populateAssignedPorts) + .transform(new Function<ScheduledTaskWrapper, ScheduledTaskWrapper>() { + @Override + public ScheduledTaskWrapper apply(ScheduledTaskWrapper task) { + configSaturator.apply( + new TaskConfigRow( + task.getTaskConfigRowId(), + task.getTask().getAssignedTask().getTask())); + return task; + } + }); + } + + private final Function<ScheduledTaskWrapper, ScheduledTaskWrapper> populateAssignedPorts = + new Function<ScheduledTaskWrapper, ScheduledTaskWrapper>() { + @Override + public ScheduledTaskWrapper apply(ScheduledTaskWrapper task) { + ImmutableMap.Builder<String, Integer> ports = ImmutableMap.builder(); + for (AssignedPort port : taskMapper.selectPorts(task.getTaskRowId())) { + ports.put(port.getName(), port.getPort()); + } + task.getTask().getAssignedTask().setAssignedPorts(ports.build()); + return task; + } + }; + + private FluentIterable<IScheduledTask> matches(Query.Builder query) { + return fetchRows(query) + .transform(UNWRAP) + .transform(IScheduledTask.FROM_BUILDER); + } + + private static final Function<ScheduledTaskWrapper, ScheduledTask> UNWRAP = + new Function<ScheduledTaskWrapper, ScheduledTask>() { + @Override + public 
ScheduledTask apply(ScheduledTaskWrapper task) { + return task.getTask(); + } + }; + + /** + * Replaces the shimmed {@link org.apache.thrift.TUnion} instances with the base thrift types. + * This is necessary because TUnion, as of thrift 0.9.1, restricts subclassing. The copy + * constructor checks for equality on {@link Object#getClass()} rather than the subclass-friendly + * {@link Class#isInstance(Object)}. + */ + private static final Function<TaskConfig, TaskConfig> REPLACE_UNION_TYPES = + new Function<TaskConfig, TaskConfig>() { + @Override + public TaskConfig apply(TaskConfig config) { + ImmutableSet.Builder<Constraint> constraints = ImmutableSet.builder(); + for (Constraint constraint : config.getConstraints()) { + Constraint replacement = new Constraint() + .setName(constraint.getName()); + replacement.setConstraint( + new TaskConstraint( + constraint.getConstraint().getSetField(), + constraint.getConstraint().getFieldValue())); + constraints.add(replacement); + } + config.setConstraints(constraints.build()); + + config.setContainer( + new Container( + config.getContainer().getSetField(), + config.getContainer().getFieldValue())); + + return config; + } + }; +} http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/src/main/java/org/apache/aurora/scheduler/storage/db/DbUtil.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbUtil.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbUtil.java index 7b4067c..fe8e3f8 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbUtil.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbUtil.java @@ -17,6 +17,8 @@ import com.google.inject.AbstractModule; import com.google.inject.Guice; import com.google.inject.Injector; import com.twitter.common.stats.StatsProvider; +import com.twitter.common.util.Clock; +import com.twitter.common.util.testing.FakeClock; import 
org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.testing.FakeStatsProvider; @@ -45,6 +47,7 @@ public final class DbUtil { FakeStatsProvider stats = new FakeStatsProvider(); bind(StatsProvider.class).toInstance(stats); bind(FakeStatsProvider.class).toInstance(stats); + bind(Clock.class).toInstance(new FakeClock()); } }); Storage storage = injector.getInstance(Storage.class); http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/src/main/java/org/apache/aurora/scheduler/storage/db/TaskConfigManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/TaskConfigManager.java b/src/main/java/org/apache/aurora/scheduler/storage/db/TaskConfigManager.java new file mode 100644 index 0000000..3ada628 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/TaskConfigManager.java @@ -0,0 +1,138 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.storage.db; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.inject.Inject; + +import com.google.common.base.Function; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableMap; + +import org.apache.aurora.scheduler.storage.db.views.TaskConfigRow; +import org.apache.aurora.scheduler.storage.db.views.TaskLink; +import org.apache.aurora.scheduler.storage.entities.IConstraint; +import org.apache.aurora.scheduler.storage.entities.IJobKey; +import org.apache.aurora.scheduler.storage.entities.ITaskConfig; +import org.apache.aurora.scheduler.storage.entities.IValueConstraint; + +import static java.util.Objects.requireNonNull; + +class TaskConfigManager { + private final TaskConfigMapper configMapper; + + @Inject + TaskConfigManager(TaskConfigMapper configMapper) { + this.configMapper = requireNonNull(configMapper); + } + + long insert(ITaskConfig config) { + InsertResult configInsert = new InsertResult(); + configMapper.insert(config, configInsert); + for (IConstraint constraint : config.getConstraints()) { + InsertResult constraintResult = new InsertResult(); + configMapper.insertConstraint(configInsert.getId(), constraint, constraintResult); + switch (constraint.getConstraint().getSetField()) { + case VALUE: + IValueConstraint valueConstraint = constraint.getConstraint().getValue(); + InsertResult valueResult = new InsertResult(); + configMapper.insertValueConstraint( + constraintResult.getId(), + valueConstraint, + valueResult); + configMapper.insertValueConstraintValues( + valueResult.getId(), + valueConstraint.getValues()); + break; + + case LIMIT: + configMapper.insertLimitConstraint( + constraintResult.getId(), + constraint.getConstraint().getLimit()); + break; + + default: + throw new IllegalStateException( + "Unhandled constraint type " + constraint.getConstraint().getSetField()); + } + } + + if (!config.getRequestedPorts().isEmpty()) { + 
configMapper.insertRequestedPorts(configInsert.getId(), config.getRequestedPorts()); + } + + if (!config.getTaskLinks().isEmpty()) { + configMapper.insertTaskLinks( + configInsert.getId(), + FluentIterable.from(config.getTaskLinks().entrySet()) + .transform(TO_LINK) + .toList()); + } + + if (!config.getMetadata().isEmpty()) { + configMapper.insertMetadata(configInsert.getId(), config.getMetadata()); + } + + // TODO(wfarner): It would be nice if this generalized to different Container types. + if (config.getContainer().isSetDocker()) { + configMapper.insertContainer(configInsert.getId(), config.getContainer().getDocker()); + } + + return configInsert.getId(); + } + + Map<String, String> getTaskLinks(long configId) { + ImmutableMap.Builder<String, String> links = ImmutableMap.builder(); + for (TaskLink link : configMapper.selectTaskLinks(configId)) { + links.put(link.getLabel(), link.getUrl()); + } + return links.build(); + } + + List<TaskConfigRow> getConfigs(IJobKey job) { + requireNonNull(job); + return configMapper.selectConfigsByJob(job); + } + + List<Long> getTaskConfigIds(Set<String> scheduledTaskIds) { + requireNonNull(scheduledTaskIds); + return configMapper.selectConfigsByTaskId(scheduledTaskIds); + } + + /** + * Performs reference counting on configurations. If there are no longer any references to + * these configuration rows, they will be deleted. + * TODO(wfarner): Should we rely on foreign key constraints and opportunistically delete? + * + * @param configRowIds Configurations to delete if no references are found. + */ + void maybeExpungeConfigs(Set<Long> configRowIds) { + if (configMapper.selectTasksByConfigId(configRowIds).isEmpty()) { + configMapper.delete(configRowIds); + + // TODO(wfarner): Need to try removal from other tables as well, e.g. job keys. 
+ } + } + + private static final Function<Map.Entry<String, String>, TaskLink> TO_LINK = + new Function<Map.Entry<String, String>, TaskLink>() { + @Override + public TaskLink apply(Map.Entry<String, String> entry) { + return new TaskLink(entry.getKey(), entry.getValue()); + } + }; +} http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/src/main/java/org/apache/aurora/scheduler/storage/db/TaskConfigMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/TaskConfigMapper.java b/src/main/java/org/apache/aurora/scheduler/storage/db/TaskConfigMapper.java new file mode 100644 index 0000000..7ee001f --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/TaskConfigMapper.java @@ -0,0 +1,167 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.storage.db; + +import java.util.List; +import java.util.Set; + +import org.apache.aurora.scheduler.storage.db.views.TaskConfigRow; +import org.apache.aurora.scheduler.storage.db.views.TaskLink; +import org.apache.aurora.scheduler.storage.entities.IConstraint; +import org.apache.aurora.scheduler.storage.entities.IDockerContainer; +import org.apache.aurora.scheduler.storage.entities.IJobKey; +import org.apache.aurora.scheduler.storage.entities.ILimitConstraint; +import org.apache.aurora.scheduler.storage.entities.IMetadata; +import org.apache.aurora.scheduler.storage.entities.ITaskConfig; +import org.apache.aurora.scheduler.storage.entities.IValueConstraint; +import org.apache.ibatis.annotations.Param; + +/** + * MyBatis mapper for task config objects. + */ +interface TaskConfigMapper { + + /** + * Inserts fields from a task config into the {@code task_configs} table. + * + * @param config Configuration to insert. + * @param result Container for auto-generated ID of the inserted row. + */ + void insert( + @Param("config") ITaskConfig config, + @Param("result") InsertResult result); + + /** + * Gets all task config rows referenced by a job. + * + * @param job Job to look up. + * @return Task config row container. + */ + List<TaskConfigRow> selectConfigsByJob(IJobKey job); + + /** + * Looks up task config IDs by task IDs. + * + * @param taskIds Task IDs to look up. + * @return Task config row IDs. + */ + List<Long> selectConfigsByTaskId(@Param("taskIds") Set<String> taskIds); + + /** + * Looks up the tasks that reference the given task config IDs. + * + * @param configIds Task config IDs. + * @return Identifiers of the tasks that reference the given configs. + */ + List<Long> selectTasksByConfigId(@Param("configIds") Set<Long> configIds); + + /** + * Inserts the constraint association within an {@link ITaskConfig}. + * + * @param configId Task config ID. + * @param constraint Constraint to insert. + * @param result Container for auto-generated ID of the inserted row. 
+ */ + void insertConstraint( + @Param("configId") long configId, + @Param("constraint") IConstraint constraint, + @Param("result") InsertResult result); + + /** + * Inserts the limit constraint association within an {@link IConstraint}. + * + * @param constraintId Constraint ID. + * @param constraint Constraint to insert. + */ + void insertLimitConstraint( + @Param("constraintId") long constraintId, + @Param("constraint") ILimitConstraint constraint); + + /** + * Inserts the value constraint association within an {@link IConstraint}. + * + * @param constraintId Constraint ID. + * @param constraint Constraint to insert. + * @param result Container for auto-generated ID of the inserted row. + */ + void insertValueConstraint( + @Param("constraintId") long constraintId, + @Param("constraint") IValueConstraint constraint, + @Param("result") InsertResult result); + + /** + * Inserts the values association within an {@link IValueConstraint}. + * + * @param valueConstraintId Value constraint ID. + * @param values Values to insert. + */ + void insertValueConstraintValues( + @Param("valueConstraintId") long valueConstraintId, + @Param("values") Set<String> values); + + /** + * Inserts the requested ports association within an {@link ITaskConfig}. + * + * @param configId Task config ID. + * @param ports Port names to insert. + */ + void insertRequestedPorts( + @Param("configId") long configId, + @Param("ports") Set<String> ports); + + /** + * Inserts the task links association within an {@link ITaskConfig}. + * + * @param configId Task config ID. + * @param links Task links to insert. + */ + void insertTaskLinks( + @Param("configId") long configId, + @Param("links") List<TaskLink> links); + + /** + * Selects the task links associated with a {@link ITaskConfig}. + * + * @param configId Task config ID. + * @return Links associated with the task config. 
+ */ + List<TaskLink> selectTaskLinks(@Param("configId") long configId); + + /** + * Inserts the container association within an {@link ITaskConfig}. + * + * @param configId Task config ID. + * @param container Container to insert. + */ + void insertContainer( + @Param("configId") long configId, + @Param("container") IDockerContainer container); + + /** + * Inserts the metadata association within an {@link ITaskConfig}. + * + * @param configId Task config ID. + * @param metadata Metadata associated with the task config. + */ + void insertMetadata( + @Param("configId") long configId, + @Param("metadata") Set<IMetadata> metadata); + + /** + * Deletes task configs. + * + * @param configIds Configs to delete. + */ + void delete(@Param("configIds") Set<Long> configIds); +} http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/src/main/java/org/apache/aurora/scheduler/storage/db/TaskMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/TaskMapper.java b/src/main/java/org/apache/aurora/scheduler/storage/db/TaskMapper.java new file mode 100644 index 0000000..9903675 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/TaskMapper.java @@ -0,0 +1,85 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.storage.db; + +import java.util.List; +import java.util.Set; + +import org.apache.aurora.gen.TaskQuery; +import org.apache.aurora.scheduler.storage.db.views.AssignedPort; +import org.apache.aurora.scheduler.storage.db.views.ScheduledTaskWrapper; +import org.apache.aurora.scheduler.storage.entities.ITaskEvent; +import org.apache.ibatis.annotations.Param; + +/** + * MyBatis mapper for scheduled tasks. + */ +interface TaskMapper { + + /** + * Inserts a scheduled task. + * + * @param task Task to insert. + */ + void insertScheduledTask(ScheduledTaskWrapper task); + + /** + * Gets tasks based on a query. + * + * @param query Query to use as a filter for tasks. + * @return Tasks matching the query. + */ + List<ScheduledTaskWrapper> select(TaskQuery query); + + /** + * Inserts the task events association within an + * {@link org.apache.aurora.scheduler.storage.entities.IScheduledTask}. + * + * @param taskRowId Task row ID. + * @param events Events to insert. + */ + void insertTaskEvents( + @Param("taskRowId") long taskRowId, + @Param("events") List<ITaskEvent> events); + + /** + * Inserts the assigned ports association within an + * {@link org.apache.aurora.scheduler.storage.entities.IScheduledTask}. + * + * @param taskRowId Task row ID. + * @param ports Assigned ports to insert. + */ + void insertPorts(@Param("taskRowId") long taskRowId, @Param("ports") List<AssignedPort> ports); + + /** + * Selects the assigned ports association within an + * {@link org.apache.aurora.scheduler.storage.entities.IScheduledTask}. + * + * @param taskRowId Task row ID. + * @return Ports associated with the task. + */ + List<AssignedPort> selectPorts(@Param("taskRowId") long taskRowId); + + /** + * Deletes all task rows. + */ + void truncate(); + + /** + * Deletes task rows by ID. + * + * @param taskIds IDs of tasks to delete. 
+ */ + void deleteTasks(@Param("taskIds") Set<String> taskIds); +} http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/src/main/java/org/apache/aurora/scheduler/storage/db/shims/ContainerShim.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/shims/ContainerShim.java b/src/main/java/org/apache/aurora/scheduler/storage/db/shims/ContainerShim.java new file mode 100644 index 0000000..07a991d --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/shims/ContainerShim.java @@ -0,0 +1,42 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.storage.db.shims; + +import org.apache.aurora.gen.Container; +import org.apache.aurora.gen.DockerContainer; +import org.apache.aurora.gen.MesosContainer; + +/** + * An extension of {@link Container} that does not throw {@link NullPointerException} when + * accessors are called on unset fields. 
+ */ +public class ContainerShim extends Container { + @Override + public DockerContainer getDocker() { + if (isSet(_Fields.DOCKER)) { + return super.getDocker(); + } else { + return null; + } + } + + @Override + public MesosContainer getMesos() { + if (isSet(_Fields.MESOS)) { + return super.getMesos(); + } else { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/src/main/java/org/apache/aurora/scheduler/storage/db/shims/TaskConstraintShim.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/shims/TaskConstraintShim.java b/src/main/java/org/apache/aurora/scheduler/storage/db/shims/TaskConstraintShim.java new file mode 100644 index 0000000..4990af7 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/shims/TaskConstraintShim.java @@ -0,0 +1,42 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.storage.db.shims; + +import org.apache.aurora.gen.LimitConstraint; +import org.apache.aurora.gen.TaskConstraint; +import org.apache.aurora.gen.ValueConstraint; + +/** + * An extension of {@link TaskConstraint} that does not throw {@link NullPointerException} when + * accessors are called on unset fields. 
+ */ +public class TaskConstraintShim extends TaskConstraint { + @Override + public ValueConstraint getValue() { + if (isSet(_Fields.VALUE)) { + return super.getValue(); + } else { + return null; + } + } + + @Override + public LimitConstraint getLimit() { + if (isSet(_Fields.LIMIT)) { + return super.getLimit(); + } else { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/src/main/java/org/apache/aurora/scheduler/storage/db/typehandlers/ScheduleStatusTypeHandler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/typehandlers/ScheduleStatusTypeHandler.java b/src/main/java/org/apache/aurora/scheduler/storage/db/typehandlers/ScheduleStatusTypeHandler.java new file mode 100644 index 0000000..1f203b4 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/typehandlers/ScheduleStatusTypeHandler.java @@ -0,0 +1,26 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.storage.db.typehandlers; + +import org.apache.aurora.gen.ScheduleStatus; + +/** + * Type handler for {@link ScheduleStatus}. 
+ */ +class ScheduleStatusTypeHandler extends AbstractTEnumTypeHandler<ScheduleStatus> { + @Override + protected ScheduleStatus fromValue(int value) { + return ScheduleStatus.findByValue(value); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/src/main/java/org/apache/aurora/scheduler/storage/db/typehandlers/TypeHandlers.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/typehandlers/TypeHandlers.java b/src/main/java/org/apache/aurora/scheduler/storage/db/typehandlers/TypeHandlers.java index 4d0c10d..0a519be 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/typehandlers/TypeHandlers.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/typehandlers/TypeHandlers.java @@ -32,6 +32,7 @@ public final class TypeHandlers { JobUpdateActionTypeHandler.class, JobUpdateStatusTypeHandler.class, MaintenanceModeTypeHandler.class, + ScheduleStatusTypeHandler.class, TaskConfigTypeHandler.class); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/src/main/java/org/apache/aurora/scheduler/storage/db/views/AssignedPort.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/views/AssignedPort.java b/src/main/java/org/apache/aurora/scheduler/storage/db/views/AssignedPort.java new file mode 100644 index 0000000..0c6442c --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/views/AssignedPort.java @@ -0,0 +1,40 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
/**
 * View of one row in the task_ports table: a port name paired with its port number.
 */
public class AssignedPort {
  private final String name;
  private final int port;

  /**
   * Creates a port view.
   *
   * @param name Name of the port.
   * @param port Port number.
   */
  public AssignedPort(String name, int port) {
    this.name = name;
    this.port = port;
  }

  /**
   * No-argument constructor needed by mybatis; starts with a {@code null} name and port -1.
   */
  private AssignedPort() {
    this.name = null;
    this.port = -1;
  }

  /**
   * @return The port number.
   */
  public int getPort() {
    return port;
  }

  /**
   * @return The port name.
   */
  public String getName() {
    return name;
  }
}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.storage.db.views; + +import org.apache.aurora.gen.ScheduledTask; + +/** + * Representation of a row in the tasks table. + */ +public class ScheduledTaskWrapper { + private final long taskRowId; + private final long taskConfigRowId; + private final ScheduledTask task; + + private ScheduledTaskWrapper() { + // Needed by mybatis. + this(-1, -1, null); + } + + public ScheduledTaskWrapper(long taskRowId, long taskConfigRowId, ScheduledTask task) { + this.taskRowId = taskRowId; + this.taskConfigRowId = taskConfigRowId; + this.task = task; + } + + public long getTaskRowId() { + return taskRowId; + } + + public long getTaskConfigRowId() { + return taskConfigRowId; + } + + public ScheduledTask getTask() { + return task; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/src/main/java/org/apache/aurora/scheduler/storage/db/views/TaskConfigRow.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/views/TaskConfigRow.java b/src/main/java/org/apache/aurora/scheduler/storage/db/views/TaskConfigRow.java new file mode 100644 index 0000000..0160ae3 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/views/TaskConfigRow.java @@ -0,0 +1,60 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.storage.db.views; + +import java.util.Objects; + +import org.apache.aurora.gen.TaskConfig; + +/** + * Representation of a row in the task_configs table. + */ +public class TaskConfigRow { + private final long id; + private final TaskConfig config; + + private TaskConfigRow() { + // Required for mybatis. + this(-1, null); + } + + public TaskConfigRow(long id, TaskConfig config) { + this.id = id; + this.config = config; + } + + public long getId() { + return id; + } + + public TaskConfig getConfig() { + return config; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof TaskConfigRow)) { + return false; + } + + TaskConfigRow other = (TaskConfigRow) obj; + return Objects.equals(id, other.id) + && Objects.equals(config, other.config); + } + + @Override + public int hashCode() { + return Objects.hash(id, config); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/src/main/java/org/apache/aurora/scheduler/storage/db/views/TaskLink.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/views/TaskLink.java b/src/main/java/org/apache/aurora/scheduler/storage/db/views/TaskLink.java new file mode 100644 index 0000000..52b09a9 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/views/TaskLink.java @@ -0,0 +1,40 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * View of one row in the task_config_task_links table: a link label paired with its URL.
 */
public class TaskLink {
  private final String label;
  private final String url;

  /**
   * Creates a link view.
   *
   * @param label Label of the link.
   * @param url URL of the link.
   */
  public TaskLink(String label, String url) {
    this.label = label;
    this.url = url;
  }

  /**
   * No-argument constructor needed by mybatis; starts with a {@code null} label and URL.
   */
  private TaskLink() {
    this.label = null;
    this.url = null;
  }

  /**
   * @return The link URL.
   */
  public String getUrl() {
    return url;
  }

  /**
   * @return The link label.
   */
  public String getLabel() {
    return label;
  }
}
extends T> impl) { + bind(binding).to(impl); + bind(impl).in(Singleton.class); + Key<T> key = keyFactory.create(binding); + bind(key).to(impl); + expose(key); + } + + @Override + protected void configure() { + bindStore(TaskStore.Mutable.class, MemTaskStore.class); + expose(TaskStore.Mutable.class); + } } } http://git-wip-us.apache.org/repos/asf/aurora/blob/bf7f9b7f/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml ---------------------------------------------------------------------- diff --git a/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml b/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml index f76f9a9..cf31cf8 100644 --- a/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml +++ b/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml @@ -299,13 +299,11 @@ AND j.name = #{jobKey.name} AND j.environment = #{jobKey.environment} </if> - <if test="updateStatuses != null"> - <if test="updateStatuses.size() > 0"> - AND (max_status.status IN - <foreach item="element" collection="updateStatuses" open="(" separator="," close="))"> - #{element, typeHandler=org.apache.aurora.scheduler.storage.db.typehandlers.JobUpdateStatusTypeHandler} - </foreach> - </if> + <if test="updateStatuses != null and !updateStatuses.isEmpty()"> + AND (max_status.status IN + <foreach item="element" collection="updateStatuses" open="(" separator="," close="))"> + #{element, typeHandler=org.apache.aurora.scheduler.storage.db.typehandlers.JobUpdateStatusTypeHandler} + </foreach> </if> </if> ORDER BY max_ts.timestamp_ms DESC
