Repository: hbase Updated Branches: refs/heads/master 538388c2b -> d3eedb24e
HBASE-6778 Deprecate Chore; its a thread per task when we should have one thread to do all tasks - add new files Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d3eedb24 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d3eedb24 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d3eedb24 Branch: refs/heads/master Commit: d3eedb24e1ebf56ce820ea77522b695350fada6c Parents: 538388c Author: tedyu <[email protected]> Authored: Thu Jan 29 21:11:33 2015 -0800 Committer: tedyu <[email protected]> Committed: Thu Jan 29 21:11:33 2015 -0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hbase/ChoreService.java | 368 ++++++++ .../org/apache/hadoop/hbase/ScheduledChore.java | 330 ++++++++ .../apache/hadoop/hbase/TestChoreService.java | 844 +++++++++++++++++++ 3 files changed, 1542 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/d3eedb24/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java new file mode 100644 index 0000000..fd6cbc9 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java @@ -0,0 +1,368 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map.Entry; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.ScheduledChore.ChoreServicer; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * ChoreService is a service that can be used to schedule instances of {@link ScheduledChore} to run + * periodically while sharing threads. The ChoreService is backed by a + * {@link ScheduledThreadPoolExecutor} whose core pool size changes dynamically depending on the + * number of {@link ScheduledChore} scheduled. All of the threads in the core thread pool of the + * underlying {@link ScheduledThreadPoolExecutor} are set to be daemon threads. + * <p> + * The ChoreService provides the ability to schedule, cancel, and trigger instances of + * {@link ScheduledChore}. The ChoreService also provides the ability to check on the status of + * scheduled chores. The number of threads used by the ChoreService changes based on the scheduling + * load and whether or not the scheduled chores are executing on time. As more chores are scheduled, + * there may be a need to increase the number of threads if it is noticed that chores are no longer + * meeting their scheduled start times. On the other hand, as chores are cancelled, an attempt is + * made to reduce the number of running threads to see if chores can still meet their start times + * with a smaller thread pool. + * <p> + * When finished with a ChoreService it is good practice to call {@link ChoreService#shutdown()}. + * Calling this method ensures that all scheduled chores are cancelled and cleaned up properly. + */ [email protected] +public class ChoreService implements ChoreServicer { + private final Log LOG = LogFactory.getLog(this.getClass()); + + /** + * The minimum number of threads in the core pool of the underlying ScheduledThreadPoolExecutor + */ + public final static int MIN_CORE_POOL_SIZE = 1; + + /** + * This thread pool is used to schedule all of the Chores + */ + private final ScheduledThreadPoolExecutor scheduler; + + /** + * Maps chores to their futures. Futures are used to control a chore's schedule + */ + private final HashMap<ScheduledChore, ScheduledFuture<?>> scheduledChores; + + /** + * Maps chores to Booleans which indicate whether or not a chore has caused an increase in the + * core pool size of the ScheduledThreadPoolExecutor. Each chore should only be allowed to + * increase the core pool size by 1 (otherwise a single long running chore whose execution is + * longer than its period would be able to spawn too many threads). + */ + private final HashMap<ScheduledChore, Boolean> choresMissingStartTime; + + /** + * The coreThreadPoolPrefix is the prefix that will be applied to all threads within the + * ScheduledThreadPoolExecutor. The prefix is typically related to the Server that the service is + * running on. The prefix is useful because it allows us to monitor how the thread pool of a + * particular service changes over time VIA thread dumps. + */ + private final String coreThreadPoolPrefix; + + /** + * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads + * spawned by this service + */ + public ChoreService(final String coreThreadPoolPrefix) { + this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE); + } + + /** + * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads + * spawned by this service + * @param corePoolSize The initial size to set the core pool of the ScheduledThreadPoolExecutor + * to during initialization. The default size is 1, but specifying a larger size may be + * beneficial if you know that 1 thread will not be enough. + */ + public ChoreService(final String coreThreadPoolPrefix, int corePoolSize) { + this.coreThreadPoolPrefix = coreThreadPoolPrefix; + if (corePoolSize < MIN_CORE_POOL_SIZE) corePoolSize = MIN_CORE_POOL_SIZE; + final ThreadFactory threadFactory = new ChoreServiceThreadFactory(coreThreadPoolPrefix); + scheduler = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); + scheduler.setRemoveOnCancelPolicy(true); + scheduledChores = new HashMap<ScheduledChore, ScheduledFuture<?>>(); + choresMissingStartTime = new HashMap<ScheduledChore, Boolean>(); + } + + /** + * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads + * spawned by this service + */ + public static ChoreService getInstance(final String coreThreadPoolPrefix) { + return new ChoreService(coreThreadPoolPrefix); + } + + /** + * @param chore Chore to be scheduled. If the chore is already scheduled with another ChoreService + * instance, that schedule will be cancelled (i.e. a Chore can only ever be scheduled + * with a single ChoreService instance). + * @return true when the chore was successfully scheduled. false when the scheduling failed + * (typically occurs when a chore is scheduled during shutdown of service) + */ + public synchronized boolean scheduleChore(ScheduledChore chore) { + if (chore == null) return false; + + try { + ScheduledFuture<?> future = + scheduler.scheduleAtFixedRate(chore, chore.getInitialDelay(), chore.getPeriod(), + chore.getTimeUnit()); + chore.setChoreServicer(this); + scheduledChores.put(chore, future); + return true; + } catch (Exception exception) { + if (LOG.isInfoEnabled()) { + LOG.info("Could not successfully schedule chore: " + chore.getName()); + } + return false; + } + } + + /** + * @param chore The Chore to be rescheduled. If the chore is not scheduled with this ChoreService + * yet then this call is equivalent to a call to scheduleChore. + */ + private synchronized void rescheduleChore(ScheduledChore chore) { + if (chore == null) return; + + if (scheduledChores.containsKey(chore)) { + ScheduledFuture<?> future = scheduledChores.get(chore); + future.cancel(false); + } + scheduleChore(chore); + } + + @Override + public synchronized void cancelChore(ScheduledChore chore) { + cancelChore(chore, false); + } + + @Override + public synchronized void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning) { + if (chore != null && scheduledChores.containsKey(chore)) { + ScheduledFuture<?> future = scheduledChores.get(chore); + future.cancel(mayInterruptIfRunning); + scheduledChores.remove(chore); + + // Removing a chore that was missing its start time means it may be possible + // to reduce the number of threads + if (choresMissingStartTime.containsKey(chore)) { + choresMissingStartTime.remove(chore); + requestCorePoolDecrease(); + } + } + } + + @Override + public synchronized boolean isChoreScheduled(ScheduledChore chore) { + return chore != null && scheduledChores.containsKey(chore) + && !scheduledChores.get(chore).isDone(); + } + + @Override + public synchronized boolean triggerNow(ScheduledChore chore) { + if (chore == null) { + return false; + } else { + rescheduleChore(chore); + return true; + } + } + + /** + * @return number of chores that this service currently has scheduled + */ + int getNumberOfScheduledChores() { + return scheduledChores.size(); + } + + /** + * @return number of chores that this service currently has scheduled that are missing their + * scheduled start time + */ + int getNumberOfChoresMissingStartTime() { + return choresMissingStartTime.size(); + } + + /** + * @return number of threads in the core pool of the underlying ScheduledThreadPoolExecutor + */ + int getCorePoolSize() { + return scheduler.getCorePoolSize(); + } + + /** + * Custom ThreadFactory used with the ScheduledThreadPoolExecutor so that all the threads are + * daemon threads, and thus, don't prevent the JVM from shutting down + */ + static class ChoreServiceThreadFactory implements ThreadFactory { + private final String threadPrefix; + private final static String THREAD_NAME_SUFFIX = "_ChoreService_"; + private AtomicInteger threadNumber = new AtomicInteger(1); + + /** + * @param threadPrefix The prefix given to all threads created by this factory + */ + public ChoreServiceThreadFactory(final String threadPrefix) { + this.threadPrefix = threadPrefix; + } + + @Override + public Thread newThread(Runnable r) { + Thread thread = + new Thread(r, threadPrefix + THREAD_NAME_SUFFIX + threadNumber.getAndIncrement()); + thread.setDaemon(true); + return thread; + } + } + + /** + * Represents a request to increase the number of core pool threads. Typically a request + * originates from the fact that the current core pool size is not sufficient to service all of + * the currently running Chores + * @return true when the request to increase the core pool size succeeds + */ + private synchronized boolean requestCorePoolIncrease() { + // There is no point in creating more threads than scheduledChores.size since scheduled runs + // of the same chore cannot run concurrently (i.e. happen-before behavior is enforced + // amongst occurrences of the same chore). + if (scheduler.getCorePoolSize() < scheduledChores.size()) { + scheduler.setCorePoolSize(scheduler.getCorePoolSize() + 1); + printChoreServiceDetails("requestCorePoolIncrease"); + return true; + } + return false; + } + + /** + * Represents a request to decrease the number of core pool threads. Typically a request + * originates from the fact that the current core pool size is more than sufficient to service the + * running Chores. + */ + private synchronized void requestCorePoolDecrease() { + if (scheduler.getCorePoolSize() > MIN_CORE_POOL_SIZE) { + scheduler.setCorePoolSize(scheduler.getCorePoolSize() - 1); + printChoreServiceDetails("requestCorePoolDecrease"); + } + } + + @Override + public synchronized void onChoreMissedStartTime(ScheduledChore chore) { + if (chore == null || !scheduledChores.containsKey(chore)) return; + + // If the chore has not caused an increase in the size of the core thread pool then request an + // increase. This allows each chore missing its start time to increase the core pool size by + // at most 1. + if (!choresMissingStartTime.containsKey(chore) || !choresMissingStartTime.get(chore)) { + choresMissingStartTime.put(chore, requestCorePoolIncrease()); + } + + // Must reschedule the chore to prevent unnecessary delays of chores in the scheduler. If + // the chore is NOT rescheduled, future executions of this chore will be delayed more and + // more on each iteration. This hurts us because the ScheduledThreadPoolExecutor allocates + // idle threads to chores based on how delayed they are. + rescheduleChore(chore); + printChoreDetails("onChoreMissedStartTime", chore); + } + + /** + * shutdown the service. Any chores that are scheduled for execution will be cancelled. Any chores + * in the middle of execution will be interrupted and shutdown. This service will be unusable + * after this method has been called (i.e. future scheduling attempts will fail). + */ + public void shutdown() { + List<Runnable> ongoing = scheduler.shutdownNow(); + if (LOG.isInfoEnabled()) { + LOG.info("Chore service for: " + coreThreadPoolPrefix + " had " + ongoing + " on shutdown"); + } + cancelAllChores(true); + scheduledChores.clear(); + choresMissingStartTime.clear(); + } + + /** + * @return true when the service is shutdown and thus cannot be used anymore + */ + public boolean isShutdown() { + return scheduler.isShutdown(); + } + + /** + * @return true when the service is shutdown and all threads have terminated + */ + public boolean isTerminated() { + return scheduler.isTerminated(); + } + + private void cancelAllChores(final boolean mayInterruptIfRunning) { + ArrayList<ScheduledChore> choresToCancel = new ArrayList<ScheduledChore>(); + // Build list of chores to cancel so we can iterate through a set that won't change + // as chores are cancelled. If we tried to cancel each chore while iterating through + // keySet the results would be undefined because the keySet would be changing + for (ScheduledChore chore : scheduledChores.keySet()) { + choresToCancel.add(chore); + } + for (ScheduledChore chore : choresToCancel) { + chore.cancel(mayInterruptIfRunning); + } + choresToCancel.clear(); + } + + /** + * Prints a summary of important details about the chore. Used for debugging purposes + */ + private void printChoreDetails(final String header, ScheduledChore chore) { + LinkedHashMap<String, String> output = new LinkedHashMap<String, String>(); + output.put(header, ""); + output.put("Chore name: ", chore.getName()); + output.put("Chore period: ", Integer.toString(chore.getPeriod())); + output.put("Chore timeBetweenRuns: ", Long.toString(chore.getTimeBetweenRuns())); + + for (Entry<String, String> entry : output.entrySet()) { + if (LOG.isTraceEnabled()) LOG.trace(entry.getKey() + entry.getValue()); + } + } + + /** + * Prints a summary of important details about the service. Used for debugging purposes + */ + private void printChoreServiceDetails(final String header) { + LinkedHashMap<String, String> output = new LinkedHashMap<String, String>(); + output.put(header, ""); + output.put("ChoreService corePoolSize: ", Integer.toString(getCorePoolSize())); + output.put("ChoreService scheduledChores: ", Integer.toString(getNumberOfScheduledChores())); + output.put("ChoreService missingStartTimeCount: ", + Integer.toString(getNumberOfChoresMissingStartTime())); + + for (Entry<String, String> entry : output.entrySet()) { + if (LOG.isTraceEnabled()) LOG.trace(entry.getKey() + entry.getValue()); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/d3eedb24/hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java new file mode 100644 index 0000000..84002c5 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java @@ -0,0 +1,330 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +import com.google.common.annotations.VisibleForTesting; + +/** + * ScheduledChore is a task performed on a period in hbase. ScheduledChores become active once + * scheduled with a {@link ChoreService} via {@link ChoreService#scheduleChore(ScheduledChore)}. The + * chore is run in a {@link ScheduledThreadPoolExecutor} and competes with other ScheduledChores for + * access to the threads in the core thread pool. If an unhandled exception occurs, the chore + * cancellation is logged. Implementers should consider whether or not the Chore will be able to + * execute within the defined period. It is bad practice to define a ScheduledChore whose execution + * time exceeds its period since it will try to hog one of the threads in the {@link ChoreService}'s + * thread pool. + * <p> + * Don't subclass ScheduledChore if the task relies on being woken up for something to do, such as + * an entry being added to a queue, etc. + */ [email protected] +public abstract class ScheduledChore implements Runnable { + private final Log LOG = LogFactory.getLog(this.getClass()); + + private final String name; + + /** + * Default values for scheduling parameters should they be excluded during construction + */ + private final static TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS; + private final static long DEFAULT_INITIAL_DELAY = 0; + + /** + * Scheduling parameters. Used by ChoreService when scheduling the chore to run periodically + */ + private final int period; + private final TimeUnit timeUnit; + private final long initialDelay; + + /** + * Interface to the ChoreService that this ScheduledChore is scheduled with. null if the chore is + * not scheduled. + */ + private ChoreServicer choreServicer; + + /** + * Variables that encapsulate the meaningful state information + */ + private long timeOfLastRun = -1; + private long timeOfThisRun = -1; + private boolean initialChoreComplete = false; + + /** + * A means by which a ScheduledChore can be stopped. Once a chore recognizes that it has been + * stopped, it will cancel itself. This is particularly useful in the case where a single stopper + * instance is given to multiple chores. In such a case, a single {@link Stoppable#stop(String)} + * command can cause many chores to stop together. + */ + private final Stoppable stopper; + + interface ChoreServicer { + /** + * Cancel any ongoing schedules that this chore has with the implementer of this interface. + */ + public void cancelChore(ScheduledChore chore); + public void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning); + + /** + * @return true when the chore is scheduled with the implementer of this interface + */ + public boolean isChoreScheduled(ScheduledChore chore); + + /** + * This method tries to execute the chore immediately. If the chore is executing at the time of + * this call, the chore will begin another execution as soon as the current execution finishes + * <p> + * If the chore is not scheduled with a ChoreService, this call will fail. + * @return false when the chore could not be triggered immediately + */ + public boolean triggerNow(ScheduledChore chore); + + /** + * A callback that tells the implementer of this interface that one of the scheduled chores is + * missing its start time. The implication of a chore missing its start time is that the + * service's current means of scheduling may not be sufficient to handle the number of ongoing + * chores (the other explanation is that the chore's execution time is greater than its + * scheduled period). The service should try to increase its concurrency when this callback is + * received. + * @param chore The chore that missed its start time + */ + public void onChoreMissedStartTime(ScheduledChore chore); + } + + /** + * This constructor is for test only. It allows us to create an object and to call chore() on it. + */ + protected ScheduledChore() { + this.name = null; + this.stopper = null; + this.period = 0; + this.initialDelay = DEFAULT_INITIAL_DELAY; + this.timeUnit = DEFAULT_TIME_UNIT; + } + + /** + * @param name Name assigned to Chore. Useful for identification amongst chores of the same type + * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup + * @param period Period with which this Chore repeats execution when scheduled. + */ + public ScheduledChore(final String name, Stoppable stopper, final int period) { + this(name, stopper, period, DEFAULT_INITIAL_DELAY); + } + + /** + * @param name Name assigned to Chore. Useful for identification amongst chores of the same type + * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup + * @param period Period with which this Chore repeats execution when scheduled. + * @param initialDelay Delay before this Chore begins to execute once it has been scheduled. A + * value of 0 means the chore will begin to execute immediately. Negative delays are + * invalid and will be corrected to a value of 0. + */ + public ScheduledChore(final String name, Stoppable stopper, final int period, + final long initialDelay) { + this(name, stopper, period, initialDelay, DEFAULT_TIME_UNIT); + } + + /** + * @param name Name assigned to Chore. Useful for identification amongst chores of the same type + * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup + * @param period Period with which this Chore repeats execution when scheduled. + * @param initialDelay Delay before this Chore begins to execute once it has been scheduled. A + * value of 0 means the chore will begin to execute immediately. Negative delays are + * invalid and will be corrected to a value of 0. + * @param unit The unit that is used to measure period and initialDelay + */ + public ScheduledChore(final String name, Stoppable stopper, final int period, + final long initialDelay, final TimeUnit unit) { + this.name = name; + this.stopper = stopper; + this.period = period; + this.initialDelay = initialDelay < 0 ? 0 : initialDelay; + this.timeUnit = unit; + } + + synchronized void resetState() { + timeOfLastRun = -1; + timeOfThisRun = -1; + initialChoreComplete = false; + } + + /** + * @see java.lang.Thread#run() + */ + @Override + public synchronized void run() { + timeOfLastRun = timeOfThisRun; + timeOfThisRun = System.currentTimeMillis(); + if (missedStartTime() && choreServicer != null) { + choreServicer.onChoreMissedStartTime(this); + if (LOG.isInfoEnabled()) LOG.info("Chore: " + getName() + " missed its start time"); + } else if (stopper.isStopped()) { + cancel(); + cleanup(); + LOG.info("Chore: " + getName() + " was stopped"); + } else { + try { + if (!initialChoreComplete) { + initialChoreComplete = initialChore(); + } else { + chore(); + } + } catch (Throwable t) { + LOG.error("Caught error", t); + if (this.stopper.isStopped()) { + cancel(); + cleanup(); + } + } + } + } + + /** + * @return How long has it been since this chore last run. Useful for checking if the chore has + * missed its scheduled start time by too large of a margin + */ + synchronized long getTimeBetweenRuns() { + return timeOfThisRun - timeOfLastRun; + } + + /** + * @return true when the time between runs exceeds the acceptable threshold + */ + private synchronized boolean missedStartTime() { + return isValidTime(timeOfLastRun) && isValidTime(timeOfThisRun) + && getTimeBetweenRuns() > getMaximumAllowedTimeBetweenRuns(); + } + + private synchronized double getMaximumAllowedTimeBetweenRuns() { + // Threshold used to determine if the Chore's current run started too late + return 1.5 * period; + } + + private synchronized boolean isValidTime(final long time) { + return time > 0 && time <= System.currentTimeMillis(); + } + + /** + * @return false when the Chore is not currently scheduled with a ChoreService + */ + public synchronized boolean triggerNow() { + if (choreServicer != null) { + return choreServicer.triggerNow(this); + } else { + return false; + } + } + + synchronized void setChoreServicer(ChoreServicer service) { + // Chores should only ever be scheduled with a single ChoreService. If the choreServicer + // is changing, cancel any existing schedules of this chore. + if (choreServicer != null && choreServicer != service) { + choreServicer.cancelChore(this, false); + } + choreServicer = service; + timeOfThisRun = System.currentTimeMillis(); + } + + public synchronized void cancel() { + cancel(false); + } + + public synchronized void cancel(boolean mayInterruptIfRunning) { + if (choreServicer != null) choreServicer.cancelChore(this, mayInterruptIfRunning); + + choreServicer = null; + } + + public synchronized String getName() { + return name; + } + + public synchronized Stoppable getStopper() { + return stopper; + } + + public synchronized int getPeriod() { + return period; + } + + public synchronized long getInitialDelay() { + return initialDelay; + } + + public final synchronized TimeUnit getTimeUnit() { + return timeUnit; + } + + public synchronized boolean isInitialChoreComplete() { + return initialChoreComplete; + } + + @VisibleForTesting + synchronized ChoreServicer getChoreServicer() { + return choreServicer; + } + + @VisibleForTesting + synchronized long getTimeOfLastRun() { + return timeOfLastRun; + } + + @VisibleForTesting + synchronized long getTimeOfThisRun() { + return timeOfThisRun; + } + + /** + * @return true when this Chore is scheduled with a ChoreService + */ + public synchronized boolean isScheduled() { + return choreServicer != null && choreServicer.isChoreScheduled(this); + } + + @VisibleForTesting + public synchronized void choreForTesting() { + chore(); + } + + /** + * The task to execute on each scheduled execution of the Chore + */ + protected abstract void chore(); + + /** + * Override to run a task before we start looping. + * @return true if initial chore was successful + */ + protected synchronized boolean initialChore() { + // Default does nothing + return true; + } + + /** + * Override to run cleanup tasks when the Chore encounters an error and must stop running + */ + protected synchronized void cleanup() { + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/d3eedb24/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java new file mode 100644 index 0000000..35cc530 --- /dev/null +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java @@ -0,0 +1,844 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.CountingChore; +import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.DoNothingChore; +import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.FailInitialChore; +import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.SampleStopper; +import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.SleepingChore; +import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.SlowChore; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestChoreService { + private final Log LOG = LogFactory.getLog(this.getClass()); + private final String TEST_SERVER_NAME = "testServerName"; + + /** + * A few ScheduledChore samples that are useful for testing with ChoreService + */ + public static class ScheduledChoreSamples { + /** + * Straight forward stopper implementation that is used by default when one is not provided + */ + public static class SampleStopper implements Stoppable { + private boolean stopped = false; + + @Override + public void stop(String why) { + stopped = true; + } + + @Override + public boolean isStopped() { + return stopped; + } + } + + /** + * Sleeps for longer than the scheduled period. This chore always misses its scheduled periodic + * executions + */ + public static class SlowChore extends ScheduledChore { + public SlowChore(String name, int period) { + this(name, new SampleStopper(), period); + } + + public SlowChore(String name, Stoppable stopper, int period) { + super(name, stopper, period); + } + + @Override + protected boolean initialChore() { + try { + Thread.sleep(getPeriod() * 2); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return true; + } + + @Override + protected void chore() { + try { + Thread.sleep(getPeriod() * 2); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + /** + * Lightweight ScheduledChore used primarily to fill the scheduling queue in tests + */ + public static class DoNothingChore extends ScheduledChore { + public DoNothingChore(String name, int period) { + super(name, new SampleStopper(), period); + } + + public DoNothingChore(String name, Stoppable stopper, int period) { + super(name, stopper, period); + } + + @Override + protected void chore() { + // DO NOTHING + } + + } + + public static class SleepingChore extends ScheduledChore { + private int sleepTime; + + public SleepingChore(String name, int chorePeriod, int sleepTime) { + this(name, new SampleStopper(), chorePeriod, sleepTime); + } + + public SleepingChore(String name, Stoppable stopper, int period, int sleepTime) { + super(name, stopper, period); + this.sleepTime = sleepTime; + } + + @Override + protected boolean initialChore() { + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return true; + } + + @Override + protected void chore() { + try { + Thread.sleep(sleepTime); + } catch (Exception e) { + System.err.println(e.getStackTrace()); + } + } + } + + public static class CountingChore extends ScheduledChore { + private int countOfChoreCalls; + private boolean outputOnTicks = false; + + public CountingChore(String name, int period) { + this(name, new SampleStopper(), period); + } + + public CountingChore(String name, Stoppable stopper, int period) { + this(name, stopper, period, false); + } + + public CountingChore(String name, Stoppable stopper, int period, + final boolean outputOnTicks) { + super(name, stopper, period); + this.countOfChoreCalls = 0; + this.outputOnTicks = outputOnTicks; + } + + @Override + protected boolean initialChore() { + countOfChoreCalls++; + if (outputOnTicks) outputTickCount(); + return true; + } + + @Override + protected void chore() { + countOfChoreCalls++; + if (outputOnTicks) outputTickCount(); + } + + private void outputTickCount() { + System.out.println("Chore: " + getName() + ". Count of chore calls: " + countOfChoreCalls); + } + + public int getCountOfChoreCalls() { + return countOfChoreCalls; + } + + public boolean isOutputtingOnTicks() { + return outputOnTicks; + } + + public void setOutputOnTicks(boolean o) { + outputOnTicks = o; + } + } + + /** + * A Chore that will try to execute the initial chore a few times before succeeding. Once the + * initial chore is complete the chore cancels itself + */ + public static class FailInitialChore extends ScheduledChore { + private int numberOfFailures; + private int failureThreshold; + + /** + * @param failThreshold Number of times the Chore fails when trying to execute initialChore + * before succeeding. + */ + public FailInitialChore(String name, int period, int failThreshold) { + this(name, new SampleStopper(), period, failThreshold); + } + + public FailInitialChore(String name, Stoppable stopper, int period, int failThreshold) { + super(name, stopper, period); + numberOfFailures = 0; + failureThreshold = failThreshold; + } + + @Override + protected boolean initialChore() { + if (numberOfFailures < failureThreshold) { + numberOfFailures++; + return false; + } else { + return true; + } + } + + @Override + protected void chore() { + assertTrue(numberOfFailures == failureThreshold); + cancel(false); + } + + } + } + + @Test + public void testInitialChorePrecedence() throws InterruptedException { + ChoreService service = ChoreService.getInstance(TEST_SERVER_NAME); + + final int period = 100; + final int failureThreshold = 5; + ScheduledChore chore = new FailInitialChore("chore", period, failureThreshold); + service.scheduleChore(chore); + + int loopCount = 0; + boolean brokeOutOfLoop = false; + + while (!chore.isInitialChoreComplete() && chore.isScheduled()) { + Thread.sleep(failureThreshold * period); + loopCount++; + if (loopCount > 3) { + brokeOutOfLoop = true; + break; + } + } + + assertFalse(brokeOutOfLoop); + } + + @Test + public void testCancelChore() throws InterruptedException { + final int period = 100; + ScheduledChore chore1 = new DoNothingChore("chore1", period); + ChoreService service = ChoreService.getInstance(TEST_SERVER_NAME); + + service.scheduleChore(chore1); + assertTrue(chore1.isScheduled()); + + chore1.cancel(true); + assertFalse(chore1.isScheduled()); + assertTrue(service.getNumberOfScheduledChores() == 0); + } + + @Test + public void testScheduledChoreConstruction() { + final String NAME = "chore"; + final int PERIOD = 100; + final long VALID_DELAY = 0; + final long INVALID_DELAY = -100; + final TimeUnit UNIT = TimeUnit.NANOSECONDS; + + ScheduledChore chore1 = + new ScheduledChore(NAME, new SampleStopper(), PERIOD, VALID_DELAY, UNIT) { + @Override + protected void chore() { + // DO NOTHING + } + }; + + assertEquals("Name construction failed", chore1.getName(), NAME); + assertEquals("Period construction failed", chore1.getPeriod(), PERIOD); + assertEquals("Initial Delay construction failed", chore1.getInitialDelay(), VALID_DELAY); + assertEquals("TimeUnit construction failed", chore1.getTimeUnit(), UNIT); + + ScheduledChore invalidDelayChore = + new ScheduledChore(NAME, new SampleStopper(), PERIOD, INVALID_DELAY, UNIT) { + @Override + protected void chore() { + // DO NOTHING + } + }; + + assertEquals("Initial Delay should be set to 0 when invalid", 0, + invalidDelayChore.getInitialDelay()); + } + + @Test + public void testChoreServiceConstruction() { + final int corePoolSize = 10; + final int defaultCorePoolSize = ChoreService.MIN_CORE_POOL_SIZE; + + ChoreService customInit = new ChoreService(TEST_SERVER_NAME, corePoolSize); + assertEquals(corePoolSize, customInit.getCorePoolSize()); + + ChoreService defaultInit = new ChoreService(TEST_SERVER_NAME); + assertEquals(defaultCorePoolSize, defaultInit.getCorePoolSize()); + + ChoreService invalidInit = new ChoreService(TEST_SERVER_NAME, -10); + assertEquals(defaultCorePoolSize, invalidInit.getCorePoolSize()); + } + + @Test + public void testFrequencyOfChores() throws InterruptedException { + final int period = 100; + // Small delta that acts as time buffer (allowing chores to complete if running slowly) + final int delta = 5; + ChoreService service = ChoreService.getInstance(TEST_SERVER_NAME); + CountingChore chore = new CountingChore("countingChore", period); + service.scheduleChore(chore); + + Thread.sleep(10 * period + delta); + assertTrue(chore.getCountOfChoreCalls() == 11); + + Thread.sleep(10 * period); + assertTrue(chore.getCountOfChoreCalls() == 21); + } + + @Test + public void testForceTrigger() throws InterruptedException { + final int period = 100; + final int delta = 5; + ChoreService service = ChoreService.getInstance(TEST_SERVER_NAME); + CountingChore chore = new CountingChore("countingChore", period); + service.scheduleChore(chore); + Thread.sleep(10 * period + delta); + + assertTrue(chore.getCountOfChoreCalls() == 11); + + // Force five runs of the chore to occur, sleeping between triggers to ensure the + // chore has time to run + chore.triggerNow(); + Thread.sleep(delta); + chore.triggerNow(); + Thread.sleep(delta); + chore.triggerNow(); + Thread.sleep(delta); + chore.triggerNow(); + Thread.sleep(delta); + chore.triggerNow(); + Thread.sleep(delta); + + assertTrue(chore.getCountOfChoreCalls() == 16); + + Thread.sleep(10 * period + delta); + + assertTrue(chore.getCountOfChoreCalls() == 26); + } + + @Test + public void testCorePoolIncrease() throws InterruptedException { + final int initialCorePoolSize = 3; + ChoreService service = new ChoreService(TEST_SERVER_NAME, initialCorePoolSize); + assertEquals("Should have a core pool of size: " + initialCorePoolSize, initialCorePoolSize, + service.getCorePoolSize()); + + final int slowChorePeriod = 100; + SlowChore slowChore1 = new SlowChore("slowChore1", slowChorePeriod); + SlowChore slowChore2 = new SlowChore("slowChore2", slowChorePeriod); + SlowChore slowChore3 = new SlowChore("slowChore3", slowChorePeriod); + + service.scheduleChore(slowChore1); + service.scheduleChore(slowChore2); + service.scheduleChore(slowChore3); + + Thread.sleep(slowChorePeriod * 10); + assertEquals("Should not create more pools than scheduled chores", 3, + service.getCorePoolSize()); + + SlowChore slowChore4 = new SlowChore("slowChore4", slowChorePeriod); + service.scheduleChore(slowChore4); + + Thread.sleep(slowChorePeriod * 10); + assertEquals("Chores are missing their start time. Should expand core pool size", 4, + service.getCorePoolSize()); + + SlowChore slowChore5 = new SlowChore("slowChore5", slowChorePeriod); + service.scheduleChore(slowChore5); + + Thread.sleep(slowChorePeriod * 10); + assertEquals("Chores are missing their start time. Should expand core pool size", 5, + service.getCorePoolSize()); + } + + @Test + public void testCorePoolDecrease() throws InterruptedException { + final int initialCorePoolSize = 3; + ChoreService service = new ChoreService(TEST_SERVER_NAME, initialCorePoolSize); + final int chorePeriod = 10; + + // Slow chores always miss their start time and thus the core pool size should be at least as + // large as the number of running slow chores + SlowChore slowChore1 = new SlowChore("slowChore1", chorePeriod); + SlowChore slowChore2 = new SlowChore("slowChore2", chorePeriod); + SlowChore slowChore3 = new SlowChore("slowChore3", chorePeriod); + + service.scheduleChore(slowChore1); + service.scheduleChore(slowChore2); + service.scheduleChore(slowChore3); + + Thread.sleep(chorePeriod * 10); + assertEquals("Should not create more pools than scheduled chores", + service.getNumberOfScheduledChores(), service.getCorePoolSize()); + + SlowChore slowChore4 = new SlowChore("slowChore4", chorePeriod); + service.scheduleChore(slowChore4); + Thread.sleep(chorePeriod * 10); + assertEquals("Chores are missing their start time. Should expand core pool size", + service.getNumberOfScheduledChores(), service.getCorePoolSize()); + + SlowChore slowChore5 = new SlowChore("slowChore5", chorePeriod); + service.scheduleChore(slowChore5); + Thread.sleep(chorePeriod * 10); + assertEquals("Chores are missing their start time. Should expand core pool size", + service.getNumberOfScheduledChores(), service.getCorePoolSize()); + assertEquals(service.getNumberOfChoresMissingStartTime(), 5); + + slowChore5.cancel(); + Thread.sleep(chorePeriod * 10); + assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()), + service.getCorePoolSize()); + assertEquals(service.getNumberOfChoresMissingStartTime(), 4); + + slowChore4.cancel(); + Thread.sleep(chorePeriod * 10); + assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()), + service.getCorePoolSize()); + assertEquals(service.getNumberOfChoresMissingStartTime(), 3); + + slowChore3.cancel(); + Thread.sleep(chorePeriod * 10); + assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()), + service.getCorePoolSize()); + assertEquals(service.getNumberOfChoresMissingStartTime(), 2); + + slowChore2.cancel(); + Thread.sleep(chorePeriod * 10); + assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()), + service.getCorePoolSize()); + assertEquals(service.getNumberOfChoresMissingStartTime(), 1); + + slowChore1.cancel(); + Thread.sleep(chorePeriod * 10); + assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()), + service.getCorePoolSize()); + assertEquals(service.getNumberOfChoresMissingStartTime(), 0); + + slowChore1.resetState(); + service.scheduleChore(slowChore1); + Thread.sleep(chorePeriod * 10); + assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()), + service.getCorePoolSize()); + assertEquals(service.getNumberOfChoresMissingStartTime(), 1); + + slowChore2.resetState(); + service.scheduleChore(slowChore2); + Thread.sleep(chorePeriod * 10); + assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()), + service.getCorePoolSize()); + assertEquals(service.getNumberOfChoresMissingStartTime(), 2); + + DoNothingChore fastChore1 = new DoNothingChore("fastChore1", chorePeriod); + service.scheduleChore(fastChore1); + Thread.sleep(chorePeriod * 10); + assertEquals(service.getNumberOfChoresMissingStartTime(), 2); + assertEquals("Should increase", 3, service.getCorePoolSize()); + + DoNothingChore fastChore2 = new DoNothingChore("fastChore2", chorePeriod); + service.scheduleChore(fastChore2); + Thread.sleep(chorePeriod * 10); + assertEquals(service.getNumberOfChoresMissingStartTime(), 2); + assertEquals("Should increase", 3, service.getCorePoolSize()); + + DoNothingChore fastChore3 = new DoNothingChore("fastChore3", chorePeriod); + service.scheduleChore(fastChore3); + Thread.sleep(chorePeriod * 10); + assertEquals(service.getNumberOfChoresMissingStartTime(), 2); + assertEquals("Should not change", 3, service.getCorePoolSize()); + + DoNothingChore fastChore4 = new DoNothingChore("fastChore4", chorePeriod); + service.scheduleChore(fastChore4); + Thread.sleep(chorePeriod * 10); + assertEquals(service.getNumberOfChoresMissingStartTime(), 2); + assertEquals("Should not change", 3, service.getCorePoolSize()); + } + + @Test + public void testNumberOfRunningChores() throws InterruptedException { + ChoreService service = new ChoreService(TEST_SERVER_NAME); + + final int period = 100; + final int sleepTime = 5; + + DoNothingChore dn1 = new DoNothingChore("dn1", period); + DoNothingChore dn2 = new DoNothingChore("dn2", period); + DoNothingChore dn3 = new DoNothingChore("dn3", period); + DoNothingChore dn4 = new DoNothingChore("dn4", period); + DoNothingChore dn5 = new DoNothingChore("dn5", period); + + service.scheduleChore(dn1); + service.scheduleChore(dn2); + service.scheduleChore(dn3); + service.scheduleChore(dn4); + service.scheduleChore(dn5); + + Thread.sleep(sleepTime); + assertEquals("Scheduled chore mismatch", 5, service.getNumberOfScheduledChores()); + + dn1.cancel(); + Thread.sleep(sleepTime); + assertEquals("Scheduled chore mismatch", 4, service.getNumberOfScheduledChores()); + + dn2.cancel(); + dn3.cancel(); + dn4.cancel(); + Thread.sleep(sleepTime); + assertEquals("Scheduled chore mismatch", 1, service.getNumberOfScheduledChores()); + + dn5.cancel(); + Thread.sleep(sleepTime); + assertEquals("Scheduled chore mismatch", 0, service.getNumberOfScheduledChores()); + } + + @Test + public void testNumberOfChoresMissingStartTime() throws InterruptedException { + ChoreService service = new ChoreService(TEST_SERVER_NAME); + + final int period = 100; + final int sleepTime = 5 * period; + + // Slow chores sleep for a length of time LONGER than their period. Thus, SlowChores + // ALWAYS miss their start time since their execution takes longer than their period + SlowChore sc1 = new SlowChore("sc1", period); + SlowChore sc2 = new SlowChore("sc2", period); + SlowChore sc3 = new SlowChore("sc3", period); + SlowChore sc4 = new SlowChore("sc4", period); + SlowChore sc5 = new SlowChore("sc5", period); + + service.scheduleChore(sc1); + service.scheduleChore(sc2); + service.scheduleChore(sc3); + service.scheduleChore(sc4); + service.scheduleChore(sc5); + + Thread.sleep(sleepTime); + assertEquals(5, service.getNumberOfChoresMissingStartTime()); + + sc1.cancel(); + Thread.sleep(sleepTime); + assertEquals(4, service.getNumberOfChoresMissingStartTime()); + + sc2.cancel(); + sc3.cancel(); + sc4.cancel(); + Thread.sleep(sleepTime); + assertEquals(1, service.getNumberOfChoresMissingStartTime()); + + sc5.cancel(); + Thread.sleep(sleepTime); + assertEquals(0, service.getNumberOfChoresMissingStartTime()); + } + + /** + * ChoreServices should never have a core pool size that exceeds the number of chores that have + * been scheduled with the service. For example, if 4 ScheduledChores are scheduled with a + * ChoreService, the number of threads in the ChoreService's core pool should never exceed 4 + */ + @Test + public void testMaximumChoreServiceThreads() throws InterruptedException { + ChoreService service = new ChoreService(TEST_SERVER_NAME); + + final int period = 10; + final int sleepTime = 5 * period; + + // Slow chores sleep for a length of time LONGER than their period. Thus, SlowChores + // ALWAYS miss their start time since their execution takes longer than their period. + // Chores that miss their start time will trigger the onChoreMissedStartTime callback + // in the ChoreService. This callback will try to increase the number of core pool + // threads. + SlowChore sc1 = new SlowChore("sc1", period); + SlowChore sc2 = new SlowChore("sc2", period); + SlowChore sc3 = new SlowChore("sc3", period); + SlowChore sc4 = new SlowChore("sc4", period); + SlowChore sc5 = new SlowChore("sc5", period); + + service.scheduleChore(sc1); + service.scheduleChore(sc2); + service.scheduleChore(sc3); + service.scheduleChore(sc4); + service.scheduleChore(sc5); + + Thread.sleep(sleepTime); + assertTrue(service.getCorePoolSize() <= service.getNumberOfScheduledChores()); + + SlowChore sc6 = new SlowChore("sc6", period); + SlowChore sc7 = new SlowChore("sc7", period); + SlowChore sc8 = new SlowChore("sc8", period); + SlowChore sc9 = new SlowChore("sc9", period); + SlowChore sc10 = new SlowChore("sc10", period); + + service.scheduleChore(sc6); + service.scheduleChore(sc7); + service.scheduleChore(sc8); + service.scheduleChore(sc9); + service.scheduleChore(sc10); + + Thread.sleep(sleepTime); + assertTrue(service.getCorePoolSize() <= service.getNumberOfScheduledChores()); + } + + @Test + public void testScheduledChoreReset() throws InterruptedException { + final int period = 100; + ChoreService service = new ChoreService(TEST_SERVER_NAME); + ScheduledChore chore = new DoNothingChore("sampleChore", period); + + // TRUE + assertTrue(!chore.isInitialChoreComplete()); + assertTrue(chore.getTimeOfLastRun() == -1); + assertTrue(chore.getTimeOfThisRun() == -1); + + service.scheduleChore(chore); + Thread.sleep(5 * period); + + // FALSE + assertFalse(!chore.isInitialChoreComplete()); + assertFalse(chore.getTimeOfLastRun() == -1); + assertFalse(chore.getTimeOfThisRun() == -1); + + chore.resetState(); + + // TRUE + assertTrue(!chore.isInitialChoreComplete()); + assertTrue(chore.getTimeOfLastRun() == -1); + assertTrue(chore.getTimeOfThisRun() == -1); + } + + @Test + public void testChangingChoreServices() throws InterruptedException { + final int period = 100; + final int sleepTime = 10; + ChoreService service1 = new ChoreService(TEST_SERVER_NAME); + ChoreService service2 = new ChoreService(TEST_SERVER_NAME); + ScheduledChore chore = new DoNothingChore("sample", period); + + assertFalse(chore.isScheduled()); + assertFalse(service1.isChoreScheduled(chore)); + assertFalse(service2.isChoreScheduled(chore)); + assertTrue(chore.getChoreServicer() == null); + + service1.scheduleChore(chore); + Thread.sleep(sleepTime); + assertTrue(chore.isScheduled()); + assertTrue(service1.isChoreScheduled(chore)); + assertFalse(service2.isChoreScheduled(chore)); + assertFalse(chore.getChoreServicer() == null); + + service2.scheduleChore(chore); + Thread.sleep(sleepTime); + assertTrue(chore.isScheduled()); + assertFalse(service1.isChoreScheduled(chore)); + assertTrue(service2.isChoreScheduled(chore)); + assertFalse(chore.getChoreServicer() == null); + + chore.cancel(); + assertFalse(chore.isScheduled()); + assertFalse(service1.isChoreScheduled(chore)); + assertFalse(service2.isChoreScheduled(chore)); + assertTrue(chore.getChoreServicer() == null); + } + + @Test + public void testTriggerNowFailsWhenNotScheduled() throws InterruptedException { + final int period = 100; + // Small sleep time buffer to allow CountingChore to complete + final int sleep = 5; + ChoreService service = new ChoreService(TEST_SERVER_NAME); + CountingChore chore = new CountingChore("dn", period); + + assertFalse(chore.triggerNow()); + assertTrue(chore.getCountOfChoreCalls() == 0); + + service.scheduleChore(chore); + Thread.sleep(sleep); + assertEquals(1, chore.getCountOfChoreCalls()); + Thread.sleep(period); + assertEquals(2, chore.getCountOfChoreCalls()); + assertTrue(chore.triggerNow()); + Thread.sleep(sleep); + assertTrue(chore.triggerNow()); + Thread.sleep(sleep); + assertTrue(chore.triggerNow()); + Thread.sleep(sleep); + assertEquals(5, chore.getCountOfChoreCalls()); + } + + @Test + public void testStopperForScheduledChores() throws InterruptedException { + ChoreService service = ChoreService.getInstance(TEST_SERVER_NAME); + Stoppable stopperForGroup1 = new SampleStopper(); + Stoppable stopperForGroup2 = new SampleStopper(); + final int period = 100; + final int delta = 10; + + ScheduledChore chore1_group1 = new DoNothingChore("c1g1", stopperForGroup1, period); + ScheduledChore chore2_group1 = new DoNothingChore("c2g1", stopperForGroup1, period); + ScheduledChore chore3_group1 = new DoNothingChore("c3g1", stopperForGroup1, period); + + ScheduledChore chore1_group2 = new DoNothingChore("c1g2", stopperForGroup2, period); + ScheduledChore chore2_group2 = new DoNothingChore("c2g2", stopperForGroup2, period); + ScheduledChore chore3_group2 = new DoNothingChore("c3g2", stopperForGroup2, period); + + service.scheduleChore(chore1_group1); + service.scheduleChore(chore2_group1); + service.scheduleChore(chore3_group1); + service.scheduleChore(chore1_group2); + service.scheduleChore(chore2_group2); + service.scheduleChore(chore3_group2); + + Thread.sleep(delta); + Thread.sleep(10 * period); + assertTrue(chore1_group1.isScheduled()); + assertTrue(chore2_group1.isScheduled()); + assertTrue(chore3_group1.isScheduled()); + assertTrue(chore1_group2.isScheduled()); + assertTrue(chore2_group2.isScheduled()); + assertTrue(chore3_group2.isScheduled()); + + stopperForGroup1.stop("test stopping group 1"); + Thread.sleep(period); + assertFalse(chore1_group1.isScheduled()); + assertFalse(chore2_group1.isScheduled()); + assertFalse(chore3_group1.isScheduled()); + assertTrue(chore1_group2.isScheduled()); + assertTrue(chore2_group2.isScheduled()); + assertTrue(chore3_group2.isScheduled()); + + stopperForGroup2.stop("test stopping group 2"); + Thread.sleep(period); + assertFalse(chore1_group1.isScheduled()); + assertFalse(chore2_group1.isScheduled()); + assertFalse(chore3_group1.isScheduled()); + assertFalse(chore1_group2.isScheduled()); + assertFalse(chore2_group2.isScheduled()); + assertFalse(chore3_group2.isScheduled()); + } + + @Test + public void testShutdownCancelsScheduledChores() throws InterruptedException { + final int period = 100; + ChoreService service = new ChoreService(TEST_SERVER_NAME); + ScheduledChore successChore1 = new DoNothingChore("sc1", period); + ScheduledChore successChore2 = new DoNothingChore("sc2", period); + ScheduledChore successChore3 = new DoNothingChore("sc3", period); + + assertTrue(service.scheduleChore(successChore1)); + assertTrue(successChore1.isScheduled()); + assertTrue(service.scheduleChore(successChore2)); + assertTrue(successChore2.isScheduled()); + assertTrue(service.scheduleChore(successChore3)); + assertTrue(successChore3.isScheduled()); + + service.shutdown(); + + assertFalse(successChore1.isScheduled()); + assertFalse(successChore2.isScheduled()); + assertFalse(successChore3.isScheduled()); + } + + @Test + public void testShutdownWorksWhileChoresAreExecuting() throws InterruptedException { + final int period = 100; + final int sleep = 5 * period; + ChoreService service = new ChoreService(TEST_SERVER_NAME); + ScheduledChore slowChore1 = new SleepingChore("sc1", period, sleep); + ScheduledChore slowChore2 = new SleepingChore("sc2", period, sleep); + ScheduledChore slowChore3 = new SleepingChore("sc3", period, sleep); + + assertTrue(service.scheduleChore(slowChore1)); + assertTrue(service.scheduleChore(slowChore2)); + assertTrue(service.scheduleChore(slowChore3)); + + Thread.sleep(sleep / 2); + service.shutdown(); + + assertFalse(slowChore1.isScheduled()); + assertFalse(slowChore2.isScheduled()); + assertFalse(slowChore3.isScheduled()); + assertTrue(service.isShutdown()); + + Thread.sleep(5); + assertTrue(service.isTerminated()); + } + + @Test + public void testShutdownRejectsNewSchedules() throws InterruptedException { + final int period = 100; + ChoreService service = new ChoreService(TEST_SERVER_NAME); + ScheduledChore successChore1 = new DoNothingChore("sc1", period); + ScheduledChore successChore2 = new DoNothingChore("sc2", period); + ScheduledChore successChore3 = new DoNothingChore("sc3", period); + ScheduledChore failChore1 = new DoNothingChore("fc1", period); + ScheduledChore failChore2 = new DoNothingChore("fc2", period); + ScheduledChore failChore3 = new DoNothingChore("fc3", period); + + assertTrue(service.scheduleChore(successChore1)); + assertTrue(successChore1.isScheduled()); + assertTrue(service.scheduleChore(successChore2)); + assertTrue(successChore2.isScheduled()); + assertTrue(service.scheduleChore(successChore3)); + assertTrue(successChore3.isScheduled()); + + service.shutdown(); + + assertFalse(service.scheduleChore(failChore1)); + assertFalse(failChore1.isScheduled()); + assertFalse(service.scheduleChore(failChore2)); + assertFalse(failChore2.isScheduled()); + assertFalse(service.scheduleChore(failChore3)); + assertFalse(failChore3.isScheduled()); + } +}
