http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/util/concurrent/ExecutorServiceShutdown.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/concurrent/ExecutorServiceShutdown.java b/commons/src/main/java/com/twitter/common/util/concurrent/ExecutorServiceShutdown.java deleted file mode 100644 index cfea299..0000000 --- a/commons/src/main/java/com/twitter/common/util/concurrent/ExecutorServiceShutdown.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.util.concurrent; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.logging.Logger; - -import com.google.common.base.Preconditions; - -import com.twitter.common.base.Command; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; - -/** - * An implementation of the graceful shutdown sequence recommended by {@link ExecutorService}. 
- * - * @author John Sirois - */ -public class ExecutorServiceShutdown implements Command { - private static final Logger LOG = Logger.getLogger(ExecutorServiceShutdown.class.getName()); - - private final ExecutorService executor; - private final Amount<Long, Time> gracePeriod; - - /** - * Creates a new {@code ExecutorServiceShutdown} command that will try to gracefully shut down the - * given {@code executor} when executed. If the supplied grace period is less than or equal to - * zero the executor service will be asked to shut down but no waiting will be done after these - * requests. - * - * @param executor The executor service this command should shut down when executed. - * @param gracePeriod The maximum time to wait after a shutdown request before continuing to the - * next shutdown phase. - */ - public ExecutorServiceShutdown(ExecutorService executor, Amount<Long, Time> gracePeriod) { - this.executor = Preconditions.checkNotNull(executor); - this.gracePeriod = Preconditions.checkNotNull(gracePeriod); - } - - @Override - public void execute() { - executor.shutdown(); // Disable new tasks from being submitted. - try { - // Wait a while for existing tasks to terminate. - if (!executor.awaitTermination(gracePeriod.as(Time.MILLISECONDS), TimeUnit.MILLISECONDS)) { - executor.shutdownNow(); // Cancel currently executing tasks. - // Wait a while for tasks to respond to being cancelled. - if (!executor.awaitTermination(gracePeriod.as(Time.MILLISECONDS), TimeUnit.MILLISECONDS)) { - LOG.warning("Pool did not terminate"); - } - } - } catch (InterruptedException ie) { - // (Re-)Cancel if current thread also interrupted. - executor.shutdownNow(); - // Preserve interrupt status. - Thread.currentThread().interrupt(); - } - } -}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/util/concurrent/ForwardingExecutorService.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/concurrent/ForwardingExecutorService.java b/commons/src/main/java/com/twitter/common/util/concurrent/ForwardingExecutorService.java deleted file mode 100644 index 91a403e..0000000 --- a/commons/src/main/java/com/twitter/common/util/concurrent/ForwardingExecutorService.java +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.util.concurrent; - -import com.google.common.base.Preconditions; - -import java.util.Collection; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -/** - * An executor service that forwards all calls to another executor service. Subclasses should - * override one or more methods to modify the behavior of the backing executor service as desired - * per the <a href="http://en.wikipedia.org/wiki/Decorator_pattern">decorator pattern</a>. 
- * - * @author John Sirois - */ -public class ForwardingExecutorService<T extends ExecutorService> implements ExecutorService { - protected final T delegate; - - public ForwardingExecutorService(T delegate) { - Preconditions.checkNotNull(delegate); - this.delegate = delegate; - } - - public void shutdown() { - delegate.shutdown(); - } - - public List<Runnable> shutdownNow() { - return delegate.shutdownNow(); - } - - public boolean isShutdown() { - return delegate.isShutdown(); - } - - public boolean isTerminated() { - return delegate.isTerminated(); - } - - public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { - return delegate.awaitTermination(timeout, unit); - } - - public <T> Future<T> submit(Callable<T> task) { - return delegate.submit(task); - } - - public <T> Future<T> submit(Runnable task, T result) { - return delegate.submit(task, result); - } - - public Future<?> submit(Runnable task) { - return delegate.submit(task); - } - - public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) - throws InterruptedException { - - return delegate.invokeAll(tasks); - } - - public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, - TimeUnit unit) throws InterruptedException { - - return delegate.invokeAll(tasks, timeout, unit); - } - - public <T> T invokeAny(Collection<? extends Callable<T>> tasks) - throws InterruptedException, ExecutionException { - - return delegate.invokeAny(tasks); - } - - public <T> T invokeAny(Collection<? 
extends Callable<T>> tasks, long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - - return delegate.invokeAny(tasks, timeout, unit); - } - - public void execute(Runnable command) { - delegate.execute(command); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/util/concurrent/MoreExecutors.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/concurrent/MoreExecutors.java b/commons/src/main/java/com/twitter/common/util/concurrent/MoreExecutors.java deleted file mode 100644 index 70a4a13..0000000 --- a/commons/src/main/java/com/twitter/common/util/concurrent/MoreExecutors.java +++ /dev/null @@ -1,122 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.util.concurrent; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; - -import com.google.common.base.Preconditions; -import com.google.common.base.Supplier; -import com.google.common.base.Suppliers; - -/** - * Utility class that provides factory functions to decorate - * {@link java.util.concurrent.ExecutorService}s. - */ -public final class MoreExecutors { - private MoreExecutors() { - // utility - } - - /** - * Returns a {@link ExecutorService} that passes uncaught exceptions to - * {@link java.lang.Thread.UncaughtExceptionHandler}. 
- * <p> - * This may be useful because {@link java.util.concurrent.ThreadPoolExecutor} and - * {@link java.util.concurrent.ScheduledThreadPoolExecutor} provide no built-in propagation of - * unchecked exceptions thrown from submitted work. Some users are surprised to find that - * even the default uncaught exception handler is not invoked. - * - * @param executorService delegate - * @param uncaughtExceptionHandler exception handler that will receive exceptions generated - * from executor tasks. - * @return a decorated executor service - */ - public static ExecutorService exceptionHandlingExecutor( - ExecutorService executorService, - Thread.UncaughtExceptionHandler uncaughtExceptionHandler) { - - Preconditions.checkNotNull(uncaughtExceptionHandler); - return new ExceptionHandlingExecutorService( - executorService, Suppliers.ofInstance(uncaughtExceptionHandler)); - } - - /** - * Returns a {@link ExecutorService} that passes uncaught exceptions to - * a handler returned by Thread.currentThread().getUncaughtExceptionHandler() - * at the time the exception is thrown. - * - * @see MoreExecutors#exceptionHandlingExecutor(java.util.concurrent.ExecutorService, - * Thread.UncaughtExceptionHandler) - * @param executorService delegate - * @return a decorated executor service - */ - public static ExecutorService exceptionHandlingExecutor(ExecutorService executorService) { - return new ExceptionHandlingExecutorService( - executorService, - new Supplier<Thread.UncaughtExceptionHandler>() { - @Override - public Thread.UncaughtExceptionHandler get() { - return Thread.currentThread().getUncaughtExceptionHandler(); - } - }); - } - - /** - * Returns a {@link ScheduledExecutorService} that passes uncaught exceptions to - * {@link java.lang.Thread.UncaughtExceptionHandler}. 
- * <p> - * This may be useful because {@link java.util.concurrent.ThreadPoolExecutor} and - * {@link java.util.concurrent.ScheduledThreadPoolExecutor} provide no built-in propagation of - * unchecked exceptions thrown from submitted work. Some users are surprised to find that - * even the default uncaught exception handler is not invoked. - * - * @param executorService delegate - * @param uncaughtExceptionHandler exception handler that will receive exceptions generated - * from executor tasks. - * @return a decorated executor service - */ - public static ScheduledExecutorService exceptionHandlingExecutor( - ScheduledExecutorService executorService, - Thread.UncaughtExceptionHandler uncaughtExceptionHandler) { - - Preconditions.checkNotNull(uncaughtExceptionHandler); - return new ExceptionHandlingScheduledExecutorService( - executorService, - Suppliers.ofInstance(uncaughtExceptionHandler)); - } - - /** - * Returns a {@link ScheduledExecutorService} that passes uncaught exceptions to - * a handler returned by Thread.currentThread().getUncaughtExceptionHandler() - * at the time the exception is thrown. 
- * - * @see MoreExecutors#exceptionHandlingExecutor(java.util.concurrent.ScheduledExecutorService, - * Thread.UncaughtExceptionHandler) - * @param executorService delegate - * @return a decorated executor service - */ - public static ScheduledExecutorService exceptionHandlingExecutor( - ScheduledExecutorService executorService) { - - return new ExceptionHandlingScheduledExecutorService( - executorService, - new Supplier<Thread.UncaughtExceptionHandler>() { - @Override - public Thread.UncaughtExceptionHandler get() { - return Thread.currentThread().getUncaughtExceptionHandler(); - } - }); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/util/concurrent/RetryingFutureTask.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/concurrent/RetryingFutureTask.java b/commons/src/main/java/com/twitter/common/util/concurrent/RetryingFutureTask.java deleted file mode 100644 index cca7001..0000000 --- a/commons/src/main/java/com/twitter/common/util/concurrent/RetryingFutureTask.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.common.util.concurrent; - -import com.google.common.base.Preconditions; - -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.FutureTask; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * A future task that supports retries by resubmitting itself to an {@link ExecutorService}. - * - * @author William Farner - */ -public class RetryingFutureTask extends FutureTask<Boolean> { - private static Logger LOG = Logger.getLogger(RetryingFutureTask.class.getName()); - - protected final ExecutorService executor; - protected final int maxRetries; - protected int numRetries = 0; - protected final Callable<Boolean> callable; - - /** - * Creates a new retrying future task that will execute a unit of work until successfully - * completed, or the retry limit has been reached. - * - * @param executor The executor service to resubmit the task to upon failure. - * @param callable The unit of work. The work is considered successful when {@code true} is - * returned. It may return {@code false} or throw an exception when unsuccessful. - * @param maxRetries The maximum number of times to retry the task. - */ - public RetryingFutureTask(ExecutorService executor, Callable<Boolean> callable, int maxRetries) { - super(callable); - this.callable = Preconditions.checkNotNull(callable); - this.executor = Preconditions.checkNotNull(executor); - this.maxRetries = maxRetries; - } - - /** - * Invokes a retry of this task. 
- */ - protected void retry() { - executor.execute(this); - } - - @Override - public void run() { - boolean success = false; - try { - success = callable.call(); - } catch (Exception e) { - LOG.log(Level.WARNING, "Exception while executing task.", e); - } - - if (!success) { - numRetries++; - if (numRetries > maxRetries) { - LOG.severe("Task did not complete after " + maxRetries + " retries, giving up."); - } else { - LOG.info("Task was not successful, resubmitting (num retries: " + numRetries + ")"); - retry(); - } - } else { - set(true); - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/util/concurrent/TaskConverter.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/concurrent/TaskConverter.java b/commons/src/main/java/com/twitter/common/util/concurrent/TaskConverter.java deleted file mode 100644 index 5535662..0000000 --- a/commons/src/main/java/com/twitter/common/util/concurrent/TaskConverter.java +++ /dev/null @@ -1,93 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.common.util.concurrent; - -import java.util.Collection; -import java.util.concurrent.Callable; - -import com.google.common.base.Function; -import com.google.common.base.Supplier; -import com.google.common.base.Throwables; -import com.google.common.collect.Collections2; - -final class TaskConverter { - private TaskConverter() { - // utility - } - - /** - * Returns a wrapped {@link Runnable} that passes uncaught exceptions thrown from the - * original Runnable to {@link Thread.UncaughtExceptionHandler}. - * - * @param runnable runnable to be wrapped - * @param handler exception handler that will receive exceptions generated in the runnable - * @return wrapped runnable - */ - static Runnable alertingRunnable( - final Runnable runnable, - final Supplier<Thread.UncaughtExceptionHandler> handler) { - - return new Runnable() { - @Override - public void run() { - try { - runnable.run(); - } catch (Throwable t) { - handler.get().uncaughtException(Thread.currentThread(), t); - throw Throwables.propagate(t); - } - } - }; - } - - /** - * Returns a wrapped {@link java.util.concurrent.Callable} that passes uncaught exceptions - * thrown from the original Callable to {@link Thread.UncaughtExceptionHandler}. - * - * @param callable callable to be wrapped - * @param handler exception handler that will receive exceptions generated in the callable - * @return wrapped callable - */ - static <V> Callable<V> alertingCallable( - final Callable<V> callable, - final Supplier<Thread.UncaughtExceptionHandler> handler) { - - return new Callable<V>() { - @Override - public V call() throws Exception { - try { - return callable.call(); - } catch (Throwable t) { - handler.get().uncaughtException(Thread.currentThread(), t); - throw Throwables.propagate(t); - } - } - }; - } - - /* - * Calls #alertingCallable on a collection of callables - */ - static <V> Collection<? extends Callable<V>> alertingCallables( - Collection<? 
extends Callable<V>> callables, - final Supplier<Thread.UncaughtExceptionHandler> handler) { - - return Collections2.transform(callables, new Function<Callable<V>, Callable<V>>() { - @Override - public Callable<V> apply(Callable<V> callable) { - return alertingCallable(callable, handler); - } - }); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/util/logging/ResourceLoggingConfigurator.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/logging/ResourceLoggingConfigurator.java b/commons/src/main/java/com/twitter/common/util/logging/ResourceLoggingConfigurator.java deleted file mode 100644 index 2cc1692..0000000 --- a/commons/src/main/java/com/twitter/common/util/logging/ResourceLoggingConfigurator.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.util.logging; - -import com.google.common.base.Preconditions; - -import java.io.IOException; -import java.io.InputStream; -import java.util.logging.LogManager; - -/** - * A custom java.util.logging configuration class that loads the logging configuration from a - * properties file resource (as opposed to a file as natively supported by LogManager via - * java.util.logging.config.file). 
By default this configurator will look for the resource at - * /logging.properties but the resource path can be overridden by setting the system property with - * key {@link #LOGGING_PROPERTIES_RESOURCE_PATH java.util.logging.config.resource}. To install this - * configurator you must specify the following system property: - * java.util.logging.config.class=com.twitter.common.util.logging.ResourceLoggingConfigurator - * - * @author John Sirois - */ -public class ResourceLoggingConfigurator { - - /** - * A system property that controls where ResourceLoggingConfigurator looks for the logging - * configuration on the process classpath. - */ - public static final String LOGGING_PROPERTIES_RESOURCE_PATH = "java.util.logging.config.resource"; - - public ResourceLoggingConfigurator() throws IOException { - String loggingPropertiesResourcePath = - System.getProperty(LOGGING_PROPERTIES_RESOURCE_PATH, "/logging.properties"); - InputStream loggingConfig = getClass().getResourceAsStream(loggingPropertiesResourcePath); - Preconditions.checkNotNull(loggingConfig, - "Could not locate logging config file at resource path: %s", loggingPropertiesResourcePath); - LogManager.getLogManager().readConfiguration(loggingConfig); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/util/logging/UnresettableLogManager.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/logging/UnresettableLogManager.java b/commons/src/main/java/com/twitter/common/util/logging/UnresettableLogManager.java deleted file mode 100644 index aa57572..0000000 --- a/commons/src/main/java/com/twitter/common/util/logging/UnresettableLogManager.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.util.logging; - -import java.util.logging.LogManager; - -/** - * A LogManager which by default ignores calls to {@link #reset()}. This is useful to avoid missing - * log statements that occur during vm shutdown. The standard LogManager installs a - * {@link Runtime#addShutdownHook(Thread) shutdown hook} that disables logging and this subclass - * nullifies that shutdown hook by disabling any reset of the LogManager by default. - * - * @author John Sirois - */ -public class UnresettableLogManager extends LogManager { - - /** - * The system property that controls which LogManager the java.util.logging subsystem should load. - */ - public static final String LOGGING_MANAGER = "java.util.logging.manager"; - - /** - * A system property which can be used to control an {@code UnresettableLogManager}'s behavior. - * If the UnresettableLogManager is installed, but an application still wants - * {@link LogManager#reset()} behavior, they can set this property to "false". 
- */ - private static final String LOGGING_MANAGER_IGNORERESET = "java.util.logging.manager.ignorereset"; - - @Override - public void reset() throws SecurityException { - if (Boolean.parseBoolean(System.getProperty(LOGGING_MANAGER_IGNORERESET, "true"))) { - System.err.println("UnresettableLogManager is ignoring a reset() request."); - } else { - super.reset(); - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/util/templating/StringTemplateHelper.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/templating/StringTemplateHelper.java b/commons/src/main/java/com/twitter/common/util/templating/StringTemplateHelper.java deleted file mode 100644 index 52c80ae..0000000 --- a/commons/src/main/java/com/twitter/common/util/templating/StringTemplateHelper.java +++ /dev/null @@ -1,96 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.common.util.templating; - -import java.io.IOException; -import java.io.Writer; - -import com.google.common.base.Preconditions; - -import org.antlr.stringtemplate.AutoIndentWriter; -import org.antlr.stringtemplate.StringTemplate; -import org.antlr.stringtemplate.StringTemplateGroup; - -import com.twitter.common.base.Closure; -import com.twitter.common.base.MorePreconditions; - -/** - * A class to simplify the operations required to load a stringtemplate template file from the - * classpath and populate it. - */ -public class StringTemplateHelper { - - private final StringTemplateGroup group; - private final String templatePath; - - /** - * Creates a new template helper. - * - * @param templateContextClass Classpath context for the location of the template file. - * @param templateName Template file name (excluding .st suffix) relative to - * {@code templateContextClass}. - * @param cacheTemplates Whether the template should be cached. - */ - public StringTemplateHelper( - Class<?> templateContextClass, - String templateName, - boolean cacheTemplates) { - - MorePreconditions.checkNotBlank(templateName); - String templatePath = - templateContextClass.getPackage().getName().replace('.', '/') + "/" + templateName; - StringTemplateGroup group = new StringTemplateGroup(templateName); - Preconditions.checkNotNull(group.getInstanceOf(templatePath), - "Failed to load template at: %s", templatePath); - - this.group = group; - if (!cacheTemplates) { - group.setRefreshInterval(0); - } - this.templatePath = templatePath; - } - - /** - * Thrown when an exception is encountered while populating a template. - */ - public static class TemplateException extends Exception { - public TemplateException(String msg, Throwable cause) { - super(msg, cause); - } - } - - /** - * Writes the populated template to an output writer by providing a closure with access to - * the unpopulated template object. - * - * @param out Template output writer. 
- * @param parameterSetter Closure to populate the template. - * @throws TemplateException If an exception was encountered while populating the template. - */ - public void writeTemplate( - Writer out, - Closure<StringTemplate> parameterSetter) throws TemplateException { - - Preconditions.checkNotNull(out); - Preconditions.checkNotNull(parameterSetter); - - StringTemplate stringTemplate = group.getInstanceOf(templatePath); - try { - parameterSetter.execute(stringTemplate); - stringTemplate.write(new AutoIndentWriter(out)); - } catch (IOException e) { - throw new TemplateException("Failed to write template: " + e, e); - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/util/testing/FakeClock.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/testing/FakeClock.java b/commons/src/main/java/com/twitter/common/util/testing/FakeClock.java deleted file mode 100644 index 34d3bc9..0000000 --- a/commons/src/main/java/com/twitter/common/util/testing/FakeClock.java +++ /dev/null @@ -1,78 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.common.util.testing; - -import com.google.common.base.Preconditions; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import com.twitter.common.util.Clock; - -import java.util.concurrent.TimeUnit; - -/** - * A clock for use in testing with a configurable value for {@link #nowMillis()}. - * - * @author John Sirois - */ -public class FakeClock implements Clock { - // Tests may need to use the clock from multiple threads, ensure liveness. - private volatile long nowNanos; - - /** - * Sets what {@link #nowMillis()} will return until this method is called again with a new value - * for {@code nowMillis}. - * - * @param nowMillis the current time in milliseconds - */ - public void setNowMillis(long nowMillis) { - Preconditions.checkArgument(nowMillis >= 0); - this.nowNanos = TimeUnit.MILLISECONDS.toNanos(nowMillis); - } - - /** - * Advances the current time by the given {@code period}. Time can be retarded by passing a - * negative value. - * - * @param period the amount of time to advance the current time by - */ - public void advance(Amount<Long, Time> period) { - Preconditions.checkNotNull(period); - long newNanos = nowNanos + period.as(Time.NANOSECONDS); - Preconditions.checkArgument(newNanos >= 0, - "invalid period %s - would move current time to a negative value: %sns", period, newNanos); - nowNanos = newNanos; - } - - @Override - public long nowMillis() { - return TimeUnit.NANOSECONDS.toMillis(nowNanos); - } - - @Override - public long nowNanos() { - return nowNanos; - } - - /** - * Waits in fake time, immediately returning in real time; however a check of {@link #nowMillis} - * after this method completes will consistently reveal that {@code millis} did in fact pass while - * waiting. 
- * - * @param millis the amount of time to wait in milliseconds - */ - @Override - public void waitFor(long millis) { - advance(Amount.of(millis, Time.MILLISECONDS)); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/util/testing/FakeTicker.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/testing/FakeTicker.java b/commons/src/main/java/com/twitter/common/util/testing/FakeTicker.java deleted file mode 100644 index dfe374e..0000000 --- a/commons/src/main/java/com/twitter/common/util/testing/FakeTicker.java +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.util.testing; - - -import java.util.concurrent.TimeUnit; - -import com.google.common.base.Preconditions; -import com.google.common.base.Ticker; - -import org.omg.CORBA.PUBLIC_MEMBER; - -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; - -/** - * A ticker for use in testing with a configurable value for {@link Ticker#read()}. - */ -public class FakeTicker extends Ticker{ - private long nowNanos; - - /** - * Sets what {@link #read()} will return until this method is called again with a new value - * for {@code nowNanos}. 
- * - * @param nowNanos the current time in nanoseconds - */ - public void setNowNanos(long nowNanos) { - this.nowNanos = nowNanos; - } - - @Override - public long read(){ - return nowNanos; - } - - /** - * Advances the current time by the given {@code period}. Time can be retarded by passing a - * negative value. - * - * @param period the amount of time to advance the current time by - */ - public void advance(Amount<Long, Time> period) { - Preconditions.checkNotNull(period); - nowNanos = nowNanos + period.as(Time.NANOSECONDS); - } - - /** - * Waits in fake time, immediately returning in real time; however a check of {@link #Ticker#read()} - * after this method completes will consistently reveal that {@code nanos} did in fact pass while - * waiting. - * - * @param nanos the amount of time to wait in nanoseconds - */ - public void waitNanos(long nanos) { - advance(Amount.of(nanos, Time.NANOSECONDS)); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/webassets/bootstrap/BootstrapModule.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/webassets/bootstrap/BootstrapModule.java b/commons/src/main/java/com/twitter/common/webassets/bootstrap/BootstrapModule.java deleted file mode 100644 index 1ee0e40..0000000 --- a/commons/src/main/java/com/twitter/common/webassets/bootstrap/BootstrapModule.java +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.webassets.bootstrap; - -import com.google.common.io.Resources; -import com.google.common.net.MediaType; -import com.google.inject.AbstractModule; - -import com.twitter.common.application.http.Registration; - -/** - * A binding module to register bootstrap HTTP assets. - */ -public final class BootstrapModule extends AbstractModule { - /** - * Enum for available Bootstrap versions to choose from. - */ - public enum BootstrapVersion { - VERSION_2_1_1 ("2.1.1"), - VERSION_2_3_2 ("2.3.2"); - - private final String version; - - BootstrapVersion(String s) { - version = s; - } - } - - private final String version; - - /** - * Default constructor. - */ - public BootstrapModule() { - this(BootstrapVersion.VERSION_2_1_1); - } - - /** - * BootstrapModule Constructor. - * - * @param version supplies the bootstrap version to select. - */ - public BootstrapModule(BootstrapVersion version) { - this.version = version.version; - } - - private void register(String mountPath, String resourcePath, String contentType) { - Registration.registerHttpAsset( - binder(), - "/" + mountPath, - Resources.getResource(BootstrapModule.class, resourcePath), - contentType, - true); - } - - @Override - protected void configure() { - register( - "css/bootstrap-responsive.min.css", - version + "/css/bootstrap-responsive.min.css", - MediaType.CSS_UTF_8.toString()); - register( - "css/bootstrap.min.css", - version + "/css/bootstrap.min.css", - MediaType.CSS_UTF_8.toString()); - register( - "img/glyphicons-halflings-white.png", - version + "/img/glyphicons-halflings-white.png", - MediaType.PNG.toString()); - register( - "img/glyphicons-halflings.png", - version + "/img/glyphicons-halflings.png", - MediaType.PNG.toString()); - register( - "js/bootstrap.min.js", - version + "/js/bootstrap.min.js", - MediaType.JAVASCRIPT_UTF_8.toString()); - } -} 
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/webassets/jquery/JQueryModule.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/webassets/jquery/JQueryModule.java b/commons/src/main/java/com/twitter/common/webassets/jquery/JQueryModule.java deleted file mode 100644 index 5c5e65d..0000000 --- a/commons/src/main/java/com/twitter/common/webassets/jquery/JQueryModule.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.webassets.jquery; - -import com.google.common.io.Resources; -import com.google.common.net.MediaType; -import com.google.inject.AbstractModule; - -import com.twitter.common.application.http.Registration; - -/** - * A binding module to register jQuery HTTP assets. 
- */ -public final class JQueryModule extends AbstractModule { - - @Override - protected void configure() { - Registration.registerHttpAsset( - binder(), - "/js/jquery.min.js", - Resources.getResource(JQueryModule.class, "js/jquery-1.8.2.min.js"), - MediaType.JAVASCRIPT_UTF_8.toString(), - true); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/zookeeper/Candidate.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/zookeeper/Candidate.java b/commons/src/main/java/com/twitter/common/zookeeper/Candidate.java deleted file mode 100644 index 3945700..0000000 --- a/commons/src/main/java/com/twitter/common/zookeeper/Candidate.java +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.zookeeper; - -import com.google.common.base.Optional; -import com.google.common.base.Supplier; - -import org.apache.zookeeper.KeeperException; - -import com.twitter.common.base.ExceptionalCommand; -import com.twitter.common.zookeeper.Group.JoinException; -import com.twitter.common.zookeeper.Group.WatchException; -import com.twitter.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException; - -/** - * Interface definition for becoming or querying for a ZooKeeper-based group leader. 
- */ -public interface Candidate { - - /** - * Returns the current group leader by querying ZooKeeper synchronously. - * - * @return the current group leader's identifying data or {@link Optional#absent()} if there is - * no leader - * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper - * @throws KeeperException if there was a problem reading the leader information - * @throws InterruptedException if this thread is interrupted getting the leader - */ - public Optional<byte[]> getLeaderData() - throws ZooKeeperConnectionException, KeeperException, InterruptedException; - - /** - * Encapsulates a leader that can be elected and subsequently defeated. - */ - interface Leader { - - /** - * Called when this leader has been elected. - * - * @param abdicate a command that can be used to abdicate leadership and force a new election - */ - void onElected(ExceptionalCommand<JoinException> abdicate); - - /** - * Called when the leader has been ousted. Can occur either if the leader abdicates or if an - * external event causes the leader to lose its leadership role (session expiration). - */ - void onDefeated(); - } - - /** - * Offers this candidate in leadership elections for as long as the current jvm process is alive. - * Upon election, the {@code onElected} callback will be executed and a command that can be used - * to abdicate leadership will be passed in. If the elected leader jvm process dies or the - * elected leader successfully abdicates then a new leader will be elected. Leaders that - * successfully abdicate are removed from the group and will not be eligible for leadership - * election unless {@link #offerLeadership(Leader)} is called again. 
- * - * @param leader the leader to notify of election and defeat events - * @throws JoinException if there was a problem joining the group - * @throws WatchException if there is a problem generating the 1st group membership list - * @throws InterruptedException if interrupted waiting to join the group and determine initial - * election results - * @return a supplier that can be queried to find out if this leader is currently elected - */ - public Supplier<Boolean> offerLeadership(Leader leader) - throws JoinException, WatchException, InterruptedException; -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/zookeeper/CandidateImpl.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/zookeeper/CandidateImpl.java b/commons/src/main/java/com/twitter/common/zookeeper/CandidateImpl.java deleted file mode 100644 index c77945b..0000000 --- a/commons/src/main/java/com/twitter/common/zookeeper/CandidateImpl.java +++ /dev/null @@ -1,181 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.common.zookeeper; - -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.logging.Level; -import java.util.logging.Logger; - -import javax.annotation.Nullable; - -import com.google.common.base.Charsets; -import com.google.common.base.Function; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.base.Supplier; -import com.google.common.collect.Iterables; -import com.google.common.collect.Ordering; - -import org.apache.zookeeper.KeeperException; - -import com.twitter.common.base.Command; -import com.twitter.common.base.ExceptionalCommand; -import com.twitter.common.zookeeper.Group.GroupChangeListener; -import com.twitter.common.zookeeper.Group.JoinException; -import com.twitter.common.zookeeper.Group.Membership; -import com.twitter.common.zookeeper.Group.WatchException; -import com.twitter.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException; - -/** - * Implements leader election for small groups of candidates. This implementation is subject to the - * <a href="http://hadoop.apache.org/zookeeper/docs/r3.2.1/recipes.html#sc_leaderElection"> - * herd effect</a> for a given group and should only be used for small (~10 member) candidate pools. 
- */ -public class CandidateImpl implements Candidate { - private static final Logger LOG = Logger.getLogger(CandidateImpl.class.getName()); - - private static final byte[] UNKNOWN_CANDIDATE_DATA = "<unknown>".getBytes(Charsets.UTF_8); - - private static final Supplier<byte[]> IP_ADDRESS_DATA_SUPPLIER = new Supplier<byte[]>() { - @Override public byte[] get() { - try { - return InetAddress.getLocalHost().getHostAddress().getBytes(); - } catch (UnknownHostException e) { - LOG.log(Level.WARNING, "Failed to determine local address!", e); - return UNKNOWN_CANDIDATE_DATA; - } - } - }; - - private static final Function<Iterable<String>, String> MOST_RECENT_JUDGE = - new Function<Iterable<String>, String>() { - @Override public String apply(Iterable<String> candidates) { - return Ordering.natural().min(candidates); - } - }; - - private final Group group; - private final Function<Iterable<String>, String> judge; - private final Supplier<byte[]> dataSupplier; - - /** - * Equivalent to {@link #CandidateImpl(Group, com.google.common.base.Function, Supplier)} using a - * judge that always picks the lowest numbered candidate ephemeral node - by proxy the oldest or - * 1st candidate and a default supplier that provides the ip address of this host according to - * {@link java.net.InetAddress#getLocalHost()} as the leader identifying data. - */ - public CandidateImpl(Group group) { - this(group, MOST_RECENT_JUDGE, IP_ADDRESS_DATA_SUPPLIER); - } - - /** - * Creates a candidate that can be used to offer leadership for the given {@code group} using - * a judge that always picks the lowest numbered candidate ephemeral node - by proxy the oldest - * or 1st. The dataSupplier should produce bytes that identify this process as leader. These bytes - * will become available to all participants via the {@link Candidate#getLeaderData()} method. 
- */ - public CandidateImpl(Group group, Supplier<byte[]> dataSupplier) { - this(group, MOST_RECENT_JUDGE, dataSupplier); - } - - /** - * Creates a candidate that can be used to offer leadership for the given {@code group}. The - * {@code judge} is used to pick the current leader from all group members whenever the group - * membership changes. To form a well-behaved election group with one leader, all candidates - * should use the same judge. The dataSupplier should produce bytes that identify this process - * as leader. These bytes will become available to all participants via the - * {@link Candidate#getLeaderData()} method. - */ - public CandidateImpl( - Group group, - Function<Iterable<String>, String> judge, - Supplier<byte[]> dataSupplier) { - this.group = Preconditions.checkNotNull(group); - this.judge = Preconditions.checkNotNull(judge); - this.dataSupplier = Preconditions.checkNotNull(dataSupplier); - } - - @Override - public Optional<byte[]> getLeaderData() - throws ZooKeeperConnectionException, KeeperException, InterruptedException { - - String leaderId = getLeader(group.getMemberIds()); - return leaderId == null - ? 
Optional.<byte[]>absent() - : Optional.of(group.getMemberData(leaderId)); - } - - @Override - public Supplier<Boolean> offerLeadership(final Leader leader) - throws JoinException, WatchException, InterruptedException { - - final Membership membership = group.join(dataSupplier, new Command() { - @Override public void execute() { - leader.onDefeated(); - } - }); - - final AtomicBoolean elected = new AtomicBoolean(false); - final AtomicBoolean abdicated = new AtomicBoolean(false); - group.watch(new GroupChangeListener() { - @Override public void onGroupChange(Iterable<String> memberIds) { - boolean noCandidates = Iterables.isEmpty(memberIds); - String memberId = membership.getMemberId(); - - if (noCandidates) { - LOG.warning("All candidates have temporarily left the group: " + group); - } else if (!Iterables.contains(memberIds, memberId)) { - LOG.severe(String.format( - "Current member ID %s is not a candidate for leader, current voting: %s", - memberId, memberIds)); - } else { - boolean electedLeader = memberId.equals(getLeader(memberIds)); - boolean previouslyElected = elected.getAndSet(electedLeader); - - if (!previouslyElected && electedLeader) { - LOG.info(String.format("Candidate %s is now leader of group: %s", - membership.getMemberPath(), memberIds)); - - leader.onElected(new ExceptionalCommand<JoinException>() { - @Override public void execute() throws JoinException { - membership.cancel(); - abdicated.set(true); - } - }); - } else if (!electedLeader) { - if (previouslyElected) { - leader.onDefeated(); - } - LOG.info(String.format( - "Candidate %s waiting for the next leader election, current voting: %s", - membership.getMemberPath(), memberIds)); - } - } - } - }); - - return new Supplier<Boolean>() { - @Override public Boolean get() { - return !abdicated.get() && elected.get(); - } - }; - } - - @Nullable - private String getLeader(Iterable<String> memberIds) { - return Iterables.isEmpty(memberIds) ? 
null : judge.apply(memberIds); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/zookeeper/CompoundServerSet.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/zookeeper/CompoundServerSet.java b/commons/src/main/java/com/twitter/common/zookeeper/CompoundServerSet.java deleted file mode 100644 index afe3e6f..0000000 --- a/commons/src/main/java/com/twitter/common/zookeeper/CompoundServerSet.java +++ /dev/null @@ -1,224 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.zookeeper; - -import java.net.InetSocketAddress; -import java.util.List; -import java.util.Map; - -import com.google.common.base.Joiner; -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -import com.twitter.common.base.Command; -import com.twitter.common.base.Commands; -import com.twitter.common.base.MorePreconditions; -import com.twitter.common.zookeeper.Group.JoinException; -import com.twitter.thrift.ServiceInstance; -import com.twitter.thrift.Status; - -/** - * A ServerSet that delegates all calls to other ServerSets. 
- */ -public class CompoundServerSet implements ServerSet { - private static final Joiner STACK_TRACE_JOINER = Joiner.on('\n'); - - private final List<ServerSet> serverSets; - private final Map<ServerSet, ImmutableSet<ServiceInstance>> instanceCache = Maps.newHashMap(); - private final List<HostChangeMonitor<ServiceInstance>> monitors = Lists.newArrayList(); - private Command stopWatching = null; - private ImmutableSet<ServiceInstance> allHosts = ImmutableSet.of(); - - /** - * Create new ServerSet from a list of serverSets. - * - * @param serverSets serverSets to which the calls will be delegated. - */ - public CompoundServerSet(Iterable<ServerSet> serverSets) { - MorePreconditions.checkNotBlank(serverSets); - this.serverSets = ImmutableList.copyOf(serverSets); - } - - private interface JoinOp { - EndpointStatus doJoin(ServerSet serverSet) throws JoinException, InterruptedException; - } - - private interface StatusOp { - void changeStatus(EndpointStatus status) throws UpdateException; - } - - private void changeStatus( - ImmutableList<EndpointStatus> statuses, - StatusOp statusOp) throws UpdateException { - - ImmutableList.Builder<String> builder = ImmutableList.builder(); - int errorIdx = 1; - for (EndpointStatus endpointStatus : statuses) { - try { - statusOp.changeStatus(endpointStatus); - } catch (UpdateException exception) { - builder.add(String.format("[%d] %s", errorIdx++, - Throwables.getStackTraceAsString(exception))); - } - } - if (errorIdx > 1) { - throw new UpdateException( - "One or more ServerSet update failed: " + STACK_TRACE_JOINER.join(builder.build())); - } - } - - private EndpointStatus doJoin(JoinOp joiner) throws JoinException, InterruptedException { - // Get the list of endpoint status from the serverSets. 
- ImmutableList.Builder<EndpointStatus> builder = ImmutableList.builder(); - for (ServerSet serverSet : serverSets) { - builder.add(joiner.doJoin(serverSet)); - } - - final ImmutableList<EndpointStatus> statuses = builder.build(); - - return new EndpointStatus() { - @Override public void leave() throws UpdateException { - changeStatus(statuses, new StatusOp() { - @Override public void changeStatus(EndpointStatus status) throws UpdateException { - status.leave(); - } - }); - } - - @Override public void update(final Status newStatus) throws UpdateException { - changeStatus(statuses, new StatusOp() { - @Override public void changeStatus(EndpointStatus status) throws UpdateException { - status.update(newStatus); - } - }); - } - }; - } - - @Override - public EndpointStatus join( - final InetSocketAddress endpoint, - final Map<String, InetSocketAddress> additionalEndpoints) - throws Group.JoinException, InterruptedException { - - return doJoin(new JoinOp() { - @Override public EndpointStatus doJoin(ServerSet serverSet) - throws JoinException, InterruptedException { - return serverSet.join(endpoint, additionalEndpoints); - } - }); - } - - /* - * If any one of the serverSet throws an exception during respective join, the exception is - * propagated. Join is successful only if all the joins are successful. - * - * NOTE: If an exception occurs during the join, the serverSets in the composite can be in a - * partially joined state. 
- * - * @see ServerSet#join(InetSocketAddress, Map, Status) - */ - @Override - public EndpointStatus join( - final InetSocketAddress endpoint, - final Map<String, InetSocketAddress> additionalEndpoints, - final Status status) throws Group.JoinException, InterruptedException { - - return doJoin(new JoinOp() { - @Override public EndpointStatus doJoin(ServerSet serverSet) - throws JoinException, InterruptedException { - - return serverSet.join(endpoint, additionalEndpoints, status); - } - }); - } - - @Override - public EndpointStatus join( - final InetSocketAddress endpoint, - final Map<String, InetSocketAddress> additionalEndpoints, - final int shardId) throws JoinException, InterruptedException { - - return doJoin(new JoinOp() { - @Override public EndpointStatus doJoin(ServerSet serverSet) - throws JoinException, InterruptedException { - - return serverSet.join(endpoint, additionalEndpoints, shardId); - } - }); - } - - // Handles changes to the union of hosts. - private synchronized void handleChange(ServerSet serverSet, ImmutableSet<ServiceInstance> hosts) { - instanceCache.put(serverSet, hosts); - - // Get the union of hosts. - ImmutableSet<ServiceInstance> currentHosts = - ImmutableSet.copyOf(Iterables.concat(instanceCache.values())); - - // Check if the hosts have changed. - if (!currentHosts.equals(allHosts)) { - allHosts = currentHosts; - - // Notify the monitors. - for (HostChangeMonitor<ServiceInstance> monitor : monitors) { - monitor.onChange(allHosts); - } - } - } - - /** - * Monitor the CompoundServerSet. - * - * If any one of the monitor calls to the underlying serverSet raises a MonitorException, the - * exception is propagated. The call is successful only if all the monitor calls to the - * underlying serverSets are successful. - * - * NOTE: If an exception occurs during the monitor call, the serverSets in the composite will not - * be monitored. - * - * @param monitor HostChangeMonitor instance used to monitor host changes. 
- * @return A command that, when executed, will stop monitoring all underlying server sets. - * @throws MonitorException If there was a problem monitoring any of the underlying server sets. - */ - @Override - public synchronized Command watch(HostChangeMonitor<ServiceInstance> monitor) - throws MonitorException { - if (stopWatching == null) { - monitors.add(monitor); - ImmutableList.Builder<Command> commandsBuilder = ImmutableList.builder(); - - for (final ServerSet serverSet : serverSets) { - commandsBuilder.add(serverSet.watch(new HostChangeMonitor<ServiceInstance>() { - @Override public void onChange(ImmutableSet<ServiceInstance> hostSet) { - handleChange(serverSet, hostSet); - } - })); - } - - stopWatching = Commands.compound(commandsBuilder.build()); - } - - return stopWatching; - } - - @Override - public void monitor(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException { - watch(monitor); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/zookeeper/DistributedLock.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/zookeeper/DistributedLock.java b/commons/src/main/java/com/twitter/common/zookeeper/DistributedLock.java deleted file mode 100644 index 4ddbb90..0000000 --- a/commons/src/main/java/com/twitter/common/zookeeper/DistributedLock.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.zookeeper; - -import java.util.concurrent.TimeUnit; - -/** - * DistributedLock - * - * @author Florian Leibert - */ -public interface DistributedLock { - void lock() throws LockingException; - - boolean tryLock(long timeout, TimeUnit unit); - - void unlock() throws LockingException; - - public static class LockingException extends RuntimeException { - public LockingException(String msg, Exception e) { - super(msg, e); - } - - public LockingException(String msg) { - super(msg); - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/zookeeper/DistributedLockImpl.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/zookeeper/DistributedLockImpl.java b/commons/src/main/java/com/twitter/common/zookeeper/DistributedLockImpl.java deleted file mode 100644 index 2d9ee63..0000000 --- a/commons/src/main/java/com/twitter/common/zookeeper/DistributedLockImpl.java +++ /dev/null @@ -1,286 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.common.zookeeper; - -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.logging.Level; -import java.util.logging.Logger; - -import javax.annotation.concurrent.ThreadSafe; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Ordering; - -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.data.ACL; -import org.apache.zookeeper.data.Stat; - -import com.twitter.common.base.MorePreconditions; - -/** - * Distributed locking via ZooKeeper. Assuming there are N clients that all try to acquire a lock, - * the algorithm works as follows. Each host creates an ephemeral|sequential node, and requests a - * list of children for the lock node. Due to the nature of sequential, all the ids are increasing - * in order, therefore the client with the least ID according to natural ordering will hold the - * lock. Every other client watches the id immediately preceding its own id and checks for the lock - * in case of notification. The client holding the lock does the work and finally deletes the node, - * thereby triggering the next client in line to acquire the lock. Deadlocks are possible but - * avoided in most cases because if a client drops dead while holding the lock, the ZK session - * should timeout and since the node is ephemeral, it will be removed in such a case. Deadlocks - * could occur if the the worker thread on a client hangs but the zk-client thread is still alive. - * There could be an external monitor client that ensures that alerts are triggered if the least-id - * ephemeral node is present past a time-out. 
- * <p/> - * Note: Locking attempts will fail in case session expires! - * - * @author Florian Leibert - */ -@ThreadSafe -public class DistributedLockImpl implements DistributedLock { - - private static final Logger LOG = Logger.getLogger(DistributedLockImpl.class.getName()); - - private final ZooKeeperClient zkClient; - private final String lockPath; - private final ImmutableList<ACL> acl; - - private final AtomicBoolean aborted = new AtomicBoolean(false); - private CountDownLatch syncPoint; - private boolean holdsLock = false; - private String currentId; - private String currentNode; - private String watchedNode; - private LockWatcher watcher; - - /** - * Equivalent to {@link #DistributedLockImpl(ZooKeeperClient, String, Iterable)} with a default - * wide open {@code acl} ({@link ZooDefs.Ids#OPEN_ACL_UNSAFE}). - */ - public DistributedLockImpl(ZooKeeperClient zkClient, String lockPath) { - this(zkClient, lockPath, ZooDefs.Ids.OPEN_ACL_UNSAFE); - } - - /** - * Creates a distributed lock using the given {@code zkClient} to coordinate locking. - * - * @param zkClient The ZooKeeper client to use. - * @param lockPath The path used to manage the lock under. - * @param acl The acl to apply to newly created lock nodes. - */ - public DistributedLockImpl(ZooKeeperClient zkClient, String lockPath, Iterable<ACL> acl) { - this.zkClient = Preconditions.checkNotNull(zkClient); - this.lockPath = MorePreconditions.checkNotBlank(lockPath); - this.acl = ImmutableList.copyOf(acl); - this.syncPoint = new CountDownLatch(1); - } - - private synchronized void prepare() - throws ZooKeeperClient.ZooKeeperConnectionException, InterruptedException, KeeperException { - - ZooKeeperUtils.ensurePath(zkClient, acl, lockPath); - LOG.log(Level.FINE, "Working with locking path:" + lockPath); - - // Create an EPHEMERAL_SEQUENTIAL node. 
- currentNode = - zkClient.get().create(lockPath + "/member_", null, acl, CreateMode.EPHEMERAL_SEQUENTIAL); - - // We only care about our actual id since we want to compare ourselves to siblings. - if (currentNode.contains("/")) { - currentId = currentNode.substring(currentNode.lastIndexOf("/") + 1); - } - LOG.log(Level.FINE, "Received ID from zk:" + currentId); - this.watcher = new LockWatcher(); - } - - @Override - public synchronized void lock() throws LockingException { - if (holdsLock) { - throw new LockingException("Error, already holding a lock. Call unlock first!"); - } - try { - prepare(); - watcher.checkForLock(); - syncPoint.await(); - if (!holdsLock) { - throw new LockingException("Error, couldn't acquire the lock!"); - } - } catch (InterruptedException e) { - cancelAttempt(); - throw new LockingException("InterruptedException while trying to acquire lock!", e); - } catch (KeeperException e) { - // No need to clean up since the node wasn't created yet. - throw new LockingException("KeeperException while trying to acquire lock!", e); - } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - // No need to clean up since the node wasn't created yet. - throw new LockingException("ZooKeeperConnectionException while trying to acquire lock", e); - } - } - - @Override - public synchronized boolean tryLock(long timeout, TimeUnit unit) { - if (holdsLock) { - throw new LockingException("Error, already holding a lock. Call unlock first!"); - } - try { - prepare(); - watcher.checkForLock(); - boolean success = syncPoint.await(timeout, unit); - if (!success) { - return false; - } - if (!holdsLock) { - throw new LockingException("Error, couldn't acquire the lock!"); - } - } catch (InterruptedException e) { - cancelAttempt(); - return false; - } catch (KeeperException e) { - // No need to clean up since the node wasn't created yet. 
- throw new LockingException("KeeperException while trying to acquire lock!", e); - } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - // No need to clean up since the node wasn't created yet. - throw new LockingException("ZooKeeperConnectionException while trying to acquire lock", e); - } - return true; - } - - @Override - public synchronized void unlock() throws LockingException { - if (currentId == null) { - throw new LockingException("Error, neither attempting to lock nor holding a lock!"); - } - Preconditions.checkNotNull(currentId); - // Try aborting! - if (!holdsLock) { - aborted.set(true); - LOG.log(Level.INFO, "Not holding lock, aborting acquisition attempt!"); - } else { - LOG.log(Level.INFO, "Cleaning up this locks ephemeral node."); - cleanup(); - } - } - - //TODO(Florian Leibert): Make sure this isn't a runtime exception. Put exceptions into the token? - - private synchronized void cancelAttempt() { - LOG.log(Level.INFO, "Cancelling lock attempt!"); - cleanup(); - // Bubble up failure... - holdsLock = false; - syncPoint.countDown(); - } - - private void cleanup() { - LOG.info("Cleaning up!"); - Preconditions.checkNotNull(currentId); - try { - Stat stat = zkClient.get().exists(currentNode, false); - if (stat != null) { - zkClient.get().delete(currentNode, ZooKeeperUtils.ANY_VERSION); - } else { - LOG.log(Level.WARNING, "Called cleanup but nothing to cleanup!"); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - holdsLock = false; - aborted.set(false); - currentId = null; - currentNode = null; - watcher = null; - syncPoint = new CountDownLatch(1); - } - - class LockWatcher implements Watcher { - - public synchronized void checkForLock() { - MorePreconditions.checkNotBlank(currentId); - - try { - List<String> candidates = zkClient.get().getChildren(lockPath, null); - ImmutableList<String> sortedMembers = Ordering.natural().immutableSortedCopy(candidates); - - // Unexpected behavior if there are no children! 
- if (sortedMembers.isEmpty()) { - throw new LockingException("Error, member list is empty!"); - } - - int memberIndex = sortedMembers.indexOf(currentId); - - // If we hold the lock - if (memberIndex == 0) { - holdsLock = true; - syncPoint.countDown(); - } else { - final String nextLowestNode = sortedMembers.get(memberIndex - 1); - LOG.log(Level.INFO, String.format("Current LockWatcher with ephemeral node [%s], is " + - "waiting for [%s] to release lock.", currentId, nextLowestNode)); - - watchedNode = String.format("%s/%s", lockPath, nextLowestNode); - Stat stat = zkClient.get().exists(watchedNode, this); - if (stat == null) { - checkForLock(); - } - } - } catch (InterruptedException e) { - LOG.log(Level.WARNING, String.format("Current LockWatcher with ephemeral node [%s] " + - "got interrupted. Trying to cancel lock acquisition.", currentId), e); - cancelAttempt(); - } catch (KeeperException e) { - LOG.log(Level.WARNING, String.format("Current LockWatcher with ephemeral node [%s] " + - "got a KeeperException. Trying to cancel lock acquisition.", currentId), e); - cancelAttempt(); - } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - LOG.log(Level.WARNING, String.format("Current LockWatcher with ephemeral node [%s] " + - "got a ConnectionException. Trying to cancel lock acquisition.", currentId), e); - cancelAttempt(); - } - } - - @Override - public synchronized void process(WatchedEvent event) { - // this handles the case where we have aborted a lock and deleted ourselves but still have a - // watch on the nextLowestNode. This is a workaround since ZK doesn't support unsub. - if (!event.getPath().equals(watchedNode)) { - LOG.log(Level.INFO, "Ignoring call for node:" + watchedNode); - return; - } - //TODO(Florian Leibert): Pull this into the outer class. - if (event.getType() == Watcher.Event.EventType.None) { - switch (event.getState()) { - case SyncConnected: - // TODO(Florian Leibert): maybe we should just try to "fail-fast" in this case and abort. 
- LOG.info("Reconnected..."); - break; - case Expired: - LOG.log(Level.WARNING, String.format("Current ZK session expired![%s]", currentId)); - cancelAttempt(); - break; - } - } else if (event.getType() == Event.EventType.NodeDeleted) { - checkForLock(); - } else { - LOG.log(Level.WARNING, String.format("Unexpected ZK event: %s", event.getType().name())); - } - } - } -}
