// http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/logging/ResourceLoggingConfigurator.java
// ----------------------------------------------------------------------
// diff --git a/commons/src/main/java/com/twitter/common/util/logging/ResourceLoggingConfigurator.java b/commons/src/main/java/com/twitter/common/util/logging/ResourceLoggingConfigurator.java
// new file mode 100644
// index 0000000..ea1c994
// --- /dev/null
// +++ b/commons/src/main/java/com/twitter/common/util/logging/ResourceLoggingConfigurator.java
// @@ -0,0 +1,52 @@
// =================================================================================================
// Copyright 2011 Twitter, Inc.
// -------------------------------------------------------------------------------------------------
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this work except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file, or at:
//
//  http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =================================================================================================

package com.twitter.common.util.logging;

import com.google.common.base.Preconditions;

import java.io.IOException;
import java.io.InputStream;
import java.util.logging.LogManager;

/**
 * A custom java.util.logging configuration class that loads the logging configuration from a
 * properties file resource (as opposed to a file as natively supported by LogManager via
 * java.util.logging.config.file).  By default this configurator will look for the resource at
 * /logging.properties but the resource path can be overridden by setting the system property with
 * key {@link #LOGGING_PROPERTIES_RESOURCE_PATH java.util.logging.config.resource}.  To install this
 * configurator you must specify the following system property:
 * java.util.logging.config.class=com.twitter.common.util.logging.ResourceLoggingConfigurator
 *
 * @author John Sirois
 */
public class ResourceLoggingConfigurator {

  /**
   * A system property that controls where ResourceLoggingConfigurator looks for the logging
   * configuration on the process classpath.
   */
  public static final String LOGGING_PROPERTIES_RESOURCE_PATH = "java.util.logging.config.resource";

  /**
   * Reads the logging configuration resource and applies it to the global {@link LogManager}.
   *
   * <p>The resource path is taken from the {@link #LOGGING_PROPERTIES_RESOURCE_PATH} system
   * property, falling back to {@code /logging.properties}, and is resolved with
   * {@link Class#getResourceAsStream(String)} against this object's runtime class.
   *
   * @throws IOException if the configuration could not be read or the stream could not be closed
   * @throws NullPointerException if no resource exists at the configured path
   */
  public ResourceLoggingConfigurator() throws IOException {
    String loggingPropertiesResourcePath =
        System.getProperty(LOGGING_PROPERTIES_RESOURCE_PATH, "/logging.properties");
    InputStream loggingConfig = getClass().getResourceAsStream(loggingPropertiesResourcePath);
    Preconditions.checkNotNull(loggingConfig,
        "Could not locate logging config file at resource path: %s", loggingPropertiesResourcePath);
    try {
      LogManager.getLogManager().readConfiguration(loggingConfig);
    } finally {
      // This constructor opened the stream, so it releases it once the LogManager is done with it.
      loggingConfig.close();
    }
  }
}
// http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/logging/UnresettableLogManager.java
// ----------------------------------------------------------------------
// diff --git a/commons/src/main/java/com/twitter/common/util/logging/UnresettableLogManager.java b/commons/src/main/java/com/twitter/common/util/logging/UnresettableLogManager.java
// new file mode 100644
// index 0000000..14822ff
// --- /dev/null
// +++ b/commons/src/main/java/com/twitter/common/util/logging/UnresettableLogManager.java
// @@ -0,0 +1,51 @@
// =================================================================================================
// Copyright 2011 Twitter, Inc.
// -------------------------------------------------------------------------------------------------
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this work except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file, or at:
//
//  http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =================================================================================================

package com.twitter.common.util.logging;

import java.util.logging.LogManager;

/**
 * A LogManager which by default ignores calls to {@link #reset()}.  This is useful to avoid missing
 * log statements that occur during vm shutdown.  The standard LogManager installs a
 * {@link Runtime#addShutdownHook(Thread) shutdown hook} that disables logging and this subclass
 * nullifies that shutdown hook by disabling any reset of the LogManager by default.
 *
 * @author John Sirois
 */
public class UnresettableLogManager extends LogManager {

  /**
   * The system property that controls which LogManager the java.util.logging subsystem should load.
   */
  public static final String LOGGING_MANAGER = "java.util.logging.manager";

  /**
   * A system property which can be used to control an {@code UnresettableLogManager}'s behavior.
   * If the UnresettableLogManager is installed, but an application still wants
   * {@link LogManager#reset()} behavior, they can set this property to "false".
   */
  private static final String LOGGING_MANAGER_IGNORERESET = "java.util.logging.manager.ignorereset";

  /**
   * Ignores the reset request, printing a notice to stderr, unless the
   * {@code java.util.logging.manager.ignorereset} system property is set to a value that does not
   * parse as {@code true}; in that case the reset is delegated to {@link LogManager#reset()}.
   * The property is re-read on every call.
   *
   * @throws SecurityException if the delegated reset is not permitted
   */
  @Override
  public void reset() throws SecurityException {
    boolean ignoreReset =
        Boolean.parseBoolean(System.getProperty(LOGGING_MANAGER_IGNORERESET, "true"));
    if (ignoreReset) {
      // The logging subsystem is the thing being protected here, so report via stderr instead.
      System.err.println("UnresettableLogManager is ignoring a reset() request.");
    } else {
      super.reset();
    }
  }
}

// http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/templating/StringTemplateHelper.java
// ----------------------------------------------------------------------
// diff --git a/commons/src/main/java/com/twitter/common/util/templating/StringTemplateHelper.java b/commons/src/main/java/com/twitter/common/util/templating/StringTemplateHelper.java
// new file mode 100644
// index 0000000..f677b1a
// --- /dev/null
// +++ b/commons/src/main/java/com/twitter/common/util/templating/StringTemplateHelper.java
// @@ -0,0 +1,83 @@
package com.twitter.common.util.templating;

import java.io.IOException;
import java.io.Writer;

import com.google.common.base.Preconditions;

import org.antlr.stringtemplate.AutoIndentWriter;
import org.antlr.stringtemplate.StringTemplate;
import org.antlr.stringtemplate.StringTemplateGroup;

import com.twitter.common.base.Closure;
import com.twitter.common.base.MorePreconditions;

/**
 * A class to simplify the operations required to load a stringtemplate template file from the
 * classpath and populate it.
 */
public class StringTemplateHelper {

  private final StringTemplateGroup group;
  private final String templatePath;

  /**
   * Creates a new template helper.
   *
   * @param templateContextClass Classpath context for the location of the template file.
   * @param templateName Template file name (excluding .st suffix) relative to
   *     {@code templateContextClass}.
   * @param cacheTemplates Whether the template should be cached.
   */
  public StringTemplateHelper(
      Class<?> templateContextClass,
      String templateName,
      boolean cacheTemplates) {

    MorePreconditions.checkNotBlank(templateName);
    // The template lives in the same package directory as the context class.
    String resolvedPath =
        templateContextClass.getPackage().getName().replace('.', '/') + "/" + templateName;
    StringTemplateGroup templateGroup = new StringTemplateGroup(templateName);
    // Look the template up once now so that a missing template is reported at construction time.
    Preconditions.checkNotNull(templateGroup.getInstanceOf(resolvedPath),
        "Failed to load template at: %s", resolvedPath);

    if (!cacheTemplates) {
      templateGroup.setRefreshInterval(0);
    }
    this.group = templateGroup;
    this.templatePath = resolvedPath;
  }

  /**
   * Thrown when an exception is encountered while populating a template.
   */
  public static class TemplateException extends Exception {
    public TemplateException(String msg, Throwable cause) {
      super(msg, cause);
    }
  }

  /**
   * Writes the populated template to an output writer by providing a closure with access to
   * the unpopulated template object.
   *
   * @param out Template output writer.
   * @param parameterSetter Closure to populate the template.
   * @throws TemplateException If an exception was encountered while populating the template.
   */
  public void writeTemplate(
      Writer out,
      Closure<StringTemplate> parameterSetter) throws TemplateException {

    Preconditions.checkNotNull(out);
    Preconditions.checkNotNull(parameterSetter);

    // A fresh instance is fetched per call and handed to the caller's closure to populate.
    StringTemplate stringTemplate = group.getInstanceOf(templatePath);
    try {
      parameterSetter.execute(stringTemplate);
      stringTemplate.write(new AutoIndentWriter(out));
    } catch (IOException e) {
      throw new TemplateException("Failed to write template: " + e, e);
    }
  }
}

