Repository: aurora Updated Branches: refs/heads/master 9a613deae -> 88f887587
Add a DbCronJobStore implementation. Bugs closed: AURORA-415 Reviewed at https://reviews.apache.org/r/35901/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/88f88758 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/88f88758 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/88f88758 Branch: refs/heads/master Commit: 88f88758742d20d80341bdde3402f610f3d1c549 Parents: 9a613de Author: Bill Farner <[email protected]> Authored: Wed Jul 1 10:40:44 2015 -0700 Committer: Bill Farner <[email protected]> Committed: Wed Jul 1 10:44:56 2015 -0700 ---------------------------------------------------------------------- .../scheduler/storage/db/CronJobMapper.java | 40 +++++ .../scheduler/storage/db/DbCronJobStore.java | 94 +++++++++++ .../aurora/scheduler/storage/db/DbModule.java | 14 +- .../aurora/scheduler/storage/db/DbStorage.java | 5 + .../scheduler/storage/db/DbTaskStore.java | 70 +------- .../scheduler/storage/db/InsertResult.java | 4 +- .../scheduler/storage/db/TaskConfigManager.java | 65 ++++++++ .../CronCollisionPolicyTypeHandler.java | 26 +++ .../storage/db/typehandlers/TypeHandlers.java | 14 +- .../storage/db/views/CronJobWrapper.java | 42 +++++ .../storage/db/views/ScheduledTaskWrapper.java | 13 +- .../storage/mem/InMemStoresModule.java | 35 +--- .../scheduler/storage/mem/MemCronJobStore.java | 58 +++++++ .../scheduler/storage/mem/MemJobStore.java | 58 ------- .../scheduler/storage/db/CronJobMapper.xml | 114 +++++++++++++ .../scheduler/storage/db/JobKeyMapper.xml | 6 - .../aurora/scheduler/storage/db/TaskMapper.xml | 4 +- .../aurora/scheduler/storage/db/schema.sql | 21 ++- .../cron/quartz/AuroraCronJobTest.java | 15 -- .../aurora/scheduler/cron/quartz/CronIT.java | 25 +-- .../scheduler/cron/quartz/QuartzTestUtil.java | 3 + .../storage/AbstractCronJobStoreTest.java | 164 +++++++++++++++++++ .../scheduler/storage/StorageBackfillTest.java | 39 ----- .../storage/db/DbCronJobStoreTest.java | 39 +++++ .../storage/db/RowGarbageCollectorTest.java | 2 +- .../storage/mem/InMemTaskStoreTest.java | 2 +- .../storage/mem/MemCronJobStoreTest.java | 97 +++-------- 27 files changed, 745 insertions(+), 324 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/88f88758/src/main/java/org/apache/aurora/scheduler/storage/db/CronJobMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/CronJobMapper.java b/src/main/java/org/apache/aurora/scheduler/storage/db/CronJobMapper.java new file mode 100644 index 0000000..a272785 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/CronJobMapper.java @@ -0,0 +1,40 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.storage.db; + +import java.util.List; + +import javax.annotation.Nullable; + +import org.apache.aurora.scheduler.storage.db.views.CronJobWrapper; +import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; +import org.apache.aurora.scheduler.storage.entities.IJobKey; +import org.apache.ibatis.annotations.Param; + +/** + * MyBatis mapper for cron jobs. + */ +interface CronJobMapper { + + void insert(@Param("job") IJobConfiguration job, @Param("task_config_id") long taskConfigId); + + void delete(@Param("job") IJobKey job); + + void truncate(); + + List<CronJobWrapper> selectAll(); + + @Nullable + CronJobWrapper select(@Param("job") IJobKey job); +} http://git-wip-us.apache.org/repos/asf/aurora/blob/88f88758/src/main/java/org/apache/aurora/scheduler/storage/db/DbCronJobStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbCronJobStore.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbCronJobStore.java new file mode 100644 index 0000000..c2696a9 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbCronJobStore.java @@ -0,0 +1,94 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.storage.db; + +import javax.inject.Inject; + +import com.google.common.base.Function; +import com.google.common.base.Optional; +import com.google.common.collect.FluentIterable; + +import org.apache.aurora.gen.JobConfiguration; +import org.apache.aurora.scheduler.storage.CronJobStore; +import org.apache.aurora.scheduler.storage.db.views.CronJobWrapper; +import org.apache.aurora.scheduler.storage.db.views.TaskConfigRow; +import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; +import org.apache.aurora.scheduler.storage.entities.IJobKey; + +import static java.util.Objects.requireNonNull; + +/** + * Cron job store backed by a relational database. + */ +class DbCronJobStore implements CronJobStore.Mutable { + private final CronJobMapper cronJobMapper; + private final JobKeyMapper jobKeyMapper; + private final TaskConfigManager taskConfigManager; + + @Inject + DbCronJobStore( + CronJobMapper cronJobMapper, + JobKeyMapper jobKeyMapper, + TaskConfigManager taskConfigManager) { + + this.cronJobMapper = requireNonNull(cronJobMapper); + this.jobKeyMapper = requireNonNull(jobKeyMapper); + this.taskConfigManager = requireNonNull(taskConfigManager); + } + + @Override + public void saveAcceptedJob(IJobConfiguration jobConfig) { + requireNonNull(jobConfig); + jobKeyMapper.merge(jobConfig.getKey()); + cronJobMapper.insert(jobConfig, taskConfigManager.insert(jobConfig.getTaskConfig())); + } + + @Override + public void removeJob(IJobKey jobKey) { + requireNonNull(jobKey); + cronJobMapper.delete(jobKey); + } + + @Override + public void deleteJobs() { + cronJobMapper.truncate(); + } + + private final Function<CronJobWrapper, JobConfiguration> hydrateJob = + new Function<CronJobWrapper, JobConfiguration>() { + @Override + public JobConfiguration apply(CronJobWrapper row) { + JobConfiguration job = row.getJob(); + job.setTaskConfig(taskConfigManager.getConfigSaturator().apply( + new TaskConfigRow(row.getTaskConfigRowId(), job.getTaskConfig()))); + return job; + } + }; + + @Override + public Iterable<IJobConfiguration> fetchJobs() { + return FluentIterable.from(cronJobMapper.selectAll()) + .transform(hydrateJob) + .transform(IJobConfiguration.FROM_BUILDER) + .toList(); + } + + @Override + public Optional<IJobConfiguration> fetchJob(IJobKey jobKey) { + requireNonNull(jobKey); + return Optional.fromNullable(cronJobMapper.select(jobKey)) + .transform(hydrateJob) + .transform(IJobConfiguration.FROM_BUILDER); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/88f88758/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java index 8bfb076..23bf0ac 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java @@ -77,6 +77,7 @@ public final class DbModule extends PrivateModule { private static final Set<Class<?>> MAPPER_CLASSES = ImmutableSet.<Class<?>>builder() .add(AttributeMapper.class) + .add(CronJobMapper.class) .add(EnumValueMapper.class) .add(FrameworkIdMapper.class) .add(JobInstanceUpdateEventMapper.class) @@ -90,12 +91,12 @@ public final class DbModule extends PrivateModule { .build(); private final KeyFactory keyFactory; - private final Module taskStoreModule; + private final Module taskStoresModule; private final String jdbcSchema; - private DbModule(KeyFactory keyFactory, Module taskStoreModule, String jdbcSchema) { + private DbModule(KeyFactory keyFactory, Module taskStoresModule, String jdbcSchema) { this.keyFactory = requireNonNull(keyFactory); - this.taskStoreModule = requireNonNull(taskStoreModule); + this.taskStoresModule = requireNonNull(taskStoresModule); // We always disable the MvStore, as it is in beta as of this writing. this.jdbcSchema = jdbcSchema + ";MV_STORE=false"; } @@ -144,7 +145,7 @@ public final class DbModule extends PrivateModule { private static Module getTaskStoreModule(KeyFactory keyFactory) { return USE_DB_TASK_STORE.get() ? new TaskStoreModule(keyFactory) - : new InMemStoresModule.TaskStoreModule(keyFactory); + : new InMemStoresModule(keyFactory); } private <T> void bindStore(Class<T> binding, Class<? extends T> impl) { @@ -190,8 +191,7 @@ public final class DbModule extends PrivateModule { expose(JobKeyMapper.class); } }); - install(new InMemStoresModule(keyFactory)); - install(taskStoreModule); + install(taskStoresModule); expose(keyFactory.create(CronJobStore.Mutable.class)); expose(keyFactory.create(TaskStore.Mutable.class)); @@ -237,6 +237,8 @@ public final class DbModule extends PrivateModule { protected void configure() { bindStore(TaskStore.Mutable.class, DbTaskStore.class); expose(TaskStore.Mutable.class); + bindStore(CronJobStore.Mutable.class, DbCronJobStore.class); + expose(DbCronJobStore.Mutable.class); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/88f88758/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java index bb61542..a1f0d3c 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java @@ -23,6 +23,7 @@ import com.google.common.util.concurrent.AbstractIdleService; import com.google.inject.Inject; import com.twitter.common.inject.TimedInterceptor.Timed; +import org.apache.aurora.gen.CronCollisionPolicy; import org.apache.aurora.gen.JobUpdateAction; import org.apache.aurora.gen.JobUpdateStatus; import org.apache.aurora.gen.MaintenanceMode; @@ -209,6 +210,10 @@ class DbStorage extends AbstractIdleService implements Storage { session.update(createStatementName); } + for (CronCollisionPolicy policy : CronCollisionPolicy.values()) { + enumValueMapper.addEnumValue("cron_policies", policy.getValue(), policy.name()); + } + for (MaintenanceMode mode : MaintenanceMode.values()) { enumValueMapper.addEnumValue("maintenance_modes", mode.getValue(), mode.name()); } http://git-wip-us.apache.org/repos/asf/aurora/blob/88f88758/src/main/java/org/apache/aurora/scheduler/storage/db/DbTaskStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbTaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbTaskStore.java index a5acb3a..f3f241d 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbTaskStore.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbTaskStore.java @@ -40,11 +40,8 @@ import com.twitter.common.quantity.Amount; import com.twitter.common.quantity.Time; import com.twitter.common.util.Clock; -import org.apache.aurora.gen.Constraint; -import org.apache.aurora.gen.Container; import org.apache.aurora.gen.ScheduledTask; import org.apache.aurora.gen.TaskConfig; -import org.apache.aurora.gen.TaskConstraint; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.Query.Builder; import org.apache.aurora.scheduler.base.Tasks; @@ -142,7 +139,8 @@ class DbTaskStore implements TaskStore.Mutable { Map<ITaskConfig, TaskConfigRow> rowsToIds = FluentIterable.from(jobs) .transformAndConcat(getRows) - .uniqueIndex(Functions.compose(ITaskConfig.FROM_BUILDER, getConfigSaturator())); + .uniqueIndex( + Functions.compose(ITaskConfig.FROM_BUILDER, configManager.getConfigSaturator())); return Maps.transformValues(rowsToIds, CONFIG_ID); } @@ -177,17 +175,14 @@ class DbTaskStore implements TaskStore.Mutable { for (IScheduledTask task : tasks) { long configId = configCache.getUnchecked(task.getAssignedTask().getTask()); - ScheduledTaskWrapper wrappedTask = new ScheduledTaskWrapper(-1, configId, task.newBuilder()); + ScheduledTaskWrapper wrappedTask = new ScheduledTaskWrapper(configId, task.newBuilder()); taskMapper.insertScheduledTask(wrappedTask); - Preconditions.checkState( - wrappedTask.getTaskRowId() != -1, - "Row ID should have been populated during insert."); if (!task.getTaskEvents().isEmpty()) { - taskMapper.insertTaskEvents(wrappedTask.getTaskRowId(), task.getTaskEvents()); + taskMapper.insertTaskEvents(wrappedTask.getId(), task.getTaskEvents()); } if (!task.getAssignedTask().getAssignedPorts().isEmpty()) { taskMapper.insertPorts( - wrappedTask.getTaskRowId(), + wrappedTask.getId(), toAssignedPorts(task.getAssignedTask().getAssignedPorts())); } } @@ -261,26 +256,6 @@ class DbTaskStore implements TaskStore.Mutable { return false; } - private Function<TaskConfigRow, TaskConfig> getConfigSaturator() { - // It appears that there is no way in mybatis to populate a field of type Map. To work around - // this, we need to manually perform the query and associate the elements. - final LoadingCache<Long, Map<String, String>> taskLinkCache = CacheBuilder.newBuilder() - .build(new CacheLoader<Long, Map<String, String>>() { - @Override - public Map<String, String> load(Long configId) { - return configManager.getTaskLinks(configId); - } - }); - Function<TaskConfigRow, TaskConfig> linkPopulator = new Function<TaskConfigRow, TaskConfig>() { - @Override - public TaskConfig apply(TaskConfigRow row) { - return row.getConfig().setTaskLinks(taskLinkCache.getUnchecked(row.getId())); - } - }; - - return Functions.compose(REPLACE_UNION_TYPES, linkPopulator); - } - private FluentIterable<IScheduledTask> matches(Query.Builder query) { Iterable<ScheduledTaskWrapper> results; Predicate<IScheduledTask> filter; @@ -297,7 +272,7 @@ class DbTaskStore implements TaskStore.Mutable { filter = Predicates.alwaysTrue(); } - final Function<TaskConfigRow, TaskConfig> configSaturator = getConfigSaturator(); + final Function<TaskConfigRow, TaskConfig> configSaturator = configManager.getConfigSaturator(); return FluentIterable.from(results) .transform(populateAssignedPorts) .transform(new Function<ScheduledTaskWrapper, ScheduledTaskWrapper>() { @@ -320,7 +295,7 @@ class DbTaskStore implements TaskStore.Mutable { @Override public ScheduledTaskWrapper apply(ScheduledTaskWrapper task) { ImmutableMap.Builder<String, Integer> ports = ImmutableMap.builder(); - for (AssignedPort port : taskMapper.selectPorts(task.getTaskRowId())) { + for (AssignedPort port : taskMapper.selectPorts(task.getId())) { ports.put(port.getName(), port.getPort()); } task.getTask().getAssignedTask().setAssignedPorts(ports.build()); @@ -335,35 +310,4 @@ class DbTaskStore implements TaskStore.Mutable { return task.getTask(); } }; - - /** - * Replaces the shimmed {@link org.apache.thrift.TUnion} instances with the base thrift types. - * This is necessary because TUnion, as of thrift 0.9.1, restricts subclassing. The copy - * constructor checks for equality on {@link Object#getClass()} rather than the subclass-friendly - * {@link Class#isInstance(Object). - */ - private static final Function<TaskConfig, TaskConfig> REPLACE_UNION_TYPES = - new Function<TaskConfig, TaskConfig>() { - @Override - public TaskConfig apply(TaskConfig config) { - ImmutableSet.Builder<Constraint> constraints = ImmutableSet.builder(); - for (Constraint constraint : config.getConstraints()) { - Constraint replacement = new Constraint() - .setName(constraint.getName()); - replacement.setConstraint( - new TaskConstraint( - constraint.getConstraint().getSetField(), - constraint.getConstraint().getFieldValue())); - constraints.add(replacement); - } - config.setConstraints(constraints.build()); - - config.setContainer( - new Container( - config.getContainer().getSetField(), - config.getContainer().getFieldValue())); - - return config; - } - }; } http://git-wip-us.apache.org/repos/asf/aurora/blob/88f88758/src/main/java/org/apache/aurora/scheduler/storage/db/InsertResult.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/InsertResult.java b/src/main/java/org/apache/aurora/scheduler/storage/db/InsertResult.java index 44dc8f5..8a81579 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/InsertResult.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/InsertResult.java @@ -18,11 +18,11 @@ package org.apache.aurora.scheduler.storage.db; * This class can be used as an additional {@link org.apache.ibatis.annotations.Param Param} to * retrieve the ID when the inserted object is not self-identifying. */ -class InsertResult { +public class InsertResult { private long id = Long.MIN_VALUE; private boolean isSet; - long getId() { + public long getId() { if (!isSet) { throw new IllegalStateException("Missing ID value."); } http://git-wip-us.apache.org/repos/asf/aurora/blob/88f88758/src/main/java/org/apache/aurora/scheduler/storage/db/TaskConfigManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/TaskConfigManager.java b/src/main/java/org/apache/aurora/scheduler/storage/db/TaskConfigManager.java index 46fa940..39177fd 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/TaskConfigManager.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/TaskConfigManager.java @@ -20,9 +20,18 @@ import java.util.Set; import javax.inject.Inject; import com.google.common.base.Function; +import com.google.common.base.Functions; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.aurora.gen.Constraint; +import org.apache.aurora.gen.Container; +import org.apache.aurora.gen.TaskConfig; +import org.apache.aurora.gen.TaskConstraint; import org.apache.aurora.scheduler.storage.db.views.TaskConfigRow; import org.apache.aurora.scheduler.storage.db.views.TaskLink; import org.apache.aurora.scheduler.storage.entities.IConstraint; @@ -42,6 +51,62 @@ class TaskConfigManager { this.jobKeyMapper = requireNonNull(jobKeyMapper); } + /** + * Replaces the shimmed {@link org.apache.thrift.TUnion} instances with the base thrift types. + * This is necessary because TUnion, as of thrift 0.9.1, restricts subclassing. The copy + * constructor checks for equality on {@link Object#getClass()} rather than the subclass-friendly + * {@link Class#isInstance(Object). + */ + private static final Function<TaskConfig, TaskConfig> REPLACE_UNION_TYPES = + new Function<TaskConfig, TaskConfig>() { + @Override + public TaskConfig apply(TaskConfig config) { + ImmutableSet.Builder<Constraint> constraints = ImmutableSet.builder(); + for (Constraint constraint : config.getConstraints()) { + Constraint replacement = new Constraint() + .setName(constraint.getName()); + replacement.setConstraint( + new TaskConstraint( + constraint.getConstraint().getSetField(), + constraint.getConstraint().getFieldValue())); + constraints.add(replacement); + } + config.setConstraints(constraints.build()); + + config.setContainer( + new Container( + config.getContainer().getSetField(), + config.getContainer().getFieldValue())); + + return config; + } + }; + + /** + * Creates an instance of a caching function that will fill all relations in a task config. + * + * @return A function to populate relations in task configs. + */ + Function<TaskConfigRow, TaskConfig> getConfigSaturator() { + // It appears that there is no way in mybatis to populate a field of type Map. To work around + // this, we need to manually perform the query and associate the elements. + final LoadingCache<Long, Map<String, String>> taskLinkCache = CacheBuilder.newBuilder() + .build(new CacheLoader<Long, Map<String, String>>() { + @Override + public Map<String, String> load(Long configId) { + return getTaskLinks(configId); + } + }); + Function<TaskConfigRow, TaskConfig> linkPopulator = new Function<TaskConfigRow, TaskConfig>() { + @Override + public TaskConfig apply(TaskConfigRow row) { + return row.getConfig().setTaskLinks(taskLinkCache.getUnchecked(row.getId())); + } + }; + + return Functions.compose(REPLACE_UNION_TYPES, linkPopulator); + } + long insert(ITaskConfig config) { InsertResult configInsert = new InsertResult(); jobKeyMapper.merge(config.getJob()); http://git-wip-us.apache.org/repos/asf/aurora/blob/88f88758/src/main/java/org/apache/aurora/scheduler/storage/db/typehandlers/CronCollisionPolicyTypeHandler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/typehandlers/CronCollisionPolicyTypeHandler.java b/src/main/java/org/apache/aurora/scheduler/storage/db/typehandlers/CronCollisionPolicyTypeHandler.java new file mode 100644 index 0000000..b87a29f --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/typehandlers/CronCollisionPolicyTypeHandler.java @@ -0,0 +1,26 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.storage.db.typehandlers; + +import org.apache.aurora.gen.CronCollisionPolicy; + +/** + * Type handler for {@link CronCollisionPolicy}. + */ +public class CronCollisionPolicyTypeHandler extends AbstractTEnumTypeHandler<CronCollisionPolicy> { + @Override + protected CronCollisionPolicy fromValue(int value) { + return CronCollisionPolicy.findByValue(value); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/88f88758/src/main/java/org/apache/aurora/scheduler/storage/db/typehandlers/TypeHandlers.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/typehandlers/TypeHandlers.java b/src/main/java/org/apache/aurora/scheduler/storage/db/typehandlers/TypeHandlers.java index 0a519be..9afc3f3 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/typehandlers/TypeHandlers.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/typehandlers/TypeHandlers.java @@ -28,11 +28,13 @@ public final class TypeHandlers { } public static List<Class<? extends TypeHandler<?>>> getAll() { - return ImmutableList.<Class<? extends TypeHandler<?>>>of( - JobUpdateActionTypeHandler.class, - JobUpdateStatusTypeHandler.class, - MaintenanceModeTypeHandler.class, - ScheduleStatusTypeHandler.class, - TaskConfigTypeHandler.class); + return ImmutableList.<Class<? extends TypeHandler<?>>>builder() + .add(CronCollisionPolicyTypeHandler.class) + .add(JobUpdateActionTypeHandler.class) + .add(JobUpdateStatusTypeHandler.class) + .add(MaintenanceModeTypeHandler.class) + .add(ScheduleStatusTypeHandler.class) + .add(TaskConfigTypeHandler.class) + .build(); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/88f88758/src/main/java/org/apache/aurora/scheduler/storage/db/views/CronJobWrapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/views/CronJobWrapper.java b/src/main/java/org/apache/aurora/scheduler/storage/db/views/CronJobWrapper.java new file mode 100644 index 0000000..5202db5 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/views/CronJobWrapper.java @@ -0,0 +1,42 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.storage.db.views; + +import org.apache.aurora.gen.JobConfiguration; + +/** + * Representation of a row in the cron_jobs table. + */ +public class CronJobWrapper { + private final long taskConfigRowId; + private final JobConfiguration job; + + private CronJobWrapper() { + // Needed by mybatis. + this(-1, null); + } + + public CronJobWrapper(long taskConfigRowId, JobConfiguration job) { + this.taskConfigRowId = taskConfigRowId; + this.job = job; + } + + public long getTaskConfigRowId() { + return taskConfigRowId; + } + + public JobConfiguration getJob() { + return job; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/88f88758/src/main/java/org/apache/aurora/scheduler/storage/db/views/ScheduledTaskWrapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/views/ScheduledTaskWrapper.java b/src/main/java/org/apache/aurora/scheduler/storage/db/views/ScheduledTaskWrapper.java index b89e7b5..3e0a2f7 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/views/ScheduledTaskWrapper.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/views/ScheduledTaskWrapper.java @@ -14,30 +14,25 @@ package org.apache.aurora.scheduler.storage.db.views; import org.apache.aurora.gen.ScheduledTask; +import org.apache.aurora.scheduler.storage.db.InsertResult; /** * Representation of a row in the tasks table. */ -public class ScheduledTaskWrapper { - private final long taskRowId; +public class ScheduledTaskWrapper extends InsertResult { private final long taskConfigRowId; private final ScheduledTask task; private ScheduledTaskWrapper() { // Needed by mybatis. - this(-1, -1, null); + this(-1, null); } - public ScheduledTaskWrapper(long taskRowId, long taskConfigRowId, ScheduledTask task) { - this.taskRowId = taskRowId; + public ScheduledTaskWrapper(long taskConfigRowId, ScheduledTask task) { this.taskConfigRowId = taskConfigRowId; this.task = task; } - public long getTaskRowId() { - return taskRowId; - } - public long getTaskConfigRowId() { return taskConfigRowId; } http://git-wip-us.apache.org/repos/asf/aurora/blob/88f88758/src/main/java/org/apache/aurora/scheduler/storage/mem/InMemStoresModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/InMemStoresModule.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/InMemStoresModule.java index 35c83b9..c30f51b 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/mem/InMemStoresModule.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/InMemStoresModule.java @@ -15,7 +15,6 @@ package org.apache.aurora.scheduler.storage.mem; import javax.inject.Singleton; -import com.google.inject.AbstractModule; import com.google.inject.Key; import com.google.inject.PrivateModule; import com.twitter.common.inject.Bindings.KeyFactory; @@ -30,8 +29,7 @@ import static java.util.Objects.requireNonNull; * <p> * NOTE: These stores are being phased out in favor of database-backed stores. */ -public final class InMemStoresModule extends AbstractModule { - +public final class InMemStoresModule extends PrivateModule { private final KeyFactory keyFactory; public InMemStoresModule(KeyFactory keyFactory) { @@ -43,35 +41,14 @@ public final class InMemStoresModule extends AbstractModule { bind(impl).in(Singleton.class); Key<T> key = keyFactory.create(binding); bind(key).to(impl); + expose(key); } @Override protected void configure() { - bindStore(CronJobStore.Mutable.class, MemJobStore.class); - } - - /** - * Binding module that installs the map-based task store implementation. - */ - public static class TaskStoreModule extends PrivateModule { - private final KeyFactory keyFactory; - - public TaskStoreModule(KeyFactory keyFactory) { - this.keyFactory = requireNonNull(keyFactory); - } - - private <T> void bindStore(Class<T> binding, Class<? extends T> impl) { - bind(binding).to(impl); - bind(impl).in(Singleton.class); - Key<T> key = keyFactory.create(binding); - bind(key).to(impl); - expose(key); - } - - @Override - protected void configure() { - bindStore(TaskStore.Mutable.class, MemTaskStore.class); - expose(TaskStore.Mutable.class); - } + bindStore(TaskStore.Mutable.class, MemTaskStore.class); + expose(TaskStore.Mutable.class); + bindStore(CronJobStore.Mutable.class, MemCronJobStore.class); + expose(CronJobStore.Mutable.class); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/88f88758/src/main/java/org/apache/aurora/scheduler/storage/mem/MemCronJobStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemCronJobStore.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemCronJobStore.java new file mode 100644 index 0000000..365494c --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemCronJobStore.java @@ -0,0 +1,58 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.storage.mem; + +import java.util.Map; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; + +import org.apache.aurora.scheduler.base.JobKeys; +import org.apache.aurora.scheduler.storage.CronJobStore; +import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; +import org.apache.aurora.scheduler.storage.entities.IJobKey; + +/** + * An in-memory cron job store. + */ +class MemCronJobStore implements CronJobStore.Mutable { + private final Map<IJobKey, IJobConfiguration> jobs = Maps.newConcurrentMap(); + + @Override + public void saveAcceptedJob(IJobConfiguration jobConfig) { + IJobKey key = JobKeys.assertValid(jobConfig.getKey()); + jobs.put(key, jobConfig); + } + + @Override + public void removeJob(IJobKey jobKey) { + jobs.remove(jobKey); + } + + @Override + public void deleteJobs() { + jobs.clear(); + } + + @Override + public Iterable<IJobConfiguration> fetchJobs() { + return ImmutableSet.copyOf(jobs.values()); + } + + @Override + public Optional<IJobConfiguration> fetchJob(IJobKey jobKey) { + return Optional.fromNullable(jobs.get(jobKey)); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/88f88758/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobStore.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobStore.java deleted file mode 100644 index f9e9e89..0000000 --- a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobStore.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.storage.mem; - -import java.util.Map; - -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Maps; - -import org.apache.aurora.scheduler.base.JobKeys; -import org.apache.aurora.scheduler.storage.CronJobStore; -import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; -import org.apache.aurora.scheduler.storage.entities.IJobKey; - -/** - * An in-memory job store. - */ -class MemJobStore implements CronJobStore.Mutable { - private final Map<IJobKey, IJobConfiguration> jobs = Maps.newConcurrentMap(); - - @Override - public void saveAcceptedJob(IJobConfiguration jobConfig) { - IJobKey key = JobKeys.assertValid(jobConfig.getKey()); - jobs.put(key, jobConfig); - } - - @Override - public void removeJob(IJobKey jobKey) { - jobs.remove(jobKey); - } - - @Override - public void deleteJobs() { - jobs.clear(); - } - - @Override - public Iterable<IJobConfiguration> fetchJobs() { - return ImmutableSet.copyOf(jobs.values()); - } - - @Override - public Optional<IJobConfiguration> fetchJob(IJobKey jobKey) { - return Optional.fromNullable(jobs.get(jobKey)); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/88f88758/src/main/resources/org/apache/aurora/scheduler/storage/db/CronJobMapper.xml ---------------------------------------------------------------------- diff --git a/src/main/resources/org/apache/aurora/scheduler/storage/db/CronJobMapper.xml b/src/main/resources/org/apache/aurora/scheduler/storage/db/CronJobMapper.xml new file mode 100644 index 0000000..936c2b8 --- /dev/null +++ b/src/main/resources/org/apache/aurora/scheduler/storage/db/CronJobMapper.xml @@ -0,0 +1,114 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<!DOCTYPE mapper + PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" + "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> +<mapper namespace="org.apache.aurora.scheduler.storage.db.CronJobMapper"> + + <insert id="insert"> + INSERT INTO cron_jobs( + job_key_id, + creator_user, + cron_schedule, + <if test="job.cronCollisionPolicy != null"> + cron_collision_policy, + </if> + task_config_row_id, + instance_count + ) VALUES ( + ( + SELECT ID + FROM job_keys + WHERE role = #{job.key.role} + AND environment = #{job.key.environment} + AND name = #{job.key.name} + ), + #{job.owner.user}, + #{job.cronSchedule}, + <if test="job.cronCollisionPolicy != null"> + #{job.cronCollisionPolicy, typeHandler=org.apache.aurora.scheduler.storage.db.typehandlers.CronCollisionPolicyTypeHandler}, + </if> + #{task_config_id}, + #{job.instanceCount} + ) + </insert> + + <delete id="delete"> + DELETE FROM cron_jobs + WHERE job_key_id + IN (SELECT id + FROM job_keys + WHERE role = #{job.role} + AND environment = #{job.environment} + AND name = #{job.name}) + </delete> + + <delete id="truncate"> + DELETE FROM cron_jobs + </delete> + + <resultMap + id="cronJobWrapperResultMap" + type="org.apache.aurora.scheduler.storage.db.views.CronJobWrapper"> + + <id column="c_id" /> + <result property="taskConfigRowId" column="c_task_config_row_id"/> + <association property="job" javaType="org.apache.aurora.gen.JobConfiguration" columnPrefix="c_"> + <id column="id"/> + <result property="owner.role" column="j_role"/> + <result property="owner.user" column="creator_user"/> + <result + property="cronCollisionPolicy" + column="cron_collision_policy" + typeHandler="org.apache.aurora.scheduler.storage.db.typehandlers.CronCollisionPolicyTypeHandler"/> + <association + property="key" + resultMap="org.apache.aurora.scheduler.storage.db.JobKeyMapper.jobKeyMap" + columnPrefix="j_"/> + <association + property="taskConfig" + select="org.apache.aurora.scheduler.storage.db.TaskConfigMapper.selectConfig" + column="task_config_row_id" + foreignColumn="id"/> + </association> + </resultMap> + + <sql id="unscopedSelect"> + SELECT + c.id AS c_id, + c.creator_user AS c_creator_user, + c.cron_schedule AS c_cron_schedule, + c.cron_collision_policy AS c_cron_collision_policy, + c.task_config_row_id AS c_task_config_row_id, + c.instance_count AS c_instance_count, + j.role AS c_j_role, + j.environment AS c_j_environment, + j.name AS c_j_name + FROM cron_jobs AS c + INNER JOIN job_keys AS j ON j.id = c.job_key_id + </sql> + + <select id="selectAll" resultMap="cronJobWrapperResultMap"> + <include refid="unscopedSelect"/> + </select> + + <select id="select" resultMap="cronJobWrapperResultMap"> + <include refid="unscopedSelect"/> + WHERE j.role = #{job.role} + AND environment = #{job.environment} + AND name = #{job.name} + </select> +</mapper> http://git-wip-us.apache.org/repos/asf/aurora/blob/88f88758/src/main/resources/org/apache/aurora/scheduler/storage/db/JobKeyMapper.xml ---------------------------------------------------------------------- diff --git a/src/main/resources/org/apache/aurora/scheduler/storage/db/JobKeyMapper.xml b/src/main/resources/org/apache/aurora/scheduler/storage/db/JobKeyMapper.xml index 80ff347..3b5a7c9 100644 --- a/src/main/resources/org/apache/aurora/scheduler/storage/db/JobKeyMapper.xml +++ b/src/main/resources/org/apache/aurora/scheduler/storage/db/JobKeyMapper.xml @@ -29,12 +29,6 @@ ) </insert> - <sql id="job_key_clause"> - role = #{role} - AND environment = #{environment} - AND name = #{name} - </sql> - <select id="selectAll" resultType="org.apache.aurora.gen.JobKey"> SELECT * FROM job_keys </select> http://git-wip-us.apache.org/repos/asf/aurora/blob/88f88758/src/main/resources/org/apache/aurora/scheduler/storage/db/TaskMapper.xml ---------------------------------------------------------------------- diff --git a/src/main/resources/org/apache/aurora/scheduler/storage/db/TaskMapper.xml b/src/main/resources/org/apache/aurora/scheduler/storage/db/TaskMapper.xml index e9660b7..0b9e3d7 100644 --- a/src/main/resources/org/apache/aurora/scheduler/storage/db/TaskMapper.xml +++ b/src/main/resources/org/apache/aurora/scheduler/storage/db/TaskMapper.xml @@ -18,7 +18,7 @@ "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <mapper namespace="org.apache.aurora.scheduler.storage.db.TaskMapper"> <cache size="10000" readOnly="true" /> - <insert id="insertScheduledTask" useGeneratedKeys="true" keyColumn="id" keyProperty="taskRowId"> + <insert id="insertScheduledTask" useGeneratedKeys="true" keyColumn="id" keyProperty="id"> INSERT INTO tasks ( task_id, slave_row_id, @@ -84,7 +84,7 @@ </resultMap> <resultMap id="taskWrapperMap" type="org.apache.aurora.scheduler.storage.db.views.ScheduledTaskWrapper"> - <id column="row_id" property="taskRowId"/> + <id column="row_id" property="id"/> <result column="c_id" property="taskConfigRowId"/> <association property="task" resultMap="scheduledTaskMap"/> </resultMap> http://git-wip-us.apache.org/repos/asf/aurora/blob/88f88758/src/main/resources/org/apache/aurora/scheduler/storage/db/schema.sql ---------------------------------------------------------------------- diff --git a/src/main/resources/org/apache/aurora/scheduler/storage/db/schema.sql b/src/main/resources/org/apache/aurora/scheduler/storage/db/schema.sql index 0f77db7..687010e 100644 --- a/src/main/resources/org/apache/aurora/scheduler/storage/db/schema.sql +++ b/src/main/resources/org/apache/aurora/scheduler/storage/db/schema.sql @@ -292,4 +292,23 @@ CREATE TABLE task_ports( port INT NOT NULL, UNIQUE(task_row_id, name) -); \ No newline at end of file +); + +CREATE TABLE cron_policies( + id INT PRIMARY KEY, + name VARCHAR NOT NULL, + + UNIQUE(name) +); + +CREATE TABLE cron_jobs( + id IDENTITY, + job_key_id INT NOT NULL REFERENCES job_keys(id), + creator_user VARCHAR NOT NULL, + cron_schedule VARCHAR NOT NULL, + cron_collision_policy INT REFERENCES cron_policies(id), + task_config_row_id INT NOT NULL REFERENCES task_configs(id), + instance_count INT NOT NULL, + + UNIQUE(job_key_id) +); http://git-wip-us.apache.org/repos/asf/aurora/blob/88f88758/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java index b9e1657..4280762 100644 --- a/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java +++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java @@ -29,7 +29,6 @@ import org.apache.aurora.scheduler.state.StateChangeResult; import org.apache.aurora.scheduler.state.StateManager; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.db.DbUtil; -import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; import org.easymock.Capture; @@ -71,20 +70,6 @@ public class AuroraCronJobTest extends EasyMockTest { } @Test - public void testInvalidConfigIsNoop() throws JobExecutionException { - control.replay(); - storage.write(new Storage.MutateWork.NoResult.Quiet() { - @Override - protected void execute(MutableStoreProvider storeProvider) { - storeProvider.getCronJobStore().saveAcceptedJob( - IJobConfiguration.build(QuartzTestUtil.JOB.newBuilder().setCronSchedule(null))); - } - }); - - auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY); - } - - @Test public void testEmptyStorage() throws JobExecutionException { stateManager.insertPendingTasks( EasyMock.<MutableStoreProvider>anyObject(), http://git-wip-us.apache.org/repos/asf/aurora/blob/88f88758/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java index 863e9c9..c599dbb 100644 --- a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java +++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java @@ -23,11 +23,13 @@ import com.google.inject.util.Modules; import com.twitter.common.testing.easymock.EasyMockTest; import com.twitter.common.util.Clock; -import org.apache.aurora.gen.ExecutorConfig; +import org.apache.aurora.gen.Container; import org.apache.aurora.gen.Identity; import org.apache.aurora.gen.JobConfiguration; +import org.apache.aurora.gen.MesosContainer; import org.apache.aurora.gen.TaskConfig; import org.apache.aurora.scheduler.base.JobKeys; +import org.apache.aurora.scheduler.base.TaskTestUtil; import org.apache.aurora.scheduler.cron.CronJobManager; import org.apache.aurora.scheduler.cron.CrontabEntry; import org.apache.aurora.scheduler.cron.SanitizedCronJob; @@ -63,17 +65,7 @@ public class CronIT extends EasyMockTest { .setKey(JOB_KEY.newBuilder()) .setInstanceCount(2) .setOwner(IDENTITY) - .setTaskConfig(new TaskConfig() - .setJobName(JOB_KEY.getName()) - .setEnvironment(JOB_KEY.getEnvironment()) - .setOwner(IDENTITY) - .setExecutorConfig(new ExecutorConfig() - .setName("cmd.exe") - .setData("echo hello world")) - .setNumCpus(7) - .setRamMb(8) - .setDiskMb(9)) - ); + .setTaskConfig(makeTaskConfig())); private Injector injector; private StateManager stateManager; @@ -104,6 +96,15 @@ public class CronIT extends EasyMockTest { }); } + private static TaskConfig makeTaskConfig() { + TaskConfig config = TaskTestUtil.makeConfig(JOB_KEY).newBuilder(); + config.setIsService(false); + // Bypassing a command-line argument in ConfigurationManager that by default disallows the + // docker container type. + config.setContainer(Container.mesos(new MesosContainer())); + return config; + } + private Service boot() { Service service = injector.getInstance(CronLifecycle.class); service.startAsync().awaitRunning(); http://git-wip-us.apache.org/repos/asf/aurora/blob/88f88758/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzTestUtil.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzTestUtil.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzTestUtil.java index 2d74b32..13cb73d 100644 --- a/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzTestUtil.java +++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/QuartzTestUtil.java @@ -14,11 +14,13 @@ package org.apache.aurora.scheduler.cron.quartz; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableSet; import org.apache.aurora.gen.CronCollisionPolicy; import org.apache.aurora.gen.ExecutorConfig; import org.apache.aurora.gen.Identity; import org.apache.aurora.gen.JobConfiguration; +import org.apache.aurora.gen.Metadata; import org.apache.aurora.gen.TaskConfig; import org.apache.aurora.scheduler.base.JobKeys; import org.apache.aurora.scheduler.configuration.ConfigurationManager; @@ -47,6 +49,7 @@ final class QuartzTestUtil { .setDiskMb(3) .setRamMb(4) .setNumCpus(5) + .setMetadata(ImmutableSet.<Metadata>of()) .setExecutorConfig(new ExecutorConfig() .setName("cmd.exe") .setData("echo hello world"))) http://git-wip-us.apache.org/repos/asf/aurora/blob/88f88758/src/test/java/org/apache/aurora/scheduler/storage/AbstractCronJobStoreTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/AbstractCronJobStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/AbstractCronJobStoreTest.java new file mode 100644 index 0000000..e20f66c --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/storage/AbstractCronJobStoreTest.java @@ -0,0 +1,164 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.storage; + +import java.util.Set; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableSet; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Module; + +import org.apache.aurora.gen.CronCollisionPolicy; +import org.apache.aurora.gen.Identity; +import org.apache.aurora.gen.JobConfiguration; +import org.apache.aurora.scheduler.base.JobKeys; +import org.apache.aurora.scheduler.base.TaskTestUtil; +import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; +import org.apache.aurora.scheduler.storage.Storage.MutateWork; +import org.apache.aurora.scheduler.storage.Storage.StoreProvider; +import org.apache.aurora.scheduler.storage.Storage.Work; +import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; +import org.apache.aurora.scheduler.storage.entities.IJobKey; +import org.apache.aurora.scheduler.storage.entities.ITaskConfig; +import org.apache.aurora.scheduler.storage.testing.StorageEntityUtil; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public abstract class AbstractCronJobStoreTest { + private static final IJobConfiguration JOB_A = makeJob("a"); + private static final IJobConfiguration JOB_B = makeJob("b"); + + private static final IJobKey KEY_A = JOB_A.getKey(); + private static final IJobKey KEY_B = JOB_B.getKey(); + + protected Injector injector; + protected Storage storage; + + protected abstract Module getStorageModule(); + + @Before + public void baseSetUp() { + injector = Guice.createInjector(getStorageModule()); + storage = injector.getInstance(Storage.class); + storage.prepare(); + } + + @Test + public void testJobStore() { + assertNull(fetchJob(JobKeys.from("nobody", "nowhere", "noname")).orNull()); + assertEquals(ImmutableSet.<IJobConfiguration>of(), fetchJobs()); + + saveAcceptedJob(JOB_A); + assertEquals(JOB_A, fetchJob(KEY_A).orNull()); + assertEquals(ImmutableSet.of(JOB_A), fetchJobs()); + + saveAcceptedJob(JOB_B); + assertEquals(JOB_B, fetchJob(KEY_B).orNull()); + assertEquals(ImmutableSet.of(JOB_A, JOB_B), fetchJobs()); + + removeJob(KEY_B); + assertEquals(ImmutableSet.of(JOB_A), fetchJobs()); + + deleteJobs(); + assertEquals(ImmutableSet.<IJobConfiguration>of(), fetchJobs()); + } + + @Test + public void testJobStoreSameEnvironment() { + IJobConfiguration templateConfig = makeJob("labrat"); + JobConfiguration prodBuilder = templateConfig.newBuilder(); + prodBuilder.getKey().setEnvironment("prod"); + IJobConfiguration prod = IJobConfiguration.build(prodBuilder); + JobConfiguration stagingBuilder = templateConfig.newBuilder(); + stagingBuilder.getKey().setEnvironment("staging"); + IJobConfiguration staging = IJobConfiguration.build(stagingBuilder); + + saveAcceptedJob(prod); + saveAcceptedJob(staging); + + assertNull(fetchJob( + IJobKey.build(templateConfig.getKey().newBuilder().setEnvironment("test"))).orNull()); + assertEquals(prod, fetchJob(prod.getKey()).orNull()); + assertEquals(staging, fetchJob(staging.getKey()).orNull()); + + removeJob(prod.getKey()); + assertNull(fetchJob(prod.getKey()).orNull()); + assertEquals(staging, fetchJob(staging.getKey()).orNull()); + } + + private static IJobConfiguration makeJob(String name) { + IJobKey job = JobKeys.from("role-" + name, "env-" + name, name); + ITaskConfig config = TaskTestUtil.makeConfig(job); + + return StorageEntityUtil.assertFullyPopulated( + IJobConfiguration.build( + new JobConfiguration() + .setKey(job.newBuilder()) + .setOwner(new Identity(job.getRole(), "user")) + .setCronSchedule("schedule") + .setCronCollisionPolicy(CronCollisionPolicy.CANCEL_NEW) + .setTaskConfig(config.newBuilder()) + .setInstanceCount(5))); + } + + private Set<IJobConfiguration> fetchJobs() { + return storage.read(new Work.Quiet<Set<IJobConfiguration>>() { + @Override + public Set<IJobConfiguration> apply(StoreProvider storeProvider) { + return ImmutableSet.copyOf(storeProvider.getCronJobStore().fetchJobs()); + } + }); + } + + private Optional<IJobConfiguration> fetchJob(final IJobKey jobKey) { + return storage.read(new Work.Quiet<Optional<IJobConfiguration>>() { + @Override + public Optional<IJobConfiguration> apply(StoreProvider storeProvider) { + return storeProvider.getCronJobStore().fetchJob(jobKey); + } + }); + } + + private void saveAcceptedJob(final IJobConfiguration jobConfig) { + storage.write(new MutateWork.NoResult.Quiet() { + @Override + protected void execute(MutableStoreProvider storeProvider) { + storeProvider.getCronJobStore().saveAcceptedJob(jobConfig); + } + }); + } + + private void removeJob(final IJobKey jobKey) { + storage.write(new MutateWork.NoResult.Quiet() { + @Override + protected void execute(MutableStoreProvider storeProvider) { + storeProvider.getCronJobStore().removeJob(jobKey); + } + }); + } + + private void deleteJobs() { + storage.write(new MutateWork.NoResult.Quiet() { + @Override + protected void execute(MutableStoreProvider storeProvider) { + storeProvider.getCronJobStore().deleteJobs(); + } + }); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/88f88758/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java b/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java index 5ad0de7..a76ae48 100644 --- a/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java +++ b/src/test/java/org/apache/aurora/scheduler/storage/StorageBackfillTest.java @@ -16,19 +16,14 @@ package org.apache.aurora.scheduler.storage; import java.util.Set; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; import org.apache.aurora.gen.Container; -import org.apache.aurora.gen.JobConfiguration; import org.apache.aurora.gen.MesosContainer; import org.apache.aurora.gen.ScheduledTask; -import org.apache.aurora.gen.TaskConfig; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.TaskTestUtil; import org.apache.aurora.scheduler.configuration.ConfigurationManager; -import org.apache.aurora.scheduler.configuration.SanitizedConfiguration; import org.apache.aurora.scheduler.storage.db.DbUtil; -import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.junit.Before; import org.junit.Test; @@ -52,40 +47,6 @@ public class StorageBackfillTest { } @Test - public void testJobConfigurationBackfill() throws Exception { - TaskConfig task = TASK.getAssignedTask().getTask().newBuilder(); - final JobConfiguration config = new JobConfiguration() - .setOwner(task.getOwner()) - .setKey(task.getJob()) - .setInstanceCount(1) - .setTaskConfig(task); - - SanitizedConfiguration expected = - SanitizedConfiguration.fromUnsanitized(IJobConfiguration.build(config)); - - // Unset task config job key. - config.getTaskConfig().unsetJob(); - storage.write(new Storage.MutateWork.NoResult.Quiet() { - @Override - protected void execute(Storage.MutableStoreProvider storeProvider) { - storeProvider.getCronJobStore().saveAcceptedJob(IJobConfiguration.build(config)); - } - }); - - backfill(); - - IJobConfiguration actual = Iterables.getOnlyElement( - storage.read(new Storage.Work.Quiet<Iterable<IJobConfiguration>>() { - @Override - public Iterable<IJobConfiguration> apply(Storage.StoreProvider storeProvider) { - return storeProvider.getCronJobStore().fetchJobs(); - } - })); - - assertEquals(expected.getJobConfig(), actual); - } - - @Test public void testBackfillTask() throws Exception { ScheduledTask task = TASK.newBuilder(); ConfigurationManager.applyDefaultsIfUnset(task.getAssignedTask().getTask()); http://git-wip-us.apache.org/repos/asf/aurora/blob/88f88758/src/test/java/org/apache/aurora/scheduler/storage/db/DbCronJobStoreTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/db/DbCronJobStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/db/DbCronJobStoreTest.java new file mode 100644 index 0000000..28b06d0 --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/storage/db/DbCronJobStoreTest.java @@ -0,0 +1,39 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.storage.db; + +import com.google.inject.AbstractModule; +import com.google.inject.Module; +import com.google.inject.util.Modules; +import com.twitter.common.stats.StatsProvider; +import com.twitter.common.util.Clock; +import com.twitter.common.util.testing.FakeClock; + +import org.apache.aurora.scheduler.storage.AbstractCronJobStoreTest; +import org.apache.aurora.scheduler.testing.FakeStatsProvider; + +public class DbCronJobStoreTest extends AbstractCronJobStoreTest { + @Override + protected Module getStorageModule() { + return Modules.combine( + DbModule.testModule(), + new AbstractModule() { + @Override + protected void configure() { + bind(StatsProvider.class).toInstance(new FakeStatsProvider()); + bind(Clock.class).toInstance(new FakeClock()); + } + }); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/88f88758/src/test/java/org/apache/aurora/scheduler/storage/db/RowGarbageCollectorTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/db/RowGarbageCollectorTest.java b/src/test/java/org/apache/aurora/scheduler/storage/db/RowGarbageCollectorTest.java index 31feaea..85b6a35 100644 --- a/src/test/java/org/apache/aurora/scheduler/storage/db/RowGarbageCollectorTest.java +++ b/src/test/java/org/apache/aurora/scheduler/storage/db/RowGarbageCollectorTest.java @@ -94,7 +94,7 @@ public class RowGarbageCollectorTest { InsertResult a2Insert = new InsertResult(); taskConfigMapper.insert(TASK_A2.getAssignedTask().getTask(), a2Insert); taskMapper.insertScheduledTask( - new ScheduledTaskWrapper(-1, a2Insert.getId(), TASK_A2.newBuilder())); + new ScheduledTaskWrapper(a2Insert.getId(), TASK_A2.newBuilder())); jobKeyMapper.merge(JOB_B); taskConfigMapper.insert(CONFIG_B, new InsertResult()); rowGc.runOneIteration(); http://git-wip-us.apache.org/repos/asf/aurora/blob/88f88758/src/test/java/org/apache/aurora/scheduler/storage/mem/InMemTaskStoreTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/mem/InMemTaskStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/mem/InMemTaskStoreTest.java index 2ed7483..77efffb 100644 --- a/src/test/java/org/apache/aurora/scheduler/storage/mem/InMemTaskStoreTest.java +++ b/src/test/java/org/apache/aurora/scheduler/storage/mem/InMemTaskStoreTest.java @@ -40,7 +40,7 @@ public class InMemTaskStoreTest extends AbstractTaskStoreTest { protected Module getStorageModule() { statsProvider = new FakeStatsProvider(); return Modules.combine( - DbModule.testModule(PLAIN, Optional.of(new InMemStoresModule.TaskStoreModule(PLAIN))), + DbModule.testModule(PLAIN, Optional.of(new InMemStoresModule(PLAIN))), new AbstractModule() { @Override protected void configure() { http://git-wip-us.apache.org/repos/asf/aurora/blob/88f88758/src/test/java/org/apache/aurora/scheduler/storage/mem/MemCronJobStoreTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/mem/MemCronJobStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/mem/MemCronJobStoreTest.java index 58256af..0c54edf 100644 --- a/src/test/java/org/apache/aurora/scheduler/storage/mem/MemCronJobStoreTest.java +++ b/src/test/java/org/apache/aurora/scheduler/storage/mem/MemCronJobStoreTest.java @@ -13,79 +13,28 @@ */ package org.apache.aurora.scheduler.storage.mem; -import com.google.common.collect.ImmutableSet; - -import org.apache.aurora.gen.JobConfiguration; -import org.apache.aurora.scheduler.base.JobKeys; -import org.apache.aurora.scheduler.storage.CronJobStore; -import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; -import org.apache.aurora.scheduler.storage.entities.IJobKey; -import org.junit.Before; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; - -public class MemCronJobStoreTest { - private static final IJobConfiguration JOB_A = makeJob("a"); - private static final IJobConfiguration JOB_B = makeJob("b"); - - private static final IJobKey KEY_A = JOB_A.getKey(); - private static final IJobKey KEY_B = JOB_B.getKey(); - - private CronJobStore.Mutable store; - - @Before - public void setUp() { - store = new MemJobStore(); - } - - @Test - public void testJobStore() { - assertNull(store.fetchJob(JobKeys.from("nobody", "nowhere", "noname")).orNull()); - assertEquals(ImmutableSet.<IJobConfiguration>of(), store.fetchJobs()); - - store.saveAcceptedJob(JOB_A); - assertEquals(JOB_A, store.fetchJob(KEY_A).orNull()); - assertEquals(ImmutableSet.of(JOB_A), store.fetchJobs()); - - store.saveAcceptedJob(JOB_B); - assertEquals(JOB_B, store.fetchJob(KEY_B).orNull()); - assertEquals(ImmutableSet.of(JOB_A, JOB_B), store.fetchJobs()); - - store.removeJob(KEY_B); - assertEquals(ImmutableSet.of(JOB_A), store.fetchJobs()); - - store.deleteJobs(); - assertEquals(ImmutableSet.<IJobConfiguration>of(), store.fetchJobs()); - } - - @Test - public void testJobStoreSameEnvironment() { - IJobConfiguration templateConfig = makeJob("labrat"); - JobConfiguration prodBuilder = templateConfig.newBuilder(); - prodBuilder.getKey().setEnvironment("prod"); - IJobConfiguration prod = IJobConfiguration.build(prodBuilder); - JobConfiguration stagingBuilder = templateConfig.newBuilder(); - stagingBuilder.getKey().setEnvironment("staging"); - IJobConfiguration staging = IJobConfiguration.build(stagingBuilder); - - store.saveAcceptedJob(prod); - store.saveAcceptedJob(staging); - - assertNull(store.fetchJob( - IJobKey.build(templateConfig.getKey().newBuilder().setEnvironment("test"))).orNull()); - assertEquals(prod, store.fetchJob(prod.getKey()).orNull()); - assertEquals(staging, store.fetchJob(staging.getKey()).orNull()); - - store.removeJob(prod.getKey()); - assertNull(store.fetchJob(prod.getKey()).orNull()); - assertEquals(staging, store.fetchJob(staging.getKey()).orNull()); - } - - private static IJobConfiguration makeJob(String name) { - return IJobConfiguration.build( - new JobConfiguration().setKey(JobKeys.from("role-" + name, "env-" + name, name) - .newBuilder())); +import com.google.common.base.Optional; +import com.google.inject.AbstractModule; +import com.google.inject.Module; +import com.google.inject.util.Modules; +import com.twitter.common.stats.StatsProvider; + +import org.apache.aurora.scheduler.storage.AbstractCronJobStoreTest; +import org.apache.aurora.scheduler.storage.db.DbModule; +import org.apache.aurora.scheduler.testing.FakeStatsProvider; + +import static com.twitter.common.inject.Bindings.KeyFactory.PLAIN; + +public class MemCronJobStoreTest extends AbstractCronJobStoreTest { + @Override + protected Module getStorageModule() { + return Modules.combine( + DbModule.testModule(PLAIN, Optional.of(new InMemStoresModule(PLAIN))), + new AbstractModule() { + @Override + protected void configure() { + bind(StatsProvider.class).toInstance(new FakeStatsProvider()); + } + }); } }
