http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/concurrent/ExecutorServiceShutdown.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/concurrent/ExecutorServiceShutdown.java b/commons/src/main/java/org/apache/aurora/common/util/concurrent/ExecutorServiceShutdown.java new file mode 100644 index 0000000..7aad9ef --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/util/concurrent/ExecutorServiceShutdown.java @@ -0,0 +1,71 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.util.concurrent; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +import com.google.common.base.Preconditions; + +import org.apache.aurora.common.base.Command; +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; + +/** + * An implementation of the graceful shutdown sequence recommended by {@link ExecutorService}. 
+ * + * @author John Sirois + */ +public class ExecutorServiceShutdown implements Command { + private static final Logger LOG = Logger.getLogger(ExecutorServiceShutdown.class.getName()); + + private final ExecutorService executor; + private final Amount<Long, Time> gracePeriod; + + /** + * Creates a new {@code ExecutorServiceShutdown} command that will try to gracefully shut down the + * given {@code executor} when executed. If the supplied grace period is less than or equal to + * zero the executor service will be asked to shut down but no waiting will be done after these + * requests. + * + * @param executor The executor service this command should shut down when executed. + * @param gracePeriod The maximum time to wait after a shutdown request before continuing to the + * next shutdown phase. + */ + public ExecutorServiceShutdown(ExecutorService executor, Amount<Long, Time> gracePeriod) { + this.executor = Preconditions.checkNotNull(executor); + this.gracePeriod = Preconditions.checkNotNull(gracePeriod); + } + + @Override + public void execute() { + executor.shutdown(); // Disable new tasks from being submitted. + try { + // Wait a while for existing tasks to terminate. + if (!executor.awaitTermination(gracePeriod.as(Time.MILLISECONDS), TimeUnit.MILLISECONDS)) { + executor.shutdownNow(); // Cancel currently executing tasks. + // Wait a while for tasks to respond to being cancelled. + if (!executor.awaitTermination(gracePeriod.as(Time.MILLISECONDS), TimeUnit.MILLISECONDS)) { + LOG.warning("Pool did not terminate"); + } + } + } catch (InterruptedException ie) { + // (Re-)Cancel if current thread also interrupted. + executor.shutdownNow(); + // Preserve interrupt status. + Thread.currentThread().interrupt(); + } + } +}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/concurrent/ForwardingExecutorService.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/concurrent/ForwardingExecutorService.java b/commons/src/main/java/org/apache/aurora/common/util/concurrent/ForwardingExecutorService.java new file mode 100644 index 0000000..b8a0fd9 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/util/concurrent/ForwardingExecutorService.java @@ -0,0 +1,101 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.util.concurrent; + +import com.google.common.base.Preconditions; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * An executor service that forwards all calls to another executor service. Subclasses should + * override one or more methods to modify the behavior of the backing executor service as desired + * per the <a href="http://en.wikipedia.org/wiki/Decorator_pattern">decorator pattern</a>. 
+ * + * @author John Sirois + */ +public class ForwardingExecutorService<T extends ExecutorService> implements ExecutorService { + protected final T delegate; + + public ForwardingExecutorService(T delegate) { + Preconditions.checkNotNull(delegate); + this.delegate = delegate; + } + + public void shutdown() { + delegate.shutdown(); + } + + public List<Runnable> shutdownNow() { + return delegate.shutdownNow(); + } + + public boolean isShutdown() { + return delegate.isShutdown(); + } + + public boolean isTerminated() { + return delegate.isTerminated(); + } + + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return delegate.awaitTermination(timeout, unit); + } + + public <T> Future<T> submit(Callable<T> task) { + return delegate.submit(task); + } + + public <T> Future<T> submit(Runnable task, T result) { + return delegate.submit(task, result); + } + + public Future<?> submit(Runnable task) { + return delegate.submit(task); + } + + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) + throws InterruptedException { + + return delegate.invokeAll(tasks); + } + + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, + TimeUnit unit) throws InterruptedException { + + return delegate.invokeAll(tasks, timeout, unit); + } + + public <T> T invokeAny(Collection<? extends Callable<T>> tasks) + throws InterruptedException, ExecutionException { + + return delegate.invokeAny(tasks); + } + + public <T> T invokeAny(Collection<? 
extends Callable<T>> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + + return delegate.invokeAny(tasks, timeout, unit); + } + + public void execute(Runnable command) { + delegate.execute(command); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/concurrent/MoreExecutors.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/concurrent/MoreExecutors.java b/commons/src/main/java/org/apache/aurora/common/util/concurrent/MoreExecutors.java new file mode 100644 index 0000000..630b9aa --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/util/concurrent/MoreExecutors.java @@ -0,0 +1,122 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.util.concurrent; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; + +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; + +/** + * Utility class that provides factory functions to decorate + * {@link java.util.concurrent.ExecutorService}s. 
+ */ +public final class MoreExecutors { + private MoreExecutors() { + // utility + } + + /** + * Returns a {@link ExecutorService} that passes uncaught exceptions to + * {@link java.lang.Thread.UncaughtExceptionHandler}. + * <p> + * This may be useful because {@link java.util.concurrent.ThreadPoolExecutor} and + * {@link java.util.concurrent.ScheduledThreadPoolExecutor} provide no built-in propagation of + * unchecked exceptions thrown from submitted work. Some users are surprised to find that + * even the default uncaught exception handler is not invoked. + * + * @param executorService delegate + * @param uncaughtExceptionHandler exception handler that will receive exceptions generated + * from executor tasks. + * @return a decorated executor service + */ + public static ExecutorService exceptionHandlingExecutor( + ExecutorService executorService, + Thread.UncaughtExceptionHandler uncaughtExceptionHandler) { + + Preconditions.checkNotNull(uncaughtExceptionHandler); + return new ExceptionHandlingExecutorService( + executorService, Suppliers.ofInstance(uncaughtExceptionHandler)); + } + + /** + * Returns a {@link ExecutorService} that passes uncaught exceptions to + * a handler returned by Thread.currentThread().getUncaughtExceptionHandler() + * at the time the exception is thrown. 
+ * + * @see MoreExecutors#exceptionHandlingExecutor(java.util.concurrent.ExecutorService, + * Thread.UncaughtExceptionHandler) + * @param executorService delegate + * @return a decorated executor service + */ + public static ExecutorService exceptionHandlingExecutor(ExecutorService executorService) { + return new ExceptionHandlingExecutorService( + executorService, + new Supplier<Thread.UncaughtExceptionHandler>() { + @Override + public Thread.UncaughtExceptionHandler get() { + return Thread.currentThread().getUncaughtExceptionHandler(); + } + }); + } + + /** + * Returns a {@link ScheduledExecutorService} that passes uncaught exceptions to + * {@link java.lang.Thread.UncaughtExceptionHandler}. + * <p> + * This may be useful because {@link java.util.concurrent.ThreadPoolExecutor} and + * {@link java.util.concurrent.ScheduledThreadPoolExecutor} provide no built-in propagation of + * unchecked exceptions thrown from submitted work. Some users are surprised to find that + * even the default uncaught exception handler is not invoked. + * + * @param executorService delegate + * @param uncaughtExceptionHandler exception handler that will receive exceptions generated + * from executor tasks. + * @return a decorated executor service + */ + public static ScheduledExecutorService exceptionHandlingExecutor( + ScheduledExecutorService executorService, + Thread.UncaughtExceptionHandler uncaughtExceptionHandler) { + + Preconditions.checkNotNull(uncaughtExceptionHandler); + return new ExceptionHandlingScheduledExecutorService( + executorService, + Suppliers.ofInstance(uncaughtExceptionHandler)); + } + + /** + * Returns a {@link ScheduledExecutorService} that passes uncaught exceptions to + * a handler returned by Thread.currentThread().getUncaughtExceptionHandler() + * at the time the exception is thrown. 
+ * + * @see MoreExecutors#exceptionHandlingExecutor(java.util.concurrent.ScheduledExecutorService, + * Thread.UncaughtExceptionHandler) + * @param executorService delegate + * @return a decorated executor service + */ + public static ScheduledExecutorService exceptionHandlingExecutor( + ScheduledExecutorService executorService) { + + return new ExceptionHandlingScheduledExecutorService( + executorService, + new Supplier<Thread.UncaughtExceptionHandler>() { + @Override + public Thread.UncaughtExceptionHandler get() { + return Thread.currentThread().getUncaughtExceptionHandler(); + } + }); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/concurrent/RetryingFutureTask.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/concurrent/RetryingFutureTask.java b/commons/src/main/java/org/apache/aurora/common/util/concurrent/RetryingFutureTask.java new file mode 100644 index 0000000..7448dc1 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/util/concurrent/RetryingFutureTask.java @@ -0,0 +1,81 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.common.util.concurrent; + +import com.google.common.base.Preconditions; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.FutureTask; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * A future task that supports retries by resubmitting itself to an {@link ExecutorService}. + * + * @author William Farner + */ +public class RetryingFutureTask extends FutureTask<Boolean> { + private static Logger LOG = Logger.getLogger(RetryingFutureTask.class.getName()); + + protected final ExecutorService executor; + protected final int maxRetries; + protected int numRetries = 0; + protected final Callable<Boolean> callable; + + /** + * Creates a new retrying future task that will execute a unit of work until successfully + * completed, or the retry limit has been reached. + * + * @param executor The executor service to resubmit the task to upon failure. + * @param callable The unit of work. The work is considered successful when {@code true} is + * returned. It may return {@code false} or throw an exception when unsuccessful. + * @param maxRetries The maximum number of times to retry the task. + */ + public RetryingFutureTask(ExecutorService executor, Callable<Boolean> callable, int maxRetries) { + super(callable); + this.callable = Preconditions.checkNotNull(callable); + this.executor = Preconditions.checkNotNull(executor); + this.maxRetries = maxRetries; + } + + /** + * Invokes a retry of this task. 
+ */ + protected void retry() { + executor.execute(this); + } + + @Override + public void run() { + boolean success = false; + try { + success = callable.call(); + } catch (Exception e) { + LOG.log(Level.WARNING, "Exception while executing task.", e); + } + + if (!success) { + numRetries++; + if (numRetries > maxRetries) { + LOG.severe("Task did not complete after " + maxRetries + " retries, giving up."); + } else { + LOG.info("Task was not successful, resubmitting (num retries: " + numRetries + ")"); + retry(); + } + } else { + set(true); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/concurrent/TaskConverter.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/concurrent/TaskConverter.java b/commons/src/main/java/org/apache/aurora/common/util/concurrent/TaskConverter.java new file mode 100644 index 0000000..5971e37 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/util/concurrent/TaskConverter.java @@ -0,0 +1,93 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.common.util.concurrent; + +import java.util.Collection; +import java.util.concurrent.Callable; + +import com.google.common.base.Function; +import com.google.common.base.Supplier; +import com.google.common.base.Throwables; +import com.google.common.collect.Collections2; + +final class TaskConverter { + private TaskConverter() { + // utility + } + + /** + * Returns a wrapped {@link Runnable} that passes uncaught exceptions thrown from the + * original Runnable to {@link Thread.UncaughtExceptionHandler}. + * + * @param runnable runnable to be wrapped + * @param handler exception handler that will receive exceptions generated in the runnable + * @return wrapped runnable + */ + static Runnable alertingRunnable( + final Runnable runnable, + final Supplier<Thread.UncaughtExceptionHandler> handler) { + + return new Runnable() { + @Override + public void run() { + try { + runnable.run(); + } catch (Throwable t) { + handler.get().uncaughtException(Thread.currentThread(), t); + throw Throwables.propagate(t); + } + } + }; + } + + /** + * Returns a wrapped {@link java.util.concurrent.Callable} that passes uncaught exceptions + * thrown from the original Callable to {@link Thread.UncaughtExceptionHandler}. + * + * @param callable callable to be wrapped + * @param handler exception handler that will receive exceptions generated in the callable + * @return wrapped callable + */ + static <V> Callable<V> alertingCallable( + final Callable<V> callable, + final Supplier<Thread.UncaughtExceptionHandler> handler) { + + return new Callable<V>() { + @Override + public V call() throws Exception { + try { + return callable.call(); + } catch (Throwable t) { + handler.get().uncaughtException(Thread.currentThread(), t); + throw Throwables.propagate(t); + } + } + }; + } + + /* + * Calls #alertingCallable on a collection of callables + */ + static <V> Collection<? extends Callable<V>> alertingCallables( + Collection<? 
extends Callable<V>> callables, + final Supplier<Thread.UncaughtExceptionHandler> handler) { + + return Collections2.transform(callables, new Function<Callable<V>, Callable<V>>() { + @Override + public Callable<V> apply(Callable<V> callable) { + return alertingCallable(callable, handler); + } + }); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/logging/ResourceLoggingConfigurator.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/logging/ResourceLoggingConfigurator.java b/commons/src/main/java/org/apache/aurora/common/util/logging/ResourceLoggingConfigurator.java new file mode 100644 index 0000000..927fb2b --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/util/logging/ResourceLoggingConfigurator.java @@ -0,0 +1,49 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.util.logging; + +import com.google.common.base.Preconditions; + +import java.io.IOException; +import java.io.InputStream; +import java.util.logging.LogManager; + +/** + * A custom java.util.logging configuration class that loads the logging configuration from a + * properties file resource (as opposed to a file as natively supported by LogManager via + * java.util.logging.config.file). 
By default this configurator will look for the resource at + * /logging.properties but the resource path can be overridden by setting the system property with + * key {@link #LOGGING_PROPERTIES_RESOURCE_PATH java.util.logging.config.resource}. To install this + * configurator you must specify the following system property: + * java.util.logging.config.class=org.apache.aurora.common.util.logging.ResourceLoggingConfigurator + * + * @author John Sirois + */ +public class ResourceLoggingConfigurator { + + /** + * A system property that controls where ResourceLoggingConfigurator looks for the logging + * configuration on the process classpath. + */ + public static final String LOGGING_PROPERTIES_RESOURCE_PATH = "java.util.logging.config.resource"; + + public ResourceLoggingConfigurator() throws IOException { + String loggingPropertiesResourcePath = + System.getProperty(LOGGING_PROPERTIES_RESOURCE_PATH, "/logging.properties"); + InputStream loggingConfig = getClass().getResourceAsStream(loggingPropertiesResourcePath); + Preconditions.checkNotNull(loggingConfig, + "Could not locate logging config file at resource path: %s", loggingPropertiesResourcePath); + LogManager.getLogManager().readConfiguration(loggingConfig); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/logging/UnresettableLogManager.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/logging/UnresettableLogManager.java b/commons/src/main/java/org/apache/aurora/common/util/logging/UnresettableLogManager.java new file mode 100644 index 0000000..66bbb37 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/util/logging/UnresettableLogManager.java @@ -0,0 +1,48 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.util.logging; + +import java.util.logging.LogManager; + +/** + * A LogManager which by default ignores calls to {@link #reset()}. This is useful to avoid missing + * log statements that occur during vm shutdown. The standard LogManager installs a + * {@link Runtime#addShutdownHook(Thread) shutdown hook} that disables logging and this subclass + * nullifies that shutdown hook by disabling any reset of the LogManager by default. + * + * @author John Sirois + */ +public class UnresettableLogManager extends LogManager { + + /** + * The system property that controls which LogManager the java.util.logging subsystem should load. + */ + public static final String LOGGING_MANAGER = "java.util.logging.manager"; + + /** + * A system property which can be used to control an {@code UnresettableLogManager}'s behavior. + * If the UnresettableLogManager is installed, but an application still wants + * {@link LogManager#reset()} behavior, they can set this property to "false". 
+ */ + private static final String LOGGING_MANAGER_IGNORERESET = "java.util.logging.manager.ignorereset"; + + @Override + public void reset() throws SecurityException { + if (Boolean.parseBoolean(System.getProperty(LOGGING_MANAGER_IGNORERESET, "true"))) { + System.err.println("UnresettableLogManager is ignoring a reset() request."); + } else { + super.reset(); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/templating/StringTemplateHelper.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/templating/StringTemplateHelper.java b/commons/src/main/java/org/apache/aurora/common/util/templating/StringTemplateHelper.java new file mode 100644 index 0000000..2756af4 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/util/templating/StringTemplateHelper.java @@ -0,0 +1,96 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.common.util.templating; + +import java.io.IOException; +import java.io.Writer; + +import com.google.common.base.Preconditions; + +import org.antlr.stringtemplate.AutoIndentWriter; +import org.antlr.stringtemplate.StringTemplate; +import org.antlr.stringtemplate.StringTemplateGroup; + +import org.apache.aurora.common.base.Closure; +import org.apache.aurora.common.base.MorePreconditions; + +/** + * A class to simplify the operations required to load a stringtemplate template file from the + * classpath and populate it. + */ +public class StringTemplateHelper { + + private final StringTemplateGroup group; + private final String templatePath; + + /** + * Creates a new template helper. + * + * @param templateContextClass Classpath context for the location of the template file. + * @param templateName Template file name (excluding .st suffix) relative to + * {@code templateContextClass}. + * @param cacheTemplates Whether the template should be cached. + */ + public StringTemplateHelper( + Class<?> templateContextClass, + String templateName, + boolean cacheTemplates) { + + MorePreconditions.checkNotBlank(templateName); + String templatePath = + templateContextClass.getPackage().getName().replace('.', '/') + "/" + templateName; + StringTemplateGroup group = new StringTemplateGroup(templateName); + Preconditions.checkNotNull(group.getInstanceOf(templatePath), + "Failed to load template at: %s", templatePath); + + this.group = group; + if (!cacheTemplates) { + group.setRefreshInterval(0); + } + this.templatePath = templatePath; + } + + /** + * Thrown when an exception is encountered while populating a template. + */ + public static class TemplateException extends Exception { + public TemplateException(String msg, Throwable cause) { + super(msg, cause); + } + } + + /** + * Writes the populated template to an output writer by providing a closure with access to + * the unpopulated template object. + * + * @param out Template output writer. 
+ * @param parameterSetter Closure to populate the template. + * @throws TemplateException If an exception was encountered while populating the template. + */ + public void writeTemplate( + Writer out, + Closure<StringTemplate> parameterSetter) throws TemplateException { + + Preconditions.checkNotNull(out); + Preconditions.checkNotNull(parameterSetter); + + StringTemplate stringTemplate = group.getInstanceOf(templatePath); + try { + parameterSetter.execute(stringTemplate); + stringTemplate.write(new AutoIndentWriter(out)); + } catch (IOException e) { + throw new TemplateException("Failed to write template: " + e, e); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/testing/FakeClock.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/testing/FakeClock.java b/commons/src/main/java/org/apache/aurora/common/util/testing/FakeClock.java new file mode 100644 index 0000000..2ed8b15 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/util/testing/FakeClock.java @@ -0,0 +1,78 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.common.util.testing; + +import com.google.common.base.Preconditions; +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.common.util.Clock; + +import java.util.concurrent.TimeUnit; + +/** + * A clock for use in testing with a configurable value for {@link #nowMillis()}. + * + * @author John Sirois + */ +public class FakeClock implements Clock { + // Tests may need to use the clock from multiple threads, ensure liveness. + private volatile long nowNanos; + + /** + * Sets what {@link #nowMillis()} will return until this method is called again with a new value + * for {@code nowMillis}. + * + * @param nowMillis the current time in milliseconds + */ + public void setNowMillis(long nowMillis) { + Preconditions.checkArgument(nowMillis >= 0); + this.nowNanos = TimeUnit.MILLISECONDS.toNanos(nowMillis); + } + + /** + * Advances the current time by the given {@code period}. Time can be retarded by passing a + * negative value. + * + * @param period the amount of time to advance the current time by + */ + public void advance(Amount<Long, Time> period) { + Preconditions.checkNotNull(period); + long newNanos = nowNanos + period.as(Time.NANOSECONDS); + Preconditions.checkArgument(newNanos >= 0, + "invalid period %s - would move current time to a negative value: %sns", period, newNanos); + nowNanos = newNanos; + } + + @Override + public long nowMillis() { + return TimeUnit.NANOSECONDS.toMillis(nowNanos); + } + + @Override + public long nowNanos() { + return nowNanos; + } + + /** + * Waits in fake time, immediately returning in real time; however a check of {@link #nowMillis} + * after this method completes will consistently reveal that {@code millis} did in fact pass while + * waiting. 
+ * + * @param millis the amount of time to wait in milliseconds + */ + @Override + public void waitFor(long millis) { + advance(Amount.of(millis, Time.MILLISECONDS)); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/testing/FakeTicker.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/testing/FakeTicker.java b/commons/src/main/java/org/apache/aurora/common/util/testing/FakeTicker.java new file mode 100644 index 0000000..68247ad --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/util/testing/FakeTicker.java @@ -0,0 +1,65 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.util.testing; + + +import com.google.common.base.Preconditions; +import com.google.common.base.Ticker; + +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; + +/** + * A ticker for use in testing with a configurable value for {@link Ticker#read()}. + */ +public class FakeTicker extends Ticker{ + private long nowNanos; + + /** + * Sets what {@link #read()} will return until this method is called again with a new value + * for {@code nowNanos}. 
+ * + * @param nowNanos the current time in nanoseconds + */ + public void setNowNanos(long nowNanos) { + this.nowNanos = nowNanos; + } + + @Override + public long read(){ + return nowNanos; + } + + /** + * Advances the current time by the given {@code period}. Time can be retarded by passing a + * negative value. + * + * @param period the amount of time to advance the current time by + */ + public void advance(Amount<Long, Time> period) { + Preconditions.checkNotNull(period); + nowNanos = nowNanos + period.as(Time.NANOSECONDS); + } + + /** + * Waits in fake time, immediately returning in real time; however a check of {@link #Ticker#read()} + * after this method completes will consistently reveal that {@code nanos} did in fact pass while + * waiting. + * + * @param nanos the amount of time to wait in nanoseconds + */ + public void waitNanos(long nanos) { + advance(Amount.of(nanos, Time.NANOSECONDS)); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java new file mode 100644 index 0000000..f679d92 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java @@ -0,0 +1,79 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
+
+import org.apache.zookeeper.KeeperException;
+
+import org.apache.aurora.common.base.ExceptionalCommand;
+import org.apache.aurora.common.zookeeper.Group.JoinException;
+import org.apache.aurora.common.zookeeper.Group.WatchException;
+import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
+
+/**
+ * Contract for taking part in, and asking about the outcome of, a ZooKeeper-based group leader
+ * election.
+ */
+public interface Candidate {
+
+  /**
+   * Callbacks through which a participant learns that it has won or lost leadership.
+   */
+  interface Leader {
+
+    /**
+     * Invoked once this participant has been elected leader.
+     *
+     * @param abdicate a command the leader may execute to give up leadership and force a new
+     *     election
+     */
+    void onElected(ExceptionalCommand<JoinException> abdicate);
+
+    /**
+     * Invoked once this participant is no longer leader, either because it abdicated or because
+     * an external event (session expiration) took the leadership role away from it.
+     */
+    void onDefeated();
+  }
+
+  /**
+   * Looks up the current group leader with a synchronous ZooKeeper query.
+   *
+   * @return the identifying data of the current leader, or {@link Optional#absent()} when the
+   *     group has no leader
+   * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper
+   * @throws KeeperException if there was a problem reading the leader information
+   * @throws InterruptedException if this thread is interrupted getting the leader
+   */
+  Optional<byte[]> getLeaderData()
+      throws ZooKeeperConnectionException, KeeperException, InterruptedException;
+
+  /**
+   * Enters this candidate into leadership elections for as long as the current jvm process is
+   * alive.  When the candidate wins, {@code onElected} is called with a command that can be used
+   * to abdicate.  A new leader is elected if the elected leader's jvm process dies or if the
+   * elected leader successfully abdicates.  A leader that abdicates successfully is removed from
+   * the group and only becomes eligible again by calling {@link #offerLeadership(Leader)} anew.
+   *
+   * @param leader the leader to notify of election and defeat events
+   * @return a supplier that can be queried to find out if this leader is currently elected
+   * @throws JoinException if there was a problem joining the group
+   * @throws WatchException if there is a problem generating the 1st group membership list
+   * @throws InterruptedException if interrupted waiting to join the group and determine initial
+   *     election results
+   */
+  Supplier<Boolean> offerLeadership(Leader leader)
+      throws JoinException, WatchException, InterruptedException;
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java
new file mode 100644
index 0000000..e16a64d
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Ordering;
+
+import org.apache.zookeeper.KeeperException;
+
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.base.ExceptionalCommand;
+import org.apache.aurora.common.zookeeper.Group.GroupChangeListener;
+import org.apache.aurora.common.zookeeper.Group.JoinException;
+import org.apache.aurora.common.zookeeper.Group.Membership;
+import org.apache.aurora.common.zookeeper.Group.WatchException;
+import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
+
+/**
+ * Implements leader election for small groups of candidates.  This implementation is subject to the
+ * <a href="http://hadoop.apache.org/zookeeper/docs/r3.2.1/recipes.html#sc_leaderElection">
+ * herd effect</a> for a given group and should only be used for small (~10 member) candidate pools.
+ */
+public class CandidateImpl implements Candidate {
+  private static final Logger LOG = Logger.getLogger(CandidateImpl.class.getName());
+
+  private static final byte[] UNKNOWN_CANDIDATE_DATA = "<unknown>".getBytes(Charsets.UTF_8);
+
+  private static final Supplier<byte[]> IP_ADDRESS_DATA_SUPPLIER = new Supplier<byte[]>() {
+    @Override public byte[] get() {
+      try {
+        // Encode explicitly as UTF-8 (like UNKNOWN_CANDIDATE_DATA) so the published bytes do not
+        // depend on the platform default charset of whichever host happens to be leader.
+        return InetAddress.getLocalHost().getHostAddress().getBytes(Charsets.UTF_8);
+      } catch (UnknownHostException e) {
+        LOG.log(Level.WARNING, "Failed to determine local address!", e);
+        return UNKNOWN_CANDIDATE_DATA;
+      }
+    }
+  };
+
+  // Despite its name this judge picks the smallest member id in natural (string) order; see the
+  // constructor documentation below for how that maps onto candidate age.
+  private static final Function<Iterable<String>, String> MOST_RECENT_JUDGE =
+      new Function<Iterable<String>, String>() {
+        @Override public String apply(Iterable<String> candidates) {
+          return Ordering.natural().min(candidates);
+        }
+      };
+
+  private final Group group;
+  private final Function<Iterable<String>, String> judge;
+  private final Supplier<byte[]> dataSupplier;
+
+  /**
+   * Equivalent to {@link #CandidateImpl(Group, com.google.common.base.Function, Supplier)} using a
+   * judge that always picks the lowest numbered candidate ephemeral node - by proxy the oldest or
+   * 1st candidate and a default supplier that provides the ip address of this host according to
+   * {@link java.net.InetAddress#getLocalHost()} as the leader identifying data.
+   *
+   * @param group the group whose members are the election candidates
+   */
+  public CandidateImpl(Group group) {
+    this(group, MOST_RECENT_JUDGE, IP_ADDRESS_DATA_SUPPLIER);
+  }
+
+  /**
+   * Creates a candidate that can be used to offer leadership for the given {@code group} using
+   * a judge that always picks the lowest numbered candidate ephemeral node - by proxy the oldest
+   * or 1st. The dataSupplier should produce bytes that identify this process as leader. These bytes
+   * will become available to all participants via the {@link Candidate#getLeaderData()} method.
+   *
+   * @param group the group whose members are the election candidates
+   * @param dataSupplier supplies the bytes that identify this process as leader
+   */
+  public CandidateImpl(Group group, Supplier<byte[]> dataSupplier) {
+    this(group, MOST_RECENT_JUDGE, dataSupplier);
+  }
+
+  /**
+   * Creates a candidate that can be used to offer leadership for the given {@code group}.  The
+   * {@code judge} is used to pick the current leader from all group members whenever the group
+   * membership changes. To form a well-behaved election group with one leader, all candidates
+   * should use the same judge. The dataSupplier should produce bytes that identify this process
+   * as leader. These bytes will become available to all participants via the
+   * {@link Candidate#getLeaderData()} method.
+   *
+   * @param group the group whose members are the election candidates
+   * @param judge picks the leader's member id out of the current member ids
+   * @param dataSupplier supplies the bytes that identify this process as leader
+   */
+  public CandidateImpl(
+      Group group,
+      Function<Iterable<String>, String> judge,
+      Supplier<byte[]> dataSupplier) {
+    this.group = Preconditions.checkNotNull(group);
+    this.judge = Preconditions.checkNotNull(judge);
+    this.dataSupplier = Preconditions.checkNotNull(dataSupplier);
+  }
+
+  @Override
+  public Optional<byte[]> getLeaderData()
+      throws ZooKeeperConnectionException, KeeperException, InterruptedException {
+
+    String leaderId = getLeader(group.getMemberIds());
+    return leaderId == null
+        ? Optional.<byte[]>absent()
+        : Optional.of(group.getMemberData(leaderId));
+  }
+
+  @Override
+  public Supplier<Boolean> offerLeadership(final Leader leader)
+      throws JoinException, WatchException, InterruptedException {
+
+    // The command handed to join() tells the leader it was defeated; when exactly the group runs
+    // it is decided by Group, not here.
+    final Membership membership = group.join(dataSupplier, new Command() {
+      @Override public void execute() {
+        leader.onDefeated();
+      }
+    });
+
+    final AtomicBoolean elected = new AtomicBoolean(false);
+    final AtomicBoolean abdicated = new AtomicBoolean(false);
+    group.watch(new GroupChangeListener() {
+      @Override public void onGroupChange(Iterable<String> memberIds) {
+        boolean noCandidates = Iterables.isEmpty(memberIds);
+        String memberId = membership.getMemberId();
+
+        if (noCandidates) {
+          LOG.warning("All candidates have temporarily left the group: " + group);
+        } else if (!Iterables.contains(memberIds, memberId)) {
+          LOG.severe(String.format(
+              "Current member ID %s is not a candidate for leader, current voting: %s",
+              memberId, memberIds));
+        } else {
+          boolean electedLeader = memberId.equals(getLeader(memberIds));
+          boolean previouslyElected = elected.getAndSet(electedLeader);
+
+          if (!previouslyElected && electedLeader) {
+            // Transition follower -> leader.
+            LOG.info(String.format("Candidate %s is now leader of group: %s",
+                membership.getMemberPath(), memberIds));
+
+            leader.onElected(new ExceptionalCommand<JoinException>() {
+              @Override public void execute() throws JoinException {
+                membership.cancel();
+                abdicated.set(true);
+              }
+            });
+          } else if (!electedLeader) {
+            if (previouslyElected) {
+              // Transition leader -> follower.
+              leader.onDefeated();
+            }
+            LOG.info(String.format(
+                "Candidate %s waiting for the next leader election, current voting: %s",
+                membership.getMemberPath(), memberIds));
+          }
+        }
+      }
+    });
+
+    return new Supplier<Boolean>() {
+      @Override public Boolean get() {
+        return !abdicated.get() && elected.get();
+      }
+    };
+  }
+
+  /**
+   * Applies the judge to the given member ids.
+   *
+   * @param memberIds the ids of the current group members
+   * @return the id the judge picked, or {@code null} when there are no members to pick from
+   */
+  @Nullable
+  private String getLeader(Iterable<String> memberIds) {
+    return Iterables.isEmpty(memberIds) ? null : judge.apply(memberIds);
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/zookeeper/CompoundServerSet.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/CompoundServerSet.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/CompoundServerSet.java
new file mode 100644
index 0000000..42732db
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/CompoundServerSet.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.base.Commands;
+import org.apache.aurora.common.base.MorePreconditions;
+import org.apache.aurora.common.zookeeper.Group.JoinException;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+
+/**
+ * A ServerSet that delegates all calls to other ServerSets.
+ */
+public class CompoundServerSet implements ServerSet {
+  private static final Joiner STACK_TRACE_JOINER = Joiner.on('\n');
+
+  private final List<ServerSet> serverSets;
+  // Last host set reported by each underlying server set; guarded by this.
+  private final Map<ServerSet, ImmutableSet<ServiceInstance>> instanceCache = Maps.newHashMap();
+  // Monitors to notify whenever the union of hosts changes; guarded by this.
+  private final List<HostChangeMonitor<ServiceInstance>> monitors = Lists.newArrayList();
+  // Non-null once the underlying server sets are being watched; guarded by this.
+  private Command stopWatching = null;
+  // Union of all cached host sets as of the last notification; guarded by this.
+  private ImmutableSet<ServiceInstance> allHosts = ImmutableSet.of();
+
+  /**
+   * Create new ServerSet from a list of serverSets.
+   *
+   * @param serverSets serverSets to which the calls will be delegated.
+   */
+  public CompoundServerSet(Iterable<ServerSet> serverSets) {
+    MorePreconditions.checkNotBlank(serverSets);
+    this.serverSets = ImmutableList.copyOf(serverSets);
+  }
+
+  /** A join variant to apply to each underlying server set. */
+  private interface JoinOp {
+    EndpointStatus doJoin(ServerSet serverSet) throws JoinException, InterruptedException;
+  }
+
+  /** A status change to apply to each endpoint status obtained from the underlying joins. */
+  private interface StatusOp {
+    void changeStatus(EndpointStatus status) throws UpdateException;
+  }
+
+  /**
+   * Applies {@code statusOp} to every status, attempting all of them even if some fail.
+   *
+   * @param statuses the endpoint statuses returned by the underlying server sets
+   * @param statusOp the change to apply to each status
+   * @throws UpdateException if at least one change failed; the message carries the stack trace
+   *     of every failure
+   */
+  private void changeStatus(
+      ImmutableList<EndpointStatus> statuses,
+      StatusOp statusOp) throws UpdateException {
+
+    ImmutableList.Builder<String> builder = ImmutableList.builder();
+    int errorIdx = 1;
+    for (EndpointStatus endpointStatus : statuses) {
+      try {
+        statusOp.changeStatus(endpointStatus);
+      } catch (UpdateException exception) {
+        builder.add(String.format("[%d] %s", errorIdx++,
+            Throwables.getStackTraceAsString(exception)));
+      }
+    }
+    if (errorIdx > 1) {
+      throw new UpdateException(
+          "One or more ServerSet update failed: " + STACK_TRACE_JOINER.join(builder.build()));
+    }
+  }
+
+  /**
+   * Joins every underlying server set with {@code joiner} and wraps the resulting statuses in a
+   * single status that fans out to all of them.
+   */
+  private EndpointStatus doJoin(JoinOp joiner) throws JoinException, InterruptedException {
+    // Get the list of endpoint status from the serverSets.
+    ImmutableList.Builder<EndpointStatus> builder = ImmutableList.builder();
+    for (ServerSet serverSet : serverSets) {
+      builder.add(joiner.doJoin(serverSet));
+    }
+
+    final ImmutableList<EndpointStatus> statuses = builder.build();
+
+    return new EndpointStatus() {
+      @Override public void leave() throws UpdateException {
+        changeStatus(statuses, new StatusOp() {
+          @Override public void changeStatus(EndpointStatus status) throws UpdateException {
+            status.leave();
+          }
+        });
+      }
+
+      @Override public void update(final Status newStatus) throws UpdateException {
+        changeStatus(statuses, new StatusOp() {
+          @Override public void changeStatus(EndpointStatus status) throws UpdateException {
+            status.update(newStatus);
+          }
+        });
+      }
+    };
+  }
+
+  @Override
+  public EndpointStatus join(
+      final InetSocketAddress endpoint,
+      final Map<String, InetSocketAddress> additionalEndpoints)
+      throws Group.JoinException, InterruptedException {
+
+    return doJoin(new JoinOp() {
+      @Override public EndpointStatus doJoin(ServerSet serverSet)
+          throws JoinException, InterruptedException {
+        return serverSet.join(endpoint, additionalEndpoints);
+      }
+    });
+  }
+
+  /**
+   * If any one of the serverSet throws an exception during respective join, the exception is
+   * propagated. Join is successful only if all the joins are successful.
+   *
+   * NOTE: If an exception occurs during the join, the serverSets in the composite can be in a
+   * partially joined state.
+   *
+   * @see ServerSet#join(InetSocketAddress, Map, Status)
+   */
+  @Override
+  public EndpointStatus join(
+      final InetSocketAddress endpoint,
+      final Map<String, InetSocketAddress> additionalEndpoints,
+      final Status status) throws Group.JoinException, InterruptedException {
+
+    return doJoin(new JoinOp() {
+      @Override public EndpointStatus doJoin(ServerSet serverSet)
+          throws JoinException, InterruptedException {
+
+        return serverSet.join(endpoint, additionalEndpoints, status);
+      }
+    });
+  }
+
+  @Override
+  public EndpointStatus join(
+      final InetSocketAddress endpoint,
+      final Map<String, InetSocketAddress> additionalEndpoints,
+      final int shardId) throws JoinException, InterruptedException {
+
+    return doJoin(new JoinOp() {
+      @Override public EndpointStatus doJoin(ServerSet serverSet)
+          throws JoinException, InterruptedException {
+
+        return serverSet.join(endpoint, additionalEndpoints, shardId);
+      }
+    });
+  }
+
+  // Handles changes to the union of hosts.
+  private synchronized void handleChange(ServerSet serverSet, ImmutableSet<ServiceInstance> hosts) {
+    instanceCache.put(serverSet, hosts);
+
+    // Get the union of hosts.
+    ImmutableSet<ServiceInstance> currentHosts =
+        ImmutableSet.copyOf(Iterables.concat(instanceCache.values()));
+
+    // Check if the hosts have changed.
+    if (!currentHosts.equals(allHosts)) {
+      allHosts = currentHosts;
+
+      // Notify the monitors.
+      for (HostChangeMonitor<ServiceInstance> monitor : monitors) {
+        monitor.onChange(allHosts);
+      }
+    }
+  }
+
+  /**
+   * Monitor the CompoundServerSet.
+   *
+   * If any one of the monitor calls to the underlying serverSet raises a MonitorException, the
+   * exception is propagated.  The call is successful only if all the monitor calls to the
+   * underlying serverSets are successful.
+   *
+   * NOTE: If an exception occurs during the monitor call, the serverSets in the composite will not
+   * be monitored.
+   *
+   * The underlying serverSets are only watched once; monitors registered by later calls are added
+   * to the set of monitors notified on every change and are immediately handed the current union
+   * of hosts when it is not empty.
+   *
+   * @param monitor HostChangeMonitor instance used to monitor host changes.
+   * @return A command that, when executed, will stop monitoring all underlying server sets.
+   * @throws MonitorException If there was a problem monitoring any of the underlying server sets.
+   */
+  @Override
+  public synchronized Command watch(HostChangeMonitor<ServiceInstance> monitor)
+      throws MonitorException {
+    if (stopWatching != null) {
+      // Already watching the underlying sets: previously an additional monitor was silently
+      // dropped here and never heard about any change.
+      if (!monitors.contains(monitor)) {
+        monitors.add(monitor);
+        if (!allHosts.isEmpty()) {
+          monitor.onChange(allHosts);
+        }
+      }
+      return stopWatching;
+    }
+
+    monitors.add(monitor);
+    List<Command> started = Lists.newArrayList();
+    boolean success = false;
+    try {
+      for (final ServerSet serverSet : serverSets) {
+        started.add(serverSet.watch(new HostChangeMonitor<ServiceInstance>() {
+          @Override public void onChange(ImmutableSet<ServiceInstance> hostSet) {
+            handleChange(serverSet, hostSet);
+          }
+        }));
+      }
+      success = true;
+    } finally {
+      if (!success) {
+        // Honor the documented contract that nothing is monitored after a failure: undo the
+        // watches that were already established and forget the partial state so a retry starts
+        // from scratch.
+        monitors.remove(monitor);
+        instanceCache.clear();
+        allHosts = ImmutableSet.of();
+        for (Command stop : started) {
+          stop.execute();
+        }
+      }
+    }
+
+    stopWatching = Commands.compound(ImmutableList.copyOf(started));
+    return stopWatching;
+  }
+
+  @Override
+  public void monitor(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException {
+    watch(monitor);
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/zookeeper/DistributedLock.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/DistributedLock.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/DistributedLock.java
new file mode 100644
index 0000000..1e8fc48
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/DistributedLock.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A lock that is coordinated between processes.
+ *
+ * @author Florian Leibert
+ */
+public interface DistributedLock {
+
+  /**
+   * Acquires the lock.
+   *
+   * @throws LockingException if the lock cannot be acquired
+   */
+  void lock() throws LockingException;
+
+  /**
+   * Attempts to acquire the lock, giving up once the given amount of time has passed.
+   *
+   * @param timeout how long to wait for the lock
+   * @param unit the unit {@code timeout} is expressed in
+   * @return {@code true} if the lock was acquired, {@code false} otherwise
+   */
+  boolean tryLock(long timeout, TimeUnit unit);
+
+  /**
+   * Releases the lock.
+   *
+   * @throws LockingException if the lock cannot be released
+   */
+  void unlock() throws LockingException;
+
+  /**
+   * Unchecked exception signalling that a locking operation failed.
+   */
+  class LockingException extends RuntimeException {
+    public LockingException(String msg) {
+      super(msg);
+    }
+
+    public LockingException(String msg, Exception e) {
+      super(msg, e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/zookeeper/DistributedLockImpl.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/DistributedLockImpl.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/DistributedLockImpl.java
new file mode 100644
index 0000000..99a5774
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/DistributedLockImpl.java
@@ -0,0 +1,286 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Ordering;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+import org.apache.aurora.common.base.MorePreconditions;
+
+/**
+ * Distributed locking via ZooKeeper. Assuming there are N clients that all try to acquire a lock,
+ * the algorithm works as follows. Each host creates an ephemeral|sequential node, and requests a
+ * list of children for the lock node. Due to the nature of sequential, all the ids are increasing
+ * in order, therefore the client with the least ID according to natural ordering will hold the
+ * lock. Every other client watches the id immediately preceding its own id and checks for the lock
+ * in case of notification. The client holding the lock does the work and finally deletes the node,
+ * thereby triggering the next client in line to acquire the lock. Deadlocks are possible but
+ * avoided in most cases because if a client drops dead while holding the lock, the ZK session
+ * should timeout and since the node is ephemeral, it will be removed in such a case. Deadlocks
+ * could occur if the worker thread on a client hangs but the zk-client thread is still alive.
+ * There could be an external monitor client that ensures that alerts are triggered if the least-id
+ * ephemeral node is present past a time-out.
+ * <p/>
+ * Note: Locking attempts will fail in case session expires!
+ *
+ * @author Florian Leibert
+ */
+@ThreadSafe
+public class DistributedLockImpl implements DistributedLock {
+
+  private static final Logger LOG = Logger.getLogger(DistributedLockImpl.class.getName());
+
+  private final ZooKeeperClient zkClient;
+  private final String lockPath;
+  private final ImmutableList<ACL> acl;
+
+  private final AtomicBoolean aborted = new AtomicBoolean(false);
+  // Counted down once the current attempt is decided (lock acquired or attempt cancelled); a
+  // fresh latch is installed for every attempt.
+  private CountDownLatch syncPoint;
+  private boolean holdsLock = false;
+  // Name (last path segment) and full path of this client's ephemeral node, null when idle.
+  private String currentId;
+  private String currentNode;
+  // Full path of the predecessor node whose deletion the current attempt is waiting for.
+  private String watchedNode;
+  private LockWatcher watcher;
+
+  /**
+   * Equivalent to {@link #DistributedLockImpl(ZooKeeperClient, String, Iterable)} with a default
+   * wide open {@code acl} ({@link ZooDefs.Ids#OPEN_ACL_UNSAFE}).
+   */
+  public DistributedLockImpl(ZooKeeperClient zkClient, String lockPath) {
+    this(zkClient, lockPath, ZooDefs.Ids.OPEN_ACL_UNSAFE);
+  }
+
+  /**
+   * Creates a distributed lock using the given {@code zkClient} to coordinate locking.
+   *
+   * @param zkClient The ZooKeeper client to use.
+   * @param lockPath The path used to manage the lock under.
+   * @param acl The acl to apply to newly created lock nodes.
+   */
+  public DistributedLockImpl(ZooKeeperClient zkClient, String lockPath, Iterable<ACL> acl) {
+    this.zkClient = Preconditions.checkNotNull(zkClient);
+    this.lockPath = MorePreconditions.checkNotBlank(lockPath);
+    this.acl = ImmutableList.copyOf(acl);
+    this.syncPoint = new CountDownLatch(1);
+  }
+
+  /**
+   * Enqueues this client for the lock by creating its ephemeral sequential node and installs a
+   * new watcher for the attempt.
+   */
+  private synchronized void prepare()
+      throws ZooKeeperClient.ZooKeeperConnectionException, InterruptedException, KeeperException {
+
+    ZooKeeperUtils.ensurePath(zkClient, acl, lockPath);
+    LOG.log(Level.FINE, "Working with locking path:" + lockPath);
+
+    // Create an EPHEMERAL_SEQUENTIAL node.
+    currentNode =
+        zkClient.get().create(lockPath + "/member_", null, acl, CreateMode.EPHEMERAL_SEQUENTIAL);
+
+    // We only care about our actual id since we want to compare ourselves to siblings.
+    if (currentNode.contains("/")) {
+      currentId = currentNode.substring(currentNode.lastIndexOf("/") + 1);
+    }
+    LOG.log(Level.FINE, "Received ID from zk:" + currentId);
+    this.watcher = new LockWatcher();
+  }
+
+  /**
+   * Blocks until the lock is acquired or the attempt is cancelled.
+   *
+   * @throws LockingException if the lock is already held by this instance, the attempt was
+   *     cancelled, the thread was interrupted (the interrupt status is restored) or ZooKeeper
+   *     could not be reached
+   */
+  @Override
+  public synchronized void lock() throws LockingException {
+    if (holdsLock) {
+      throw new LockingException("Error, already holding a lock. Call unlock first!");
+    }
+    try {
+      prepare();
+      // Capture the latch of this attempt: a cancellation replaces syncPoint with a fresh latch
+      // for the next attempt and counts down the one captured here.
+      CountDownLatch attemptDecided = syncPoint;
+      watcher.checkForLock();
+      attemptDecided.await();
+      if (!holdsLock) {
+        throw new LockingException("Error, couldn't acquire the lock!");
+      }
+    } catch (InterruptedException e) {
+      try {
+        cancelAttempt();
+      } finally {
+        // Preserve interrupt status; done last so the ZooKeeper calls made while cleaning up
+        // are not interrupted again.
+        Thread.currentThread().interrupt();
+      }
+      throw new LockingException("InterruptedException while trying to acquire lock!", e);
+    } catch (KeeperException e) {
+      // No need to clean up since the node wasn't created yet.
+      throw new LockingException("KeeperException while trying to acquire lock!", e);
+    } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
+      // No need to clean up since the node wasn't created yet.
+      throw new LockingException("ZooKeeperConnectionException while trying to acquire lock", e);
+    }
+  }
+
+  /**
+   * Tries to acquire the lock, waiting at most the given time.  When the wait times out or the
+   * thread is interrupted (the interrupt status is restored) the attempt is withdrawn, i.e. this
+   * client's node is removed from the lock queue.
+   *
+   * @param timeout how long to wait for the lock
+   * @param unit the unit of {@code timeout}
+   * @return {@code true} if the lock was acquired, {@code false} on timeout or interruption
+   * @throws LockingException if the lock is already held by this instance, the attempt was
+   *     cancelled or ZooKeeper could not be reached
+   */
+  @Override
+  public synchronized boolean tryLock(long timeout, TimeUnit unit) {
+    if (holdsLock) {
+      throw new LockingException("Error, already holding a lock. Call unlock first!");
+    }
+    try {
+      prepare();
+      CountDownLatch attemptDecided = syncPoint;
+      watcher.checkForLock();
+      boolean success = attemptDecided.await(timeout, unit);
+      if (!success) {
+        // Withdraw from the queue.  Leaving the node behind would let this client "acquire" the
+        // lock later without anybody noticing, and would permanently block any further attempt
+        // of this instance, whose new node would queue up behind the abandoned one.
+        cancelAttempt();
+        return false;
+      }
+      if (!holdsLock) {
+        throw new LockingException("Error, couldn't acquire the lock!");
+      }
+    } catch (InterruptedException e) {
+      try {
+        cancelAttempt();
+      } finally {
+        // Preserve interrupt status.
+        Thread.currentThread().interrupt();
+      }
+      return false;
+    } catch (KeeperException e) {
+      // No need to clean up since the node wasn't created yet.
+      throw new LockingException("KeeperException while trying to acquire lock!", e);
+    } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
+      // No need to clean up since the node wasn't created yet.
+      throw new LockingException("ZooKeeperConnectionException while trying to acquire lock", e);
+    }
+    return true;
+  }
+
+  /**
+   * Releases the lock, or withdraws a pending acquisition attempt that left a node behind.
+   *
+   * @throws LockingException if this instance neither holds the lock nor has an attempt pending
+   */
+  @Override
+  public synchronized void unlock() throws LockingException {
+    if (currentId == null) {
+      throw new LockingException("Error, neither attempting to lock nor holding a lock!");
+    }
+    // Try aborting!
+    if (!holdsLock) {
+      aborted.set(true);
+      LOG.log(Level.INFO, "Not holding lock, aborting acquisition attempt!");
+      // Actually remove the queued node; setting the flag alone aborted nothing.
+      cancelAttempt();
+    } else {
+      LOG.log(Level.INFO, "Cleaning up this locks ephemeral node.");
+      cleanup();
+    }
+  }
+
+  //TODO(Florian Leibert): Make sure this isn't a runtime exception. Put exceptions into the token?
+
+  /**
+   * Withdraws the current acquisition attempt and wakes up whoever waits for its outcome.
+   *
+   * Deliberately not synchronized on this lock object: lock() holds that monitor while it waits,
+   * so a cancellation triggered from the ZooKeeper event thread would block forever.  Instead the
+   * cancellation is serialized with the callbacks of the active watcher.
+   */
+  private void cancelAttempt() {
+    LOG.log(Level.INFO, "Cancelling lock attempt!");
+    LockWatcher activeWatcher = watcher;
+    if (activeWatcher == null) {
+      doCancel();
+    } else {
+      synchronized (activeWatcher) {
+        doCancel();
+      }
+    }
+  }
+
+  private void doCancel() {
+    // cleanup() installs a fresh latch for the next attempt, so remember the one the current
+    // waiter (if any) is blocked on.
+    CountDownLatch attemptDecided = syncPoint;
+    try {
+      cleanup();
+    } finally {
+      // Bubble up failure...
+      holdsLock = false;
+      if (syncPoint == attemptDecided) {
+        // cleanup() failed before it could replace the latch; never leave a spent latch behind.
+        syncPoint = new CountDownLatch(1);
+      }
+      attemptDecided.countDown();
+    }
+  }
+
+  /**
+   * Deletes this client's node, if there is one, and resets the per-attempt state.  The state is
+   * left untouched when talking to ZooKeeper fails so that the caller can retry.
+   */
+  private void cleanup() {
+    LOG.info("Cleaning up!");
+    try {
+      // currentNode is null when the attempt failed before the node was created.
+      Stat stat = currentNode == null ? null : zkClient.get().exists(currentNode, false);
+      if (stat != null) {
+        zkClient.get().delete(currentNode, ZooKeeperUtils.ANY_VERSION);
+      } else {
+        LOG.log(Level.WARNING, "Called cleanup but nothing to cleanup!");
+      }
+    } catch (Exception e) {
+      if (e instanceof InterruptedException) {
+        // Preserve interrupt status.
+        Thread.currentThread().interrupt();
+      }
+      throw new RuntimeException(e);
+    }
+    holdsLock = false;
+    aborted.set(false);
+    currentId = null;
+    currentNode = null;
+    // Forget the predecessor so events of watches left over from this attempt are ignored.
+    watchedNode = null;
+    watcher = null;
+    syncPoint = new CountDownLatch(1);
+  }
+
+  /**
+   * Decides whether this client holds the lock and, if not, watches the node queued directly in
+   * front of it.  Also receives the resulting ZooKeeper notifications.
+   */
+  class LockWatcher implements Watcher {
+
+    public synchronized void checkForLock() {
+      MorePreconditions.checkNotBlank(currentId);
+
+      try {
+        List<String> candidates = zkClient.get().getChildren(lockPath, null);
+        ImmutableList<String> sortedMembers = Ordering.natural().immutableSortedCopy(candidates);
+
+        // Unexpected behavior if there are no children!
+        if (sortedMembers.isEmpty()) {
+          throw new LockingException("Error, member list is empty!");
+        }
+
+        int memberIndex = sortedMembers.indexOf(currentId);
+
+        if (memberIndex < 0) {
+          // Our own node is not among the children, so there is no predecessor to look up
+          // (get(memberIndex - 1) would be out of bounds) and no way to ever win the lock.
+          LOG.log(Level.WARNING, String.format("Current LockWatcher with ephemeral node [%s] "
+              + "is no longer queued for the lock. Trying to cancel lock acquisition.",
+              currentId));
+          cancelAttempt();
+        } else if (memberIndex == 0) {
+          // If we hold the lock
+          holdsLock = true;
+          syncPoint.countDown();
+        } else {
+          final String nextLowestNode = sortedMembers.get(memberIndex - 1);
+          LOG.log(Level.INFO, String.format("Current LockWatcher with ephemeral node [%s], is "
+              + "waiting for [%s] to release lock.", currentId, nextLowestNode));
+
+          watchedNode = String.format("%s/%s", lockPath, nextLowestNode);
+          Stat stat = zkClient.get().exists(watchedNode, this);
+          if (stat == null) {
+            // The predecessor vanished between listing the children and setting the watch.
+            checkForLock();
+          }
+        }
+      } catch (InterruptedException e) {
+        LOG.log(Level.WARNING, String.format("Current LockWatcher with ephemeral node [%s] "
+            + "got interrupted. Trying to cancel lock acquisition.", currentId), e);
+        try {
+          cancelAttempt();
+        } finally {
+          // Preserve interrupt status.
+          Thread.currentThread().interrupt();
+        }
+      } catch (KeeperException e) {
+        LOG.log(Level.WARNING, String.format("Current LockWatcher with ephemeral node [%s] "
+            + "got a KeeperException. Trying to cancel lock acquisition.", currentId), e);
+        cancelAttempt();
+      } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
+        LOG.log(Level.WARNING, String.format("Current LockWatcher with ephemeral node [%s] "
+            + "got a ConnectionException. Trying to cancel lock acquisition.", currentId), e);
+        cancelAttempt();
+      }
+    }
+
+    @Override
+    public synchronized void process(WatchedEvent event) {
+      //TODO(Florian Leibert): Pull this into the outer class.
+      if (event.getType() == Watcher.Event.EventType.None) {
+        // Connection state changes carry no path, so they have to be handled before the path
+        // filter below; filtering first dereferenced a null path and session expiry was never
+        // acted upon.
+        switch (event.getState()) {
+          case SyncConnected:
+            // TODO(Florian Leibert): maybe we should just try to "fail-fast" in this case and abort.
+            LOG.info("Reconnected...");
+            break;
+          case Expired:
+            LOG.log(Level.WARNING, String.format("Current ZK session expired![%s]", currentId));
+            cancelAttempt();
+            break;
+          default:
+            break;
+        }
+        return;
+      }
+
+      // this handles the case where we have aborted a lock and deleted ourselves but still have a
+      // watch on the nextLowestNode. This is a workaround since ZK doesn't support unsub.
+      String path = event.getPath();
+      if (path == null || !path.equals(watchedNode)) {
+        LOG.log(Level.INFO, "Ignoring call for node:" + watchedNode);
+        return;
+      }
+
+      if (event.getType() == Event.EventType.NodeDeleted) {
+        checkForLock();
+      } else {
+        LOG.log(Level.WARNING, String.format("Unexpected ZK event: %s", event.getType().name()));
+      }
+    }
+  }
+}