// http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/testing/FakeClock.java
// ----------------------------------------------------------------------
// diff --git a/commons/src/main/java/com/twitter/common/util/testing/FakeClock.java b/commons/src/main/java/com/twitter/common/util/testing/FakeClock.java
// new file mode 100644
// index 0000000..d2eb318
// --- /dev/null
// +++ b/commons/src/main/java/com/twitter/common/util/testing/FakeClock.java
// @@ -0,0 +1,81 @@
// =================================================================================================
// Copyright 2011 Twitter, Inc.
// -------------------------------------------------------------------------------------------------
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this work except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file, or at:
//
//  http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =================================================================================================

package com.twitter.common.util.testing;

import com.google.common.base.Preconditions;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
import com.twitter.common.util.Clock;

import java.util.concurrent.TimeUnit;

/**
 * A clock for use in testing with a configurable value for {@link #nowMillis()}.
 *
 * @author John Sirois
 */
public class FakeClock implements Clock {
  // Tests may need to use the clock from multiple threads, ensure liveness.
  private volatile long nowNanos;

  /**
   * Sets what {@link #nowMillis()} will return until this method is called again with a new value
   * for {@code nowMillis} or the clock is moved with {@link #advance(Amount)}.
   *
   * @param nowMillis the current time in milliseconds; must not be negative
   */
  public void setNowMillis(long nowMillis) {
    Preconditions.checkArgument(nowMillis >= 0);
    this.nowNanos = TimeUnit.MILLISECONDS.toNanos(nowMillis);
  }

  /**
   * Advances the current time by the given {@code period}.  Time can be retarded by passing a
   * negative value, as long as the resulting time does not become negative.
   *
   * <p>The update is a read followed by a write of a volatile field, so it is not atomic with
   * respect to concurrent calls that also modify the time.
   *
   * @param period the amount of time to advance the current time by
   */
  public void advance(Amount<Long, Time> period) {
    Preconditions.checkNotNull(period);
    long newNanos = nowNanos + period.as(Time.NANOSECONDS);
    Preconditions.checkArgument(newNanos >= 0,
        "invalid period %s - would move current time to a negative value: %sns", period, newNanos);
    nowNanos = newNanos;
  }

  @Override
  public long nowMillis() {
    return TimeUnit.NANOSECONDS.toMillis(nowNanos);
  }

  @Override
  public long nowNanos() {
    return nowNanos;
  }

  /**
   * Waits in fake time, immediately returning in real time; however a check of {@link #nowMillis}
   * after this method completes will consistently reveal that {@code millis} did in fact pass while
   * waiting.
   *
   * @param millis the amount of time to wait in milliseconds
   */
  @Override
  public void waitFor(long millis) {
    advance(Amount.of(millis, Time.MILLISECONDS));
  }
}

// http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/testing/FakeTicker.java
// ----------------------------------------------------------------------
// diff --git a/commons/src/main/java/com/twitter/common/util/testing/FakeTicker.java b/commons/src/main/java/com/twitter/common/util/testing/FakeTicker.java
// new file mode 100644
// index 0000000..fe25df8
// --- /dev/null
// +++ b/commons/src/main/java/com/twitter/common/util/testing/FakeTicker.java
// @@ -0,0 +1,56 @@
package com.twitter.common.util.testing;


import java.util.concurrent.TimeUnit;

import com.google.common.base.Preconditions;
import com.google.common.base.Ticker;

import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;

/**
 * A ticker for use in testing with a configurable value for {@link Ticker#read()}.
 *
 * <p>The current time is held in a plain (non-volatile) field, and no synchronization is
 * performed by this class.
 */
public class FakeTicker extends Ticker {
  private long nowNanos;

  /**
   * Sets what {@link #read()} will return until this method is called again with a new value
   * for {@code nowNanos} or the ticker is moved with {@link #advance(Amount)}.
   *
   * @param nowNanos the current time in nanoseconds
   */
  public void setNowNanos(long nowNanos) {
    this.nowNanos = nowNanos;
  }

  @Override
  public long read() {
    return nowNanos;
  }

  /**
   * Advances the current time by the given {@code period}.  Time can be retarded by passing a
   * negative value.
   *
   * @param period the amount of time to advance the current time by
   */
  public void advance(Amount<Long, Time> period) {
    Preconditions.checkNotNull(period);
    nowNanos = nowNanos + period.as(Time.NANOSECONDS);
  }

  /**
   * Waits in fake time, immediately returning in real time; however a check of
   * {@link Ticker#read()} after this method completes will consistently reveal that {@code nanos}
   * did in fact pass while waiting.
   *
   * @param nanos the amount of time to wait in nanoseconds
   */
  public void waitNanos(long nanos) {
    advance(Amount.of(nanos, Time.NANOSECONDS));
  }
}

// http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/webassets/bootstrap/BootstrapModule.java
// ----------------------------------------------------------------------
// diff --git a/commons/src/main/java/com/twitter/common/webassets/bootstrap/BootstrapModule.java b/commons/src/main/java/com/twitter/common/webassets/bootstrap/BootstrapModule.java
// new file mode 100644
// index 0000000..2b6eb21
// --- /dev/null
// +++ b/commons/src/main/java/com/twitter/common/webassets/bootstrap/BootstrapModule.java
// @@ -0,0 +1,93 @@
// =================================================================================================
// Copyright 2012 Twitter, Inc.
// -------------------------------------------------------------------------------------------------
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this work except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file, or at:
//
//  http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =================================================================================================

package com.twitter.common.webassets.bootstrap;

import com.google.common.io.Resources;
import com.google.common.net.MediaType;
import com.google.inject.AbstractModule;

import com.twitter.common.application.http.Registration;

/**
 * A binding module to register bootstrap HTTP assets.
 */
public final class BootstrapModule extends AbstractModule {
  /**
   * Enum for available Bootstrap versions to choose from.
   */
  public enum BootstrapVersion {
    VERSION_2_1_1 ("2.1.1"),
    VERSION_2_3_2 ("2.3.2");

    // Version string; used as the leading path segment of each bundled asset's resource path.
    private final String version;

    BootstrapVersion(String s) {
      version = s;
    }
  }

  private final String version;

  /**
   * Default constructor; selects {@link BootstrapVersion#VERSION_2_1_1}.
   */
  public BootstrapModule() {
    this(BootstrapVersion.VERSION_2_1_1);
  }

  /**
   * BootstrapModule Constructor.
   *
   * @param version supplies the bootstrap version to select.
   */
  public BootstrapModule(BootstrapVersion version) {
    this.version = version.version;
  }

  /**
   * Registers a single asset, resolved relative to this class, under {@code "/" + mountPath}.
   *
   * @param mountPath HTTP path (without the leading slash) to serve the asset at
   * @param resourcePath resource path of the asset, relative to {@code BootstrapModule}
   * @param contentType content type to register the asset with
   */
  private void register(String mountPath, String resourcePath, String contentType) {
    Registration.registerHttpAsset(
        binder(),
        "/" + mountPath,
        Resources.getResource(BootstrapModule.class, resourcePath),
        contentType,
        true);
  }

  @Override
  protected void configure() {
    // Mount paths are version-independent; only the backing resource path carries the version.
    register(
        "css/bootstrap-responsive.min.css",
        version + "/css/bootstrap-responsive.min.css",
        MediaType.CSS_UTF_8.toString());
    register(
        "css/bootstrap.min.css",
        version + "/css/bootstrap.min.css",
        MediaType.CSS_UTF_8.toString());
    register(
        "img/glyphicons-halflings-white.png",
        version + "/img/glyphicons-halflings-white.png",
        MediaType.PNG.toString());
    register(
        "img/glyphicons-halflings.png",
        version + "/img/glyphicons-halflings.png",
        MediaType.PNG.toString());
    register(
        "js/bootstrap.min.js",
        version + "/js/bootstrap.min.js",
        MediaType.JAVASCRIPT_UTF_8.toString());
  }
}
// http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/webassets/jquery/JQueryModule.java
// ----------------------------------------------------------------------
// diff --git a/commons/src/main/java/com/twitter/common/webassets/jquery/JQueryModule.java b/commons/src/main/java/com/twitter/common/webassets/jquery/JQueryModule.java
// new file mode 100644
// index 0000000..316b328
// --- /dev/null
// +++ b/commons/src/main/java/com/twitter/common/webassets/jquery/JQueryModule.java
// @@ -0,0 +1,39 @@
// =================================================================================================
// Copyright 2012 Twitter, Inc.
// -------------------------------------------------------------------------------------------------
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this work except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file, or at:
//
//  http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =================================================================================================

package com.twitter.common.webassets.jquery;

import com.google.common.io.Resources;
import com.google.common.net.MediaType;
import com.google.inject.AbstractModule;

import com.twitter.common.application.http.Registration;

/**
 * A binding module to register jQuery HTTP assets.
 */
public final class JQueryModule extends AbstractModule {

  @Override
  protected void configure() {
    // The versioned resource file is served under a version-independent mount path.
    Registration.registerHttpAsset(
        binder(),
        "/js/jquery.min.js",
        Resources.getResource(JQueryModule.class, "js/jquery-1.8.2.min.js"),
        MediaType.JAVASCRIPT_UTF_8.toString(),
        true);
  }
}

// http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/Candidate.java
// ----------------------------------------------------------------------
// diff --git a/commons/src/main/java/com/twitter/common/zookeeper/Candidate.java b/commons/src/main/java/com/twitter/common/zookeeper/Candidate.java
// new file mode 100644
// index 0000000..bc9ec63
// --- /dev/null
// +++ b/commons/src/main/java/com/twitter/common/zookeeper/Candidate.java
// @@ -0,0 +1,82 @@
// =================================================================================================
// Copyright 2011 Twitter, Inc.
// -------------------------------------------------------------------------------------------------
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this work except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file, or at:
//
//  http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =================================================================================================

package com.twitter.common.zookeeper;

import com.google.common.base.Optional;
import com.google.common.base.Supplier;

import org.apache.zookeeper.KeeperException;

import com.twitter.common.base.ExceptionalCommand;
import com.twitter.common.zookeeper.Group.JoinException;
import com.twitter.common.zookeeper.Group.WatchException;
import com.twitter.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;

/**
 * Interface definition for becoming or querying for a ZooKeeper-based group leader.
 */
public interface Candidate {

  /**
   * Returns the current group leader by querying ZooKeeper synchronously.
   *
   * @return the current group leader's identifying data or {@link Optional#absent()} if there is
   *     no leader
   * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper
   * @throws KeeperException if there was a problem reading the leader information
   * @throws InterruptedException if this thread is interrupted getting the leader
   */
  public Optional<byte[]> getLeaderData()
      throws ZooKeeperConnectionException, KeeperException, InterruptedException;

  /**
   * Encapsulates a leader that can be elected and subsequently defeated.
   */
  interface Leader {

    /**
     * Called when this leader has been elected.
     *
     * @param abdicate a command that can be used to abdicate leadership and force a new election
     */
    void onElected(ExceptionalCommand<JoinException> abdicate);

    /**
     * Called when the leader has been ousted.  Can occur either if the leader abdicates or if an
     * external event causes the leader to lose its leadership role (session expiration).
     */
    void onDefeated();
  }

  /**
   * Offers this candidate in leadership elections for as long as the current jvm process is alive.
   * Upon election, the {@code onElected} callback will be executed and a command that can be used
   * to abdicate leadership will be passed in.  If the elected leader jvm process dies or the
   * elected leader successfully abdicates then a new leader will be elected.  Leaders that
   * successfully abdicate are removed from the group and will not be eligible for leadership
   * election unless {@link #offerLeadership(Leader)} is called again.
   *
   * @param leader the leader to notify of election and defeat events
   * @throws JoinException if there was a problem joining the group
   * @throws WatchException if there is a problem generating the 1st group membership list
   * @throws InterruptedException if interrupted waiting to join the group and determine initial
   *     election results
   * @return a supplier that can be queried to find out if this leader is currently elected
   */
  public Supplier<Boolean> offerLeadership(Leader leader)
      throws JoinException, WatchException, InterruptedException;
}

// http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/CandidateImpl.java
// ----------------------------------------------------------------------
// diff --git a/commons/src/main/java/com/twitter/common/zookeeper/CandidateImpl.java b/commons/src/main/java/com/twitter/common/zookeeper/CandidateImpl.java
// new file mode 100644
// index 0000000..3361a7f
// --- /dev/null
// +++ b/commons/src/main/java/com/twitter/common/zookeeper/CandidateImpl.java
// @@ -0,0 +1,184 @@
// =================================================================================================
// Copyright 2011 Twitter, Inc.
// -------------------------------------------------------------------------------------------------
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this work except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file, or at:
//
//  http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =================================================================================================

package com.twitter.common.zookeeper;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.annotation.Nullable;

import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;

import org.apache.zookeeper.KeeperException;

import com.twitter.common.base.Command;
import com.twitter.common.base.ExceptionalCommand;
import com.twitter.common.zookeeper.Group.GroupChangeListener;
import com.twitter.common.zookeeper.Group.JoinException;
import com.twitter.common.zookeeper.Group.Membership;
import com.twitter.common.zookeeper.Group.WatchException;
import com.twitter.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;

/**
 * Implements leader election for small groups of candidates.  This implementation is subject to the
 * <a href="http://hadoop.apache.org/zookeeper/docs/r3.2.1/recipes.html#sc_leaderElection">
 * herd effect</a> for a given group and should only be used for small (~10 member) candidate pools.
 */
public class CandidateImpl implements Candidate {
  private static final Logger LOG = Logger.getLogger(CandidateImpl.class.getName());

  private static final byte[] UNKNOWN_CANDIDATE_DATA = "<unknown>".getBytes(Charsets.UTF_8);

  // Default leader data: the textual ip address of the local host, or "<unknown>" when the local
  // address cannot be resolved.
  private static final Supplier<byte[]> IP_ADDRESS_DATA_SUPPLIER = new Supplier<byte[]>() {
    @Override public byte[] get() {
      try {
        // Encode explicitly as UTF-8, matching UNKNOWN_CANDIDATE_DATA, so the published bytes do
        // not depend on the platform default charset.
        return InetAddress.getLocalHost().getHostAddress().getBytes(Charsets.UTF_8);
      } catch (UnknownHostException e) {
        LOG.log(Level.WARNING, "Failed to determine local address!", e);
        return UNKNOWN_CANDIDATE_DATA;
      }
    }
  };

  // Despite the name, this judge picks the candidate whose id is smallest in natural String
  // order, i.e. the lowest numbered candidate node.
  private static final Function<Iterable<String>, String> MOST_RECENT_JUDGE =
      new Function<Iterable<String>, String>() {
        @Override public String apply(Iterable<String> candidates) {
          return Ordering.natural().min(candidates);
        }
      };

  private final Group group;
  private final Function<Iterable<String>, String> judge;
  private final Supplier<byte[]> dataSupplier;

  /**
   * Equivalent to {@link #CandidateImpl(Group, com.google.common.base.Function, Supplier)} using a
   * judge that always picks the lowest numbered candidate ephemeral node - by proxy the oldest or
   * 1st candidate and a default supplier that provides the ip address of this host according to
   * {@link java.net.InetAddress#getLocalHost()} as the leader identifying data.
   */
  public CandidateImpl(Group group) {
    this(group, MOST_RECENT_JUDGE, IP_ADDRESS_DATA_SUPPLIER);
  }

  /**
   * Creates a candidate that can be used to offer leadership for the given {@code group} using
   * a judge that always picks the lowest numbered candidate ephemeral node - by proxy the oldest
   * or 1st. The dataSupplier should produce bytes that identify this process as leader. These bytes
   * will become available to all participants via the {@link Candidate#getLeaderData()} method.
   */
  public CandidateImpl(Group group, Supplier<byte[]> dataSupplier) {
    this(group, MOST_RECENT_JUDGE, dataSupplier);
  }

  /**
   * Creates a candidate that can be used to offer leadership for the given {@code group}.  The
   * {@code judge} is used to pick the current leader from all group members whenever the group
   * membership changes. To form a well-behaved election group with one leader, all candidates
   * should use the same judge. The dataSupplier should produce bytes that identify this process
   * as leader. These bytes will become available to all participants via the
   * {@link Candidate#getLeaderData()} method.
   */
  public CandidateImpl(
      Group group,
      Function<Iterable<String>, String> judge,
      Supplier<byte[]> dataSupplier) {
    this.group = Preconditions.checkNotNull(group);
    this.judge = Preconditions.checkNotNull(judge);
    this.dataSupplier = Preconditions.checkNotNull(dataSupplier);
  }

  @Override
  public Optional<byte[]> getLeaderData()
      throws ZooKeeperConnectionException, KeeperException, InterruptedException {

    String leaderId = getLeader(group.getMemberIds());
    return leaderId == null
        ? Optional.<byte[]>absent()
        : Optional.of(group.getMemberData(leaderId));
  }

  @Override
  public Supplier<Boolean> offerLeadership(final Leader leader)
      throws JoinException, WatchException, InterruptedException {

    // The command passed to join() notifies the leader of defeat when the group runs it.
    final Membership membership = group.join(dataSupplier, new Command() {
      @Override public void execute() {
        leader.onDefeated();
      }
    });

    final AtomicBoolean elected = new AtomicBoolean(false);
    final AtomicBoolean abdicated = new AtomicBoolean(false);
    group.watch(new GroupChangeListener() {
        @Override public void onGroupChange(Iterable<String> memberIds) {
          boolean noCandidates = Iterables.isEmpty(memberIds);
          String memberId = membership.getMemberId();

          // NOTE(review): `elected` is left untouched in the first two branches below, so the
          // returned supplier can keep reporting a previous election result - confirm intended.
          if (noCandidates) {
            LOG.warning("All candidates have temporarily left the group: " + group);
          } else if (!Iterables.contains(memberIds, memberId)) {
            LOG.severe(String.format(
                "Current member ID %s is not a candidate for leader, current voting: %s",
                memberId, memberIds));
          } else {
            boolean electedLeader = memberId.equals(getLeader(memberIds));
            boolean previouslyElected = elected.getAndSet(electedLeader);

            if (!previouslyElected && electedLeader) {
              LOG.info(String.format("Candidate %s is now leader of group: %s",
                  membership.getMemberPath(), memberIds));

              leader.onElected(new ExceptionalCommand<JoinException>() {
                @Override public void execute() throws JoinException {
                  // Only mark abdication once the membership was cancelled without throwing.
                  membership.cancel();
                  abdicated.set(true);
                }
              });
            } else if (!electedLeader) {
              if (previouslyElected) {
                leader.onDefeated();
              }
              LOG.info(String.format(
                  "Candidate %s waiting for the next leader election, current voting: %s",
                  membership.getMemberPath(), memberIds));
            }
          }
        }
      });

    return new Supplier<Boolean>() {
        @Override public Boolean get() {
          return !abdicated.get() && elected.get();
        }
      };
  }

  /**
   * Applies the judge to the given member ids.
   *
   * @param memberIds the ids of the current group members
   * @return the id picked by the judge, or {@code null} when there are no members
   */
  @Nullable
  private String getLeader(Iterable<String> memberIds) {
    return Iterables.isEmpty(memberIds) ? null : judge.apply(memberIds);
  }
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/CompoundServerSet.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/zookeeper/CompoundServerSet.java b/commons/src/main/java/com/twitter/common/zookeeper/CompoundServerSet.java new file mode 100644 index 0000000..2e7260b --- /dev/null +++ b/commons/src/main/java/com/twitter/common/zookeeper/CompoundServerSet.java @@ -0,0 +1,211 @@ +package com.twitter.common.zookeeper; + +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Map; + +import com.google.common.base.Joiner; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import com.twitter.common.base.Command; +import com.twitter.common.base.Commands; +import com.twitter.common.base.MorePreconditions; +import com.twitter.common.zookeeper.Group.JoinException; +import com.twitter.thrift.ServiceInstance; +import com.twitter.thrift.Status; + +/** + * A ServerSet that delegates all calls to other ServerSets. + */ +public class CompoundServerSet implements ServerSet { + private static final Joiner STACK_TRACE_JOINER = Joiner.on('\n'); + + private final List<ServerSet> serverSets; + private final Map<ServerSet, ImmutableSet<ServiceInstance>> instanceCache = Maps.newHashMap(); + private final List<HostChangeMonitor<ServiceInstance>> monitors = Lists.newArrayList(); + private Command stopWatching = null; + private ImmutableSet<ServiceInstance> allHosts = ImmutableSet.of(); + + /** + * Create new ServerSet from a list of serverSets. + * + * @param serverSets serverSets to which the calls will be delegated. 
+ */ + public CompoundServerSet(Iterable<ServerSet> serverSets) { + MorePreconditions.checkNotBlank(serverSets); + this.serverSets = ImmutableList.copyOf(serverSets); + } + + private interface JoinOp { + EndpointStatus doJoin(ServerSet serverSet) throws JoinException, InterruptedException; + } + + private interface StatusOp { + void changeStatus(EndpointStatus status) throws UpdateException; + } + + private void changeStatus( + ImmutableList<EndpointStatus> statuses, + StatusOp statusOp) throws UpdateException { + + ImmutableList.Builder<String> builder = ImmutableList.builder(); + int errorIdx = 1; + for (EndpointStatus endpointStatus : statuses) { + try { + statusOp.changeStatus(endpointStatus); + } catch (UpdateException exception) { + builder.add(String.format("[%d] %s", errorIdx++, + Throwables.getStackTraceAsString(exception))); + } + } + if (errorIdx > 1) { + throw new UpdateException( + "One or more ServerSet update failed: " + STACK_TRACE_JOINER.join(builder.build())); + } + } + + private EndpointStatus doJoin(JoinOp joiner) throws JoinException, InterruptedException { + // Get the list of endpoint status from the serverSets. 
+ ImmutableList.Builder<EndpointStatus> builder = ImmutableList.builder(); + for (ServerSet serverSet : serverSets) { + builder.add(joiner.doJoin(serverSet)); + } + + final ImmutableList<EndpointStatus> statuses = builder.build(); + + return new EndpointStatus() { + @Override public void leave() throws UpdateException { + changeStatus(statuses, new StatusOp() { + @Override public void changeStatus(EndpointStatus status) throws UpdateException { + status.leave(); + } + }); + } + + @Override public void update(final Status newStatus) throws UpdateException { + changeStatus(statuses, new StatusOp() { + @Override public void changeStatus(EndpointStatus status) throws UpdateException { + status.update(newStatus); + } + }); + } + }; + } + + @Override + public EndpointStatus join( + final InetSocketAddress endpoint, + final Map<String, InetSocketAddress> additionalEndpoints) + throws Group.JoinException, InterruptedException { + + return doJoin(new JoinOp() { + @Override public EndpointStatus doJoin(ServerSet serverSet) + throws JoinException, InterruptedException { + return serverSet.join(endpoint, additionalEndpoints); + } + }); + } + + /* + * If any one of the serverSet throws an exception during respective join, the exception is + * propagated. Join is successful only if all the joins are successful. + * + * NOTE: If an exception occurs during the join, the serverSets in the composite can be in a + * partially joined state. 
+ * + * @see ServerSet#join(InetSocketAddress, Map, Status) + */ + @Override + public EndpointStatus join( + final InetSocketAddress endpoint, + final Map<String, InetSocketAddress> additionalEndpoints, + final Status status) throws Group.JoinException, InterruptedException { + + return doJoin(new JoinOp() { + @Override public EndpointStatus doJoin(ServerSet serverSet) + throws JoinException, InterruptedException { + + return serverSet.join(endpoint, additionalEndpoints, status); + } + }); + } + + @Override + public EndpointStatus join( + final InetSocketAddress endpoint, + final Map<String, InetSocketAddress> additionalEndpoints, + final int shardId) throws JoinException, InterruptedException { + + return doJoin(new JoinOp() { + @Override public EndpointStatus doJoin(ServerSet serverSet) + throws JoinException, InterruptedException { + + return serverSet.join(endpoint, additionalEndpoints, shardId); + } + }); + } + + // Handles changes to the union of hosts. + private synchronized void handleChange(ServerSet serverSet, ImmutableSet<ServiceInstance> hosts) { + instanceCache.put(serverSet, hosts); + + // Get the union of hosts. + ImmutableSet<ServiceInstance> currentHosts = + ImmutableSet.copyOf(Iterables.concat(instanceCache.values())); + + // Check if the hosts have changed. + if (!currentHosts.equals(allHosts)) { + allHosts = currentHosts; + + // Notify the monitors. + for (HostChangeMonitor<ServiceInstance> monitor : monitors) { + monitor.onChange(allHosts); + } + } + } + + /** + * Monitor the CompoundServerSet. + * + * If any one of the monitor calls to the underlying serverSet raises a MonitorException, the + * exception is propagated. The call is successful only if all the monitor calls to the + * underlying serverSets are successful. + * + * NOTE: If an exception occurs during the monitor call, the serverSets in the composite will not + * be monitored. + * + * @param monitor HostChangeMonitor instance used to monitor host changes. 
+ * @return A command that, when executed, will stop monitoring all underlying server sets. + * @throws MonitorException If there was a problem monitoring any of the underlying server sets. + */ + @Override + public synchronized Command watch(HostChangeMonitor<ServiceInstance> monitor) + throws MonitorException { + if (stopWatching == null) { + monitors.add(monitor); + ImmutableList.Builder<Command> commandsBuilder = ImmutableList.builder(); + + for (final ServerSet serverSet : serverSets) { + commandsBuilder.add(serverSet.watch(new HostChangeMonitor<ServiceInstance>() { + @Override public void onChange(ImmutableSet<ServiceInstance> hostSet) { + handleChange(serverSet, hostSet); + } + })); + } + + stopWatching = Commands.compound(commandsBuilder.build()); + } + + return stopWatching; + } + + @Override + public void monitor(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException { + watch(monitor); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/DistributedLock.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/zookeeper/DistributedLock.java b/commons/src/main/java/com/twitter/common/zookeeper/DistributedLock.java new file mode 100644 index 0000000..fdcd8d9 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/zookeeper/DistributedLock.java @@ -0,0 +1,42 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. 
/**
 * DistributedLock
 *
 * <p>Contract for a lock with a blocking acquire, a time-bounded acquire and a release. Failures
 * are reported through the unchecked {@link LockingException}.
 *
 * @author Florian Leibert
 */
public interface DistributedLock {

  /**
   * Acquires the lock.
   *
   * @throws LockingException if the lock could not be acquired
   */
  void lock() throws LockingException;

  /**
   * Attempts to acquire the lock within the given time budget.
   *
   * @param timeout how long to wait, expressed in {@code unit}
   * @param unit the unit of {@code timeout}
   * @return whether the lock was acquired
   */
  boolean tryLock(long timeout, TimeUnit unit);

  /**
   * Releases the lock.
   *
   * @throws LockingException if the lock could not be released
   */
  void unlock() throws LockingException;

  /**
   * Unchecked exception signalling a failure to acquire or release a {@link DistributedLock}.
   */
  // Member types of an interface are implicitly public and static.
  class LockingException extends RuntimeException {

    /**
     * @param msg description of the failure
     */
    public LockingException(String msg) {
      super(msg);
    }

    /**
     * @param msg description of the failure
     * @param e the underlying cause
     */
    public LockingException(String msg, Exception e) {
      super(msg, e);
    }
  }
}
+// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.zookeeper; + +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.annotation.concurrent.ThreadSafe; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Ordering; + +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; + +import com.twitter.common.base.MorePreconditions; + +/** + * Distributed locking via ZooKeeper. Assuming there are N clients that all try to acquire a lock, + * the algorithm works as follows. Each host creates an ephemeral|sequential node, and requests a + * list of children for the lock node. Due to the nature of sequential, all the ids are increasing + * in order, therefore the client with the least ID according to natural ordering will hold the + * lock. Every other client watches the id immediately preceding its own id and checks for the lock + * in case of notification. 
The client holding the lock does the work and finally deletes the node, + * thereby triggering the next client in line to acquire the lock. Deadlocks are possible but + * avoided in most cases because if a client drops dead while holding the lock, the ZK session + * should timeout and since the node is ephemeral, it will be removed in such a case. Deadlocks + * could occur if the the worker thread on a client hangs but the zk-client thread is still alive. + * There could be an external monitor client that ensures that alerts are triggered if the least-id + * ephemeral node is present past a time-out. + * <p/> + * Note: Locking attempts will fail in case session expires! + * + * @author Florian Leibert + */ +@ThreadSafe +public class DistributedLockImpl implements DistributedLock { + + private static final Logger LOG = Logger.getLogger(DistributedLockImpl.class.getName()); + + private final ZooKeeperClient zkClient; + private final String lockPath; + private final ImmutableList<ACL> acl; + + private final AtomicBoolean aborted = new AtomicBoolean(false); + private CountDownLatch syncPoint; + private boolean holdsLock = false; + private String currentId; + private String currentNode; + private String watchedNode; + private LockWatcher watcher; + + /** + * Equivalent to {@link #DistributedLockImpl(ZooKeeperClient, String, Iterable)} with a default + * wide open {@code acl} ({@link ZooDefs.Ids#OPEN_ACL_UNSAFE}). + */ + public DistributedLockImpl(ZooKeeperClient zkClient, String lockPath) { + this(zkClient, lockPath, ZooDefs.Ids.OPEN_ACL_UNSAFE); + } + + /** + * Creates a distributed lock using the given {@code zkClient} to coordinate locking. + * + * @param zkClient The ZooKeeper client to use. + * @param lockPath The path used to manage the lock under. + * @param acl The acl to apply to newly created lock nodes. 
+ */ + public DistributedLockImpl(ZooKeeperClient zkClient, String lockPath, Iterable<ACL> acl) { + this.zkClient = Preconditions.checkNotNull(zkClient); + this.lockPath = MorePreconditions.checkNotBlank(lockPath); + this.acl = ImmutableList.copyOf(acl); + this.syncPoint = new CountDownLatch(1); + } + + private synchronized void prepare() + throws ZooKeeperClient.ZooKeeperConnectionException, InterruptedException, KeeperException { + + ZooKeeperUtils.ensurePath(zkClient, acl, lockPath); + LOG.log(Level.FINE, "Working with locking path:" + lockPath); + + // Create an EPHEMERAL_SEQUENTIAL node. + currentNode = + zkClient.get().create(lockPath + "/member_", null, acl, CreateMode.EPHEMERAL_SEQUENTIAL); + + // We only care about our actual id since we want to compare ourselves to siblings. + if (currentNode.contains("/")) { + currentId = currentNode.substring(currentNode.lastIndexOf("/") + 1); + } + LOG.log(Level.FINE, "Received ID from zk:" + currentId); + this.watcher = new LockWatcher(); + } + + @Override + public synchronized void lock() throws LockingException { + if (holdsLock) { + throw new LockingException("Error, already holding a lock. Call unlock first!"); + } + try { + prepare(); + watcher.checkForLock(); + syncPoint.await(); + if (!holdsLock) { + throw new LockingException("Error, couldn't acquire the lock!"); + } + } catch (InterruptedException e) { + cancelAttempt(); + throw new LockingException("InterruptedException while trying to acquire lock!", e); + } catch (KeeperException e) { + // No need to clean up since the node wasn't created yet. + throw new LockingException("KeeperException while trying to acquire lock!", e); + } catch (ZooKeeperClient.ZooKeeperConnectionException e) { + // No need to clean up since the node wasn't created yet. 
+ throw new LockingException("ZooKeeperConnectionException while trying to acquire lock", e); + } + } + + @Override + public synchronized boolean tryLock(long timeout, TimeUnit unit) { + if (holdsLock) { + throw new LockingException("Error, already holding a lock. Call unlock first!"); + } + try { + prepare(); + watcher.checkForLock(); + boolean success = syncPoint.await(timeout, unit); + if (!success) { + return false; + } + if (!holdsLock) { + throw new LockingException("Error, couldn't acquire the lock!"); + } + } catch (InterruptedException e) { + cancelAttempt(); + return false; + } catch (KeeperException e) { + // No need to clean up since the node wasn't created yet. + throw new LockingException("KeeperException while trying to acquire lock!", e); + } catch (ZooKeeperClient.ZooKeeperConnectionException e) { + // No need to clean up since the node wasn't created yet. + throw new LockingException("ZooKeeperConnectionException while trying to acquire lock", e); + } + return true; + } + + @Override + public synchronized void unlock() throws LockingException { + if (currentId == null) { + throw new LockingException("Error, neither attempting to lock nor holding a lock!"); + } + Preconditions.checkNotNull(currentId); + // Try aborting! + if (!holdsLock) { + aborted.set(true); + LOG.log(Level.INFO, "Not holding lock, aborting acquisition attempt!"); + } else { + LOG.log(Level.INFO, "Cleaning up this locks ephemeral node."); + cleanup(); + } + } + + //TODO(Florian Leibert): Make sure this isn't a runtime exception. Put exceptions into the token? + + private synchronized void cancelAttempt() { + LOG.log(Level.INFO, "Cancelling lock attempt!"); + cleanup(); + // Bubble up failure... 
+ holdsLock = false; + syncPoint.countDown(); + } + + private void cleanup() { + LOG.info("Cleaning up!"); + Preconditions.checkNotNull(currentId); + try { + Stat stat = zkClient.get().exists(currentNode, false); + if (stat != null) { + zkClient.get().delete(currentNode, ZooKeeperUtils.ANY_VERSION); + } else { + LOG.log(Level.WARNING, "Called cleanup but nothing to cleanup!"); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + holdsLock = false; + aborted.set(false); + currentId = null; + currentNode = null; + watcher = null; + syncPoint = new CountDownLatch(1); + } + + class LockWatcher implements Watcher { + + public synchronized void checkForLock() { + MorePreconditions.checkNotBlank(currentId); + + try { + List<String> candidates = zkClient.get().getChildren(lockPath, null); + ImmutableList<String> sortedMembers = Ordering.natural().immutableSortedCopy(candidates); + + // Unexpected behavior if there are no children! + if (sortedMembers.isEmpty()) { + throw new LockingException("Error, member list is empty!"); + } + + int memberIndex = sortedMembers.indexOf(currentId); + + // If we hold the lock + if (memberIndex == 0) { + holdsLock = true; + syncPoint.countDown(); + } else { + final String nextLowestNode = sortedMembers.get(memberIndex - 1); + LOG.log(Level.INFO, String.format("Current LockWatcher with ephemeral node [%s], is " + + "waiting for [%s] to release lock.", currentId, nextLowestNode)); + + watchedNode = String.format("%s/%s", lockPath, nextLowestNode); + Stat stat = zkClient.get().exists(watchedNode, this); + if (stat == null) { + checkForLock(); + } + } + } catch (InterruptedException e) { + LOG.log(Level.WARNING, String.format("Current LockWatcher with ephemeral node [%s] " + + "got interrupted. Trying to cancel lock acquisition.", currentId), e); + cancelAttempt(); + } catch (KeeperException e) { + LOG.log(Level.WARNING, String.format("Current LockWatcher with ephemeral node [%s] " + + "got a KeeperException. 
Trying to cancel lock acquisition.", currentId), e); + cancelAttempt(); + } catch (ZooKeeperClient.ZooKeeperConnectionException e) { + LOG.log(Level.WARNING, String.format("Current LockWatcher with ephemeral node [%s] " + + "got a ConnectionException. Trying to cancel lock acquisition.", currentId), e); + cancelAttempt(); + } + } + + @Override + public synchronized void process(WatchedEvent event) { + // this handles the case where we have aborted a lock and deleted ourselves but still have a + // watch on the nextLowestNode. This is a workaround since ZK doesn't support unsub. + if (!event.getPath().equals(watchedNode)) { + LOG.log(Level.INFO, "Ignoring call for node:" + watchedNode); + return; + } + //TODO(Florian Leibert): Pull this into the outer class. + if (event.getType() == Watcher.Event.EventType.None) { + switch (event.getState()) { + case SyncConnected: + // TODO(Florian Leibert): maybe we should just try to "fail-fast" in this case and abort. + LOG.info("Reconnected..."); + break; + case Expired: + LOG.log(Level.WARNING, String.format("Current ZK session expired![%s]", currentId)); + cancelAttempt(); + break; + } + } else if (event.getType() == Event.EventType.NodeDeleted) { + checkForLock(); + } else { + LOG.log(Level.WARNING, String.format("Unexpected ZK event: %s", event.getType().name())); + } + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/Group.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/zookeeper/Group.java b/commons/src/main/java/com/twitter/common/zookeeper/Group.java new file mode 100644 index 0000000..81c451c --- /dev/null +++ b/commons/src/main/java/com/twitter/common/zookeeper/Group.java @@ -0,0 +1,711 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. 
+// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.zookeeper; + +import java.util.List; +import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.regex.Pattern; + +import javax.annotation.Nullable; + +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; + +import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.data.ACL; + +import com.twitter.common.base.Command; +import com.twitter.common.base.Commands; +import com.twitter.common.base.ExceptionalSupplier; +import com.twitter.common.base.MorePreconditions; +import com.twitter.common.util.BackoffHelper; +import 
com.twitter.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException; + +/** + * This class exposes methods for joining and monitoring distributed groups. The groups this class + * monitors are realized as persistent paths in ZooKeeper with ephemeral child nodes for + * each member of a group. + */ +public class Group { + private static final Logger LOG = Logger.getLogger(Group.class.getName()); + + private static final Supplier<byte[]> NO_MEMBER_DATA = Suppliers.ofInstance(null); + private static final String DEFAULT_NODE_NAME_PREFIX = "member_"; + + private final ZooKeeperClient zkClient; + private final ImmutableList<ACL> acl; + private final String path; + + private final NodeScheme nodeScheme; + private final Predicate<String> nodeNameFilter; + + private final BackoffHelper backoffHelper; + + /** + * Creates a group rooted at the given {@code path}. Paths must be absolute and trailing or + * duplicate slashes will be normalized. For example, all the following paths would create a + * group at the normalized path /my/distributed/group: + * <ul> + * <li>/my/distributed/group + * <li>/my/distributed/group/ + * <li>/my/distributed//group + * </ul> + * + * @param zkClient the client to use for interactions with ZooKeeper + * @param acl the ACL to use for creating the persistent group path if it does not already exist + * @param path the absolute persistent path that represents this group + * @param nodeScheme the scheme that defines how nodes are created + */ + public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path, NodeScheme nodeScheme) { + this.zkClient = Preconditions.checkNotNull(zkClient); + this.acl = ImmutableList.copyOf(acl); + this.path = ZooKeeperUtils.normalizePath(Preconditions.checkNotNull(path)); + + this.nodeScheme = Preconditions.checkNotNull(nodeScheme); + nodeNameFilter = new Predicate<String>() { + @Override public boolean apply(String nodeName) { + return Group.this.nodeScheme.isMember(nodeName); + } + }; + + backoffHelper = 
new BackoffHelper(); + } + + /** + * Equivalent to {@link #Group(ZooKeeperClient, Iterable, String, String)} with a + * {@code namePrefix} of 'member_'. + */ + public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path) { + this(zkClient, acl, path, DEFAULT_NODE_NAME_PREFIX); + } + + /** + * Equivalent to {@link #Group(ZooKeeperClient, Iterable, String, NodeScheme)} with a + * {@link DefaultScheme} using {@code namePrefix}. + */ + public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path, String namePrefix) { + this(zkClient, acl, path, new DefaultScheme(namePrefix)); + } + + public String getMemberPath(String memberId) { + return path + "/" + MorePreconditions.checkNotBlank(memberId); + } + + public String getPath() { + return path; + } + + public String getMemberId(String nodePath) { + MorePreconditions.checkNotBlank(nodePath); + Preconditions.checkArgument(nodePath.startsWith(path + "/"), + "Not a member of this group[%s]: %s", path, nodePath); + + String memberId = StringUtils.substringAfterLast(nodePath, "/"); + Preconditions.checkArgument(nodeScheme.isMember(memberId), + "Not a group member: %s", memberId); + return memberId; + } + + /** + * Returns the current list of group member ids by querying ZooKeeper synchronously. + * + * @return the ids of all the present members of this group + * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper + * @throws KeeperException if there was a problem reading this group's member ids + * @throws InterruptedException if this thread is interrupted listing the group members + */ + public Iterable<String> getMemberIds() + throws ZooKeeperConnectionException, KeeperException, InterruptedException { + return Iterables.filter(zkClient.get().getChildren(path, false), nodeNameFilter); + } + + /** + * Gets the data for one of this groups members by querying ZooKeeper synchronously. 
+ * + * @param memberId the id of the member whose data to retrieve + * @return the data associated with the {@code memberId} + * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper + * @throws KeeperException if there was a problem reading this member's data + * @throws InterruptedException if this thread is interrupted retrieving the member data + */ + public byte[] getMemberData(String memberId) + throws ZooKeeperConnectionException, KeeperException, InterruptedException { + return zkClient.get().getData(getMemberPath(memberId), false, null); + } + + /** + * Represents membership in a distributed group. + */ + public interface Membership { + + /** + * Returns the persistent ZooKeeper path that represents this group. + */ + String getGroupPath(); + + /** + * Returns the id (ZooKeeper node name) of this group member. May change over time if the + * ZooKeeper session expires. + */ + String getMemberId(); + + /** + * Returns the full ZooKeeper path to this group member. May change over time if the + * ZooKeeper session expires. + */ + String getMemberPath(); + + /** + * Updates the membership data synchronously using the {@code Supplier<byte[]>} passed to + * {@link Group#join()}. + * + * @return the new membership data + * @throws UpdateException if there was a problem updating the membership data + */ + byte[] updateMemberData() throws UpdateException; + + /** + * Cancels group membership by deleting the associated ZooKeeper member node. + * + * @throws JoinException if there is a problem deleting the node + */ + void cancel() throws JoinException; + } + + /** + * Indicates an error joining a group. + */ + public static class JoinException extends Exception { + public JoinException(String message, Throwable cause) { + super(message, cause); + } + } + + /** + * Indicates an error updating a group member's data. 
+ */ + public static class UpdateException extends Exception { + public UpdateException(String message, Throwable cause) { + super(message, cause); + } + } + + /** + * Equivalent to calling {@code join(null, null)}. + */ + public final Membership join() throws JoinException, InterruptedException { + return join(NO_MEMBER_DATA, null); + } + + /** + * Equivalent to calling {@code join(memberData, null)}. + */ + public final Membership join(Supplier<byte[]> memberData) + throws JoinException, InterruptedException { + + return join(memberData, null); + } + + /** + * Equivalent to calling {@code join(null, onLoseMembership)}. + */ + public final Membership join(@Nullable final Command onLoseMembership) + throws JoinException, InterruptedException { + + return join(NO_MEMBER_DATA, onLoseMembership); + } + + /** + * Joins this group and returns the resulting Membership when successful. Membership will be + * automatically cancelled when the current jvm process dies; however the returned Membership + * object can be used to cancel membership earlier. Unless + * {@link com.twitter.common.zookeeper.Group.Membership#cancel()} is called the membership will + * be maintained by re-establishing it silently in the background. + * + * <p>Any {@code memberData} given is persisted in the member node in ZooKeeper. If an + * {@code onLoseMembership} callback is supplied, it will be notified each time this member loses + * membership in the group. 
+ * + * @param memberData a supplier of the data to store in the member node + * @param onLoseMembership a callback to notify when membership is lost + * @return a Membership object with the member details + * @throws JoinException if there was a problem joining the group + * @throws InterruptedException if this thread is interrupted awaiting completion of the join + */ + public final Membership join(Supplier<byte[]> memberData, @Nullable Command onLoseMembership) + throws JoinException, InterruptedException { + + Preconditions.checkNotNull(memberData); + ensurePersistentGroupPath(); + + final ActiveMembership groupJoiner = new ActiveMembership(memberData, onLoseMembership); + return backoffHelper.doUntilResult(new ExceptionalSupplier<Membership, JoinException>() { + @Override public Membership get() throws JoinException { + try { + return groupJoiner.join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new JoinException("Interrupted trying to join group at path: " + path, e); + } catch (ZooKeeperConnectionException e) { + LOG.log(Level.WARNING, "Temporary error trying to join group at path: " + path, e); + return null; + } catch (KeeperException e) { + if (zkClient.shouldRetry(e)) { + LOG.log(Level.WARNING, "Temporary error trying to join group at path: " + path, e); + return null; + } else { + throw new JoinException("Problem joining partition group at path: " + path, e); + } + } + } + }); + } + + private void ensurePersistentGroupPath() throws JoinException, InterruptedException { + backoffHelper.doUntilSuccess(new ExceptionalSupplier<Boolean, JoinException>() { + @Override public Boolean get() throws JoinException { + try { + ZooKeeperUtils.ensurePath(zkClient, acl, path); + return true; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new JoinException("Interrupted trying to ensure group at path: " + path, e); + } catch (ZooKeeperConnectionException e) { + LOG.log(Level.WARNING, "Problem 
connecting to ZooKeeper, retrying", e); + return false; + } catch (KeeperException e) { + if (zkClient.shouldRetry(e)) { + LOG.log(Level.WARNING, "Temporary error ensuring path: " + path, e); + return false; + } else { + throw new JoinException("Problem ensuring group at path: " + path, e); + } + } + } + }); + } + + private class ActiveMembership implements Membership { + private final Supplier<byte[]> memberData; + private final Command onLoseMembership; + private String nodePath; + private String memberId; + private volatile boolean cancelled; + private byte[] membershipData; + + public ActiveMembership(Supplier<byte[]> memberData, @Nullable Command onLoseMembership) { + this.memberData = memberData; + this.onLoseMembership = (onLoseMembership == null) ? Commands.NOOP : onLoseMembership; + } + + @Override + public String getGroupPath() { + return path; + } + + @Override + public synchronized String getMemberId() { + return memberId; + } + + @Override + public synchronized String getMemberPath() { + return nodePath; + } + + @Override + public synchronized byte[] updateMemberData() throws UpdateException { + byte[] membershipData = memberData.get(); + if (!ArrayUtils.isEquals(this.membershipData, membershipData)) { + try { + zkClient.get().setData(nodePath, membershipData, ZooKeeperUtils.ANY_VERSION); + this.membershipData = membershipData; + } catch (KeeperException e) { + throw new UpdateException("Problem updating membership data.", e); + } catch (InterruptedException e) { + throw new UpdateException("Interrupted attempting to update membership data.", e); + } catch (ZooKeeperConnectionException e) { + throw new UpdateException( + "Could not connect to the ZooKeeper cluster to update membership data.", e); + } + } + return membershipData; + } + + @Override + public synchronized void cancel() throws JoinException { + if (!cancelled) { + try { + backoffHelper.doUntilSuccess(new ExceptionalSupplier<Boolean, JoinException>() { + @Override public Boolean get() throws 
JoinException { + try { + zkClient.get().delete(nodePath, ZooKeeperUtils.ANY_VERSION); + return true; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new JoinException("Interrupted trying to cancel membership: " + nodePath, e); + } catch (ZooKeeperConnectionException e) { + LOG.log(Level.WARNING, "Problem connecting to ZooKeeper, retrying", e); + return false; + } catch (NoNodeException e) { + LOG.info("Membership already cancelled, node at path: " + nodePath + + " has been deleted"); + return true; + } catch (KeeperException e) { + if (zkClient.shouldRetry(e)) { + LOG.log(Level.WARNING, "Temporary error cancelling membership: " + nodePath, e); + return false; + } else { + throw new JoinException("Problem cancelling membership: " + nodePath, e); + } + } + } + }); + cancelled = true; // Prevent auto-re-join logic from undoing this cancel. + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new JoinException("Problem cancelling membership: " + nodePath, e); + } + } + } + + private class CancelledException extends IllegalStateException { /* marker */ } + + synchronized Membership join() + throws ZooKeeperConnectionException, InterruptedException, KeeperException { + + if (cancelled) { + throw new CancelledException(); + } + + if (nodePath == null) { + // Re-join if our ephemeral node goes away due to session expiry - only needs to be + // registered once. + zkClient.registerExpirationHandler(new Command() { + @Override public void execute() { + tryJoin(); + } + }); + } + + byte[] membershipData = memberData.get(); + String nodeName = nodeScheme.createName(membershipData); + CreateMode createMode = nodeScheme.isSequential() + ? 
CreateMode.EPHEMERAL_SEQUENTIAL + : CreateMode.EPHEMERAL; + nodePath = zkClient.get().create(path + "/" + nodeName, membershipData, acl, createMode); + memberId = Group.this.getMemberId(nodePath); + LOG.info("Set group member ID to " + memberId); + this.membershipData = membershipData; + + // Re-join if our ephemeral node goes away due to maliciousness. + zkClient.get().exists(nodePath, new Watcher() { + @Override public void process(WatchedEvent event) { + if (event.getType() == EventType.NodeDeleted) { + tryJoin(); + } + } + }); + + return this; + } + + private final ExceptionalSupplier<Boolean, InterruptedException> tryJoin = + new ExceptionalSupplier<Boolean, InterruptedException>() { + @Override public Boolean get() throws InterruptedException { + try { + join(); + return true; + } catch (CancelledException e) { + // Lost a cancel race - that's ok. + return true; + } catch (ZooKeeperConnectionException e) { + LOG.log(Level.WARNING, "Problem connecting to ZooKeeper, retrying", e); + return false; + } catch (KeeperException e) { + if (zkClient.shouldRetry(e)) { + LOG.log(Level.WARNING, "Temporary error re-joining group: " + path, e); + return false; + } else { + throw new IllegalStateException("Permanent problem re-joining group: " + path, e); + } + } + } + }; + + private synchronized void tryJoin() { + onLoseMembership.execute(); + try { + backoffHelper.doUntilSuccess(tryJoin); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException( + String.format("Interrupted while trying to re-join group: %s, giving up", path), e); + } + } + } + + /** + * An interface to an object that listens for changes to a group's membership. + */ + public interface GroupChangeListener { + + /** + * Called whenever group membership changes with the new list of member ids. 
+ * + * @param memberIds the current member ids + */ + void onGroupChange(Iterable<String> memberIds); + } + + /** + * An interface that dictates the scheme to use for storing and filtering nodes that represent + * members of a distributed group. + */ + public interface NodeScheme { + /** + * Determines if a child node is a member of a group by examining the node's name. + * + * @param nodeName the name of a child node found in a group + * @return {@code true} if {@code nodeName} identifies a group member in this scheme + */ + boolean isMember(String nodeName); + + /** + * Generates a node name for the node representing this process in the distributed group. + * + * @param membershipData the data that will be stored in this node + * @return the name for the node that will represent this process in the group + */ + String createName(byte[] membershipData); + + /** + * Indicates whether this scheme needs ephemeral sequential nodes or just ephemeral nodes. + * + * @return {@code true} if this scheme requires sequential node names; {@code false} otherwise + */ + boolean isSequential(); + } + + /** + * Indicates an error watching a group. + */ + public static class WatchException extends Exception { + public WatchException(String message, Throwable cause) { + super(message, cause); + } + } + + /** + * Watches this group for the lifetime of this jvm process. This method will block until the + * current group members are available, notify the {@code groupChangeListener} and then return. + * All further changes to the group membership will cause notifications on a background thread. + * + * @param groupChangeListener the listener to notify of group membership change events + * @return A command which, when executed, will stop watching the group. 
+ * @throws WatchException if there is a problem generating the 1st group membership list + * @throws InterruptedException if interrupted waiting to gather the 1st group membership list + */ + public final Command watch(final GroupChangeListener groupChangeListener) + throws WatchException, InterruptedException { + Preconditions.checkNotNull(groupChangeListener); + + try { + ensurePersistentGroupPath(); + } catch (JoinException e) { + throw new WatchException("Failed to create group path: " + path, e); + } + + final GroupMonitor groupMonitor = new GroupMonitor(groupChangeListener); + backoffHelper.doUntilSuccess(new ExceptionalSupplier<Boolean, WatchException>() { + @Override public Boolean get() throws WatchException { + try { + groupMonitor.watchGroup(); + return true; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new WatchException("Interrupted trying to watch group at path: " + path, e); + } catch (ZooKeeperConnectionException e) { + LOG.log(Level.WARNING, "Temporary error trying to watch group at path: " + path, e); + return null; + } catch (KeeperException e) { + if (zkClient.shouldRetry(e)) { + LOG.log(Level.WARNING, "Temporary error trying to watch group at path: " + path, e); + return null; + } else { + throw new WatchException("Problem trying to watch group at path: " + path, e); + } + } + } + }); + return new Command() { + @Override public void execute() { + groupMonitor.stopWatching(); + } + }; + } + + /** + * Helps continuously monitor a group for membership changes. 
+ */ + private class GroupMonitor { + private final GroupChangeListener groupChangeListener; + private volatile boolean stopped = false; + private Set<String> members; + + GroupMonitor(GroupChangeListener groupChangeListener) { + this.groupChangeListener = groupChangeListener; + } + + private final Watcher groupWatcher = new Watcher() { + @Override public final void process(WatchedEvent event) { + if (event.getType() == EventType.NodeChildrenChanged) { + tryWatchGroup(); + } + } + }; + + private final ExceptionalSupplier<Boolean, InterruptedException> tryWatchGroup = + new ExceptionalSupplier<Boolean, InterruptedException>() { + @Override public Boolean get() throws InterruptedException { + try { + watchGroup(); + return true; + } catch (ZooKeeperConnectionException e) { + LOG.log(Level.WARNING, "Problem connecting to ZooKeeper, retrying", e); + return false; + } catch (KeeperException e) { + if (zkClient.shouldRetry(e)) { + LOG.log(Level.WARNING, "Temporary error re-watching group: " + path, e); + return false; + } else { + throw new IllegalStateException("Permanent problem re-watching group: " + path, e); + } + } + } + }; + + private void tryWatchGroup() { + if (stopped) { + return; + } + + try { + backoffHelper.doUntilSuccess(tryWatchGroup); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException( + String.format("Interrupted while trying to re-watch group: %s, giving up", path), e); + } + } + + private void watchGroup() + throws ZooKeeperConnectionException, InterruptedException, KeeperException { + + if (stopped) { + return; + } + + List<String> children = zkClient.get().getChildren(path, groupWatcher); + setMembers(Iterables.filter(children, nodeNameFilter)); + } + + private void stopWatching() { + // TODO(William Farner): Cancel the watch when + // https://issues.apache.org/jira/browse/ZOOKEEPER-442 is resolved. 
+ LOG.info("Stopping watch on " + this); + stopped = true; + } + + synchronized void setMembers(Iterable<String> members) { + if (stopped) { + LOG.info("Suppressing membership update, no longer watching " + this); + return; + } + + if (this.members == null) { + // Reset our watch on the group if session expires - only needs to be registered once. + zkClient.registerExpirationHandler(new Command() { + @Override public void execute() { + tryWatchGroup(); + } + }); + } + + Set<String> membership = ImmutableSet.copyOf(members); + if (!membership.equals(this.members)) { + groupChangeListener.onGroupChange(members); + this.members = membership; + } + } + } + + /** + * Default naming scheme implementation. Stores nodes at [given path] + "/" + [given prefix] + + * ZooKeeper-generated member ID. For example, if the path is "/discovery/servicename", and the + * prefix is "member_", the node's full path will look something like + * {@code /discovery/servicename/member_0000000007}. + */ + public static class DefaultScheme implements NodeScheme { + private final String namePrefix; + private final Pattern namePattern; + + /** + * Creates a sequential node scheme based on the given node name prefix. + * + * @param namePrefix the prefix for the names of the member nodes + */ + public DefaultScheme(String namePrefix) { + this.namePrefix = MorePreconditions.checkNotBlank(namePrefix); + namePattern = Pattern.compile("^" + Pattern.quote(namePrefix) + "-?[0-9]+$"); + } + + @Override + public boolean isMember(String nodeName) { + return namePattern.matcher(nodeName).matches(); + } + + @Override + public String createName(byte[] membershipData) { + return namePrefix; + } + + @Override + public boolean isSequential() { + return true; + } + } + + @Override + public String toString() { + return "Group " + path; + } +}
