http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java new file mode 100644 index 0000000..ce243fb --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java @@ -0,0 +1,372 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.zookeeper; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; + +import org.apache.aurora.common.base.Command; +import org.apache.aurora.common.net.InetSocketAddressHelper; +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.SessionExpiredException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.common.PathUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manages a connection to a ZooKeeper cluster. + */ +public class ZooKeeperClient { + + /** + * Indicates an error connecting to a zookeeper cluster. + */ + public class ZooKeeperConnectionException extends Exception { + ZooKeeperConnectionException(String message, Throwable cause) { + super(message, cause); + } + } + + private final class SessionState { + private final long sessionId; + private final byte[] sessionPasswd; + + private SessionState(long sessionId, byte[] sessionPasswd) { + this.sessionId = sessionId; + this.sessionPasswd = sessionPasswd; + } + } + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperClient.class); + + private static final Amount<Long,Time> WAIT_FOREVER = Amount.of(0L, Time.MILLISECONDS); + + private final int sessionTimeoutMs; + private final Optional<Credentials> credentials; + private final String zooKeeperServers; + // GuardedBy "this", but still volatile for tests, where we want to be able to see writes + // made from within long synchronized blocks. + private volatile ZooKeeper zooKeeper; + private SessionState sessionState; + + private final Set<Watcher> watchers = new CopyOnWriteArraySet<Watcher>(); + private final BlockingQueue<WatchedEvent> eventQueue = new LinkedBlockingQueue<WatchedEvent>(); + + private static Iterable<InetSocketAddress> combine(InetSocketAddress address, + InetSocketAddress... addresses) { + return ImmutableSet.<InetSocketAddress>builder().add(address).add(addresses).build(); + } + + /** + * Creates an unconnected client that will lazily attempt to connect on the first call to + * {@link #get()}. + * + * @param sessionTimeout the ZK session timeout + * @param zooKeeperServer the first, required ZK server + * @param zooKeeperServers any additional servers forming the ZK cluster + */ + public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, InetSocketAddress zooKeeperServer, + InetSocketAddress... zooKeeperServers) { + this(sessionTimeout, combine(zooKeeperServer, zooKeeperServers)); + } + + /** + * Creates an unconnected client that will lazily attempt to connect on the first call to + * {@link #get}. + * + * @param sessionTimeout the ZK session timeout + * @param zooKeeperServers the set of servers forming the ZK cluster + */ + public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, + Iterable<InetSocketAddress> zooKeeperServers) { + this(sessionTimeout, Optional.absent(), Optional.absent(), zooKeeperServers); + } + + /** + * Creates an unconnected client that will lazily attempt to connect on the first call to + * {@link #get()}. All successful connections will be authenticated with the given + * {@code credentials}. + * + * @param sessionTimeout the ZK session timeout + * @param credentials the credentials to authenticate with + * @param zooKeeperServer the first, required ZK server + * @param zooKeeperServers any additional servers forming the ZK cluster + */ + public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, Credentials credentials, + InetSocketAddress zooKeeperServer, InetSocketAddress... zooKeeperServers) { + this(sessionTimeout, + Optional.of(credentials), + Optional.absent(), + combine(zooKeeperServer, zooKeeperServers)); + } + + /** + * Creates an unconnected client that will lazily attempt to connect on the first call to + * {@link #get}. All successful connections will be authenticated with the given + * {@code credentials}. + * + * @param sessionTimeout the ZK session timeout + * @param credentials the credentials to authenticate with + * @param zooKeeperServers the set of servers forming the ZK cluster + */ + public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, Credentials credentials, + Iterable<InetSocketAddress> zooKeeperServers) { + this(sessionTimeout, + Optional.of(credentials), + Optional.absent(), + zooKeeperServers); + } + + /** + * Creates an unconnected client that will lazily attempt to connect on the first call to + * {@link #get}. All successful connections will be authenticated with the given + * {@code credentials}. + * + * @param sessionTimeout the ZK session timeout + * @param credentials the credentials to authenticate with + * @param chrootPath an optional chroot path + * @param zooKeeperServers the set of servers forming the ZK cluster + */ + public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, Optional<Credentials> credentials, + Optional<String> chrootPath, Iterable<InetSocketAddress> zooKeeperServers) { + this.sessionTimeoutMs = Preconditions.checkNotNull(sessionTimeout).as(Time.MILLISECONDS); + this.credentials = Preconditions.checkNotNull(credentials); + + if (chrootPath.isPresent()) { + PathUtils.validatePath(chrootPath.get()); + } + + Preconditions.checkNotNull(zooKeeperServers); + Preconditions.checkArgument(!Iterables.isEmpty(zooKeeperServers), + "Must present at least 1 ZK server"); + + Thread watcherProcessor = new Thread("ZookeeperClient-watcherProcessor") { + @Override + public void run() { + while (true) { + try { + WatchedEvent event = eventQueue.take(); + for (Watcher watcher : watchers) { + watcher.process(event); + } + } catch (InterruptedException e) { /* ignore */ } + } + } + }; + watcherProcessor.setDaemon(true); + watcherProcessor.start(); + + Iterable<String> servers = + Iterables.transform(ImmutableSet.copyOf(zooKeeperServers), + InetSocketAddressHelper::toString); + this.zooKeeperServers = Joiner.on(',').join(servers).concat(chrootPath.or("")); + } + + /** + * Returns the current active ZK connection or establishes a new one if none has yet been + * established or a previous connection was disconnected or had its session time out. This method + * will attempt to re-use sessions when possible. Equivalent to: + * <pre>get(Amount.of(0L, ...)</pre>. + * + * @return a connected ZooKeeper client + * @throws ZooKeeperConnectionException if there was a problem connecting to the ZK cluster + * @throws InterruptedException if interrupted while waiting for a connection to be established + */ + public synchronized ZooKeeper get() throws ZooKeeperConnectionException, InterruptedException { + try { + return get(WAIT_FOREVER); + } catch (TimeoutException e) { + InterruptedException interruptedException = + new InterruptedException("Got an unexpected TimeoutException for 0 wait"); + interruptedException.initCause(e); + throw interruptedException; + } + } + + /** + * Returns the current active ZK connection or establishes a new one if none has yet been + * established or a previous connection was disconnected or had its session time out. This + * method will attempt to re-use sessions when possible. + * + * @param connectionTimeout the maximum amount of time to wait for the connection to the ZK + * cluster to be established; 0 to wait forever + * @return a connected ZooKeeper client + * @throws ZooKeeperConnectionException if there was a problem connecting to the ZK cluster + * @throws InterruptedException if interrupted while waiting for a connection to be established + * @throws TimeoutException if a connection could not be established within the configured + * session timeout + */ + public synchronized ZooKeeper get(Amount<Long, Time> connectionTimeout) + throws ZooKeeperConnectionException, InterruptedException, TimeoutException { + + if (zooKeeper == null) { + final CountDownLatch connected = new CountDownLatch(1); + Watcher watcher = event -> { + switch (event.getType()) { + // Guard the None type since this watch may be used as the default watch on calls by + // the client outside our control. + case None: + switch (event.getState()) { + case Expired: + LOG.info("Zookeeper session expired. Event: " + event); + close(); + break; + case SyncConnected: + connected.countDown(); + break; + } + } + + eventQueue.offer(event); + }; + + try { + zooKeeper = (sessionState != null) + ? new ZooKeeper(zooKeeperServers, sessionTimeoutMs, watcher, sessionState.sessionId, + sessionState.sessionPasswd) + : new ZooKeeper(zooKeeperServers, sessionTimeoutMs, watcher); + } catch (IOException e) { + throw new ZooKeeperConnectionException( + "Problem connecting to servers: " + zooKeeperServers, e); + } + + if (connectionTimeout.getValue() > 0) { + if (!connected.await(connectionTimeout.as(Time.MILLISECONDS), TimeUnit.MILLISECONDS)) { + close(); + throw new TimeoutException("Timed out waiting for a ZK connection after " + + connectionTimeout); + } + } else { + try { + connected.await(); + } catch (InterruptedException ex) { + LOG.info("Interrupted while waiting to connect to zooKeeper"); + close(); + throw ex; + } + } + if (credentials.isPresent()) { + Credentials zkCredentials = credentials.get(); + zooKeeper.addAuthInfo(zkCredentials.scheme(), zkCredentials.authToken()); + } + + sessionState = new SessionState(zooKeeper.getSessionId(), zooKeeper.getSessionPasswd()); + } + return zooKeeper; + } + + /** + * Clients that need to re-establish state after session expiration can register an + * {@code onExpired} command to execute. + * + * @param onExpired the {@code Command} to register + * @return the new {@link Watcher} which can later be passed to {@link #unregister} for + * removal. + */ + public Watcher registerExpirationHandler(final Command onExpired) { + Watcher watcher = event -> { + if (event.getType() == EventType.None && event.getState() == KeeperState.Expired) { + onExpired.execute(); + } + }; + register(watcher); + return watcher; + } + + /** + * Clients that need to register a top-level {@code Watcher} should do so using this method. The + * registered {@code watcher} will remain registered across re-connects and session expiration + * events. + * + * @param watcher the {@code Watcher to register} + */ + public void register(Watcher watcher) { + watchers.add(watcher); + } + + /** + * Clients can attempt to unregister a top-level {@code Watcher} that has previously been + * registered. + * + * @param watcher the {@code Watcher} to unregister as a top-level, persistent watch + * @return whether the given {@code Watcher} was found and removed from the active set + */ + public boolean unregister(Watcher watcher) { + return watchers.remove(watcher); + } + + /** + * Checks to see if the client might reasonably re-try an operation given the exception thrown + * while attempting it. If the ZooKeeper session should be expired to enable the re-try to + * succeed this method will expire it as a side-effect. + * + * @param e the exception to test + * @return true if a retry can be attempted + */ + public boolean shouldRetry(KeeperException e) { + if (e instanceof SessionExpiredException) { + close(); + } + return ZooKeeperUtils.isRetryable(e); + } + + /** + * Closes the current connection if any expiring the current ZooKeeper session. Any subsequent + * calls to this method will no-op until the next successful {@link #get}. + */ + public synchronized void close() { + if (zooKeeper != null) { + try { + zooKeeper.close(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warn("Interrupted trying to close zooKeeper"); + } finally { + zooKeeper = null; + sessionState = null; + } + } + } + + @VisibleForTesting + synchronized boolean isClosed() { + return zooKeeper == null; + } + + @VisibleForTesting + ZooKeeper getZooKeeperClientForTests() { + return zooKeeper; + } +}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java new file mode 100644 index 0000000..2ada264 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java @@ -0,0 +1,167 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.zookeeper; + +import java.util.List; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.common.PathUtils; +import org.apache.zookeeper.data.ACL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utilities for dealing with zoo keeper. + */ +public final class ZooKeeperUtils { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperUtils.class); + + /** + * An appropriate default session timeout for Twitter ZooKeeper clusters. + */ + public static final Amount<Integer,Time> DEFAULT_ZK_SESSION_TIMEOUT = Amount.of(4, Time.SECONDS); + + /** + * The magic version number that allows any mutation to always succeed regardless of actual + * version number. + */ + public static final int ANY_VERSION = -1; + + /** + * An ACL that gives all permissions any user authenticated or not. + */ + public static final ImmutableList<ACL> OPEN_ACL_UNSAFE = + ImmutableList.copyOf(Ids.OPEN_ACL_UNSAFE); + + /** + * An ACL that gives all permissions to node creators and read permissions only to everyone else. + */ + public static final ImmutableList<ACL> EVERYONE_READ_CREATOR_ALL = + ImmutableList.<ACL>builder() + .addAll(Ids.CREATOR_ALL_ACL) + .addAll(Ids.READ_ACL_UNSAFE) + .build(); + + /** + * Returns true if the given exception indicates an error that can be resolved by retrying the + * operation without modification. + * + * @param e the exception to check + * @return true if the causing operation is strictly retryable + */ + public static boolean isRetryable(KeeperException e) { + Preconditions.checkNotNull(e); + + switch (e.code()) { + case CONNECTIONLOSS: + case SESSIONEXPIRED: + case SESSIONMOVED: + case OPERATIONTIMEOUT: + return true; + + case RUNTIMEINCONSISTENCY: + case DATAINCONSISTENCY: + case MARSHALLINGERROR: + case BADARGUMENTS: + case NONODE: + case NOAUTH: + case BADVERSION: + case NOCHILDRENFOREPHEMERALS: + case NODEEXISTS: + case NOTEMPTY: + case INVALIDCALLBACK: + case INVALIDACL: + case AUTHFAILED: + case UNIMPLEMENTED: + + // These two should not be encountered - they are used internally by ZK to specify ranges + case SYSTEMERROR: + case APIERROR: + + case OK: // This is actually an invalid ZK exception code + + default: + return false; + } + } + + /** + * Ensures the given {@code path} exists in the ZK cluster accessed by {@code zkClient}. If the + * path already exists, nothing is done; however if any portion of the path is missing, it will be + * created with the given {@code acl} as a persistent zookeeper node. The given {@code path} must + * be a valid zookeeper absolute path. + * + * @param zkClient the client to use to access the ZK cluster + * @param acl the acl to use if creating path nodes + * @param path the path to ensure exists + * @throws ZooKeeperConnectionException if there was a problem accessing the ZK cluster + * @throws InterruptedException if we were interrupted attempting to connect to the ZK cluster + * @throws KeeperException if there was a problem in ZK + */ + public static void ensurePath(ZooKeeperClient zkClient, List<ACL> acl, String path) + throws ZooKeeperConnectionException, InterruptedException, KeeperException { + Preconditions.checkNotNull(zkClient); + Preconditions.checkNotNull(path); + Preconditions.checkArgument(path.startsWith("/")); + + ensurePathInternal(zkClient, acl, path); + } + + private static void ensurePathInternal(ZooKeeperClient zkClient, List<ACL> acl, String path) + throws ZooKeeperConnectionException, InterruptedException, KeeperException { + if (zkClient.get().exists(path, false) == null) { + // The current path does not exist; so back up a level and ensure the parent path exists + // unless we're already a root-level path. + int lastPathIndex = path.lastIndexOf('/'); + if (lastPathIndex > 0) { + ensurePathInternal(zkClient, acl, path.substring(0, lastPathIndex)); + } + + // We've ensured our parent path (if any) exists so we can proceed to create our path. + try { + zkClient.get().create(path, null, acl, CreateMode.PERSISTENT); + } catch (KeeperException.NodeExistsException e) { + // This ensures we don't die if a race condition was met between checking existence and + // trying to create the node. + LOG.info("Node existed when trying to ensure path " + path + ", somebody beat us to it?"); + } + } + } + + /** + * Validate and return a normalized zookeeper path which doesn't contain consecutive slashes and + * never ends with a slash (except for root path). + * + * @param path the path to be normalized + * @return normalized path string + */ + public static String normalizePath(String path) { + String normalizedPath = path.replaceAll("//+", "/").replaceFirst("(.+)/$", "$1"); + PathUtils.validatePath(normalizedPath); + return normalizedPath; + } + + private ZooKeeperUtils() { + // utility + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperClientTest.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperClientTest.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperClientTest.java new file mode 100644 index 0000000..ba09279 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperClientTest.java @@ -0,0 +1,140 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.zookeeper.testing; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.common.zookeeper.Credentials; +import org.apache.aurora.common.zookeeper.ZooKeeperClient; + +/** + * A base-class for tests that interact with ZooKeeper via the commons ZooKeeperClient. + */ +public abstract class BaseZooKeeperClientTest extends BaseZooKeeperTest { + + private final Amount<Integer, Time> defaultSessionTimeout; + + /** + * Creates a test case where the test server uses its + * {@link ZooKeeperTestServer#DEFAULT_SESSION_TIMEOUT} for clients created without an explicit + * session timeout. + */ + public BaseZooKeeperClientTest() { + this(ZooKeeperTestServer.DEFAULT_SESSION_TIMEOUT); + } + + /** + * Creates a test case where the test server uses the given {@code defaultSessionTimeout} for + * clients created without an explicit session timeout. + */ + public BaseZooKeeperClientTest(Amount<Integer, Time> defaultSessionTimeout) { + this.defaultSessionTimeout = Preconditions.checkNotNull(defaultSessionTimeout); + } + + + /** + * Starts zookeeper back up on the last used port. + */ + protected final void restartNetwork() throws IOException, InterruptedException { + getServer().restartNetwork(); + } + + /** + * Shuts down the in-process zookeeper network server. + */ + protected final void shutdownNetwork() { + getServer().shutdownNetwork(); + } + + /** + * Expires the active session for the given client. The client should be one returned from + * {@link #createZkClient}. + * + * @param zkClient the client to expire + * @throws ZooKeeperClient.ZooKeeperConnectionException if a problem is encountered connecting to + * the local zk server while trying to expire the session + * @throws InterruptedException if interrupted while requesting expiration + */ + protected final void expireSession(ZooKeeperClient zkClient) + throws ZooKeeperClient.ZooKeeperConnectionException, InterruptedException { + getServer().expireClientSession(zkClient.get().getSessionId()); + } + + /** + * Returns the current port to connect to the in-process zookeeper instance. + */ + protected final int getPort() { + return getServer().getPort(); + } + + /** + * Returns a new unauthenticated zookeeper client connected to the in-process zookeeper server + * with the default session timeout. + */ + protected final ZooKeeperClient createZkClient() { + return createZkClient(defaultSessionTimeout, Optional.absent(), Optional.absent()); + } + + /** + * Returns a new authenticated zookeeper client connected to the in-process zookeeper server with + * the default session timeout. + */ + protected final ZooKeeperClient createZkClient(Credentials credentials) { + return createZkClient(defaultSessionTimeout, Optional.of(credentials), Optional.absent()); + } + + /** + * Returns a new authenticated zookeeper client connected to the in-process zookeeper server with + * the default session timeout. The client is authenticated in the digest authentication scheme + * with the given {@code username} and {@code password}. + */ + protected final ZooKeeperClient createZkClient(String username, String password) { + return createZkClient(Credentials.digestCredentials(username, password)); + } + + /** + * Returns a new unauthenticated zookeeper client connected to the in-process zookeeper server + * with a custom {@code sessionTimeout}. + */ + protected final ZooKeeperClient createZkClient(Amount<Integer, Time> sessionTimeout) { + return createZkClient(sessionTimeout, Optional.absent(), Optional.absent()); + } + + /** + * Returns a new authenticated zookeeper client connected to the in-process zookeeper server with + * the default session timeout and the custom chroot path. + */ + protected final ZooKeeperClient createZkClient(String chrootPath) { + return createZkClient(defaultSessionTimeout, Optional.absent(), + Optional.of(chrootPath)); + } + + private ZooKeeperClient createZkClient( + Amount<Integer, Time> sessionTimeout, + Optional<Credentials> credentials, + Optional<String> chrootPath) { + + ZooKeeperClient client = new ZooKeeperClient(sessionTimeout, credentials, chrootPath, + ImmutableList.of(InetSocketAddress.createUnresolved("127.0.0.1", getPort()))); + addTearDown(client::close); + return client; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperTest.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperTest.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperTest.java new file mode 100644 index 0000000..0e68987 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperTest.java @@ -0,0 +1,46 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.zookeeper.testing; + +import org.apache.aurora.common.testing.TearDownTestCase; +import org.junit.Before; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; + +/** + * A base-class for in-process zookeeper tests. + */ +public abstract class BaseZooKeeperTest extends TearDownTestCase { + + private ZooKeeperTestServer zkTestServer; + + @Rule + public TemporaryFolder tmpFolder = new TemporaryFolder(); + + @Before + public final void setUp() throws Exception { + zkTestServer = new ZooKeeperTestServer(tmpFolder.newFolder(), tmpFolder.newFolder()); + addTearDown(zkTestServer::stop); + zkTestServer.startNetwork(); + } + + /** + * Returns the running in-process ZooKeeper server. + * + * @return The in-process ZooKeeper server. + */ + protected final ZooKeeperTestServer getServer() { + return zkTestServer; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java new file mode 100644 index 0000000..50acaeb --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java @@ -0,0 +1,121 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.zookeeper.testing; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; + +import com.google.common.base.Preconditions; + +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; +import org.apache.zookeeper.server.NIOServerCnxnFactory; +import org.apache.zookeeper.server.ServerCnxnFactory; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.apache.zookeeper.server.ZooKeeperServer.BasicDataTreeBuilder; +import org.apache.zookeeper.server.persistence.FileTxnSnapLog; + +/** + * A helper class for starting in-process ZooKeeper server and clients. + * + * <p>This is ONLY meant to be used for testing. + */ +public class ZooKeeperTestServer { + + static final Amount<Integer, Time> DEFAULT_SESSION_TIMEOUT = Amount.of(100, Time.MILLISECONDS); + + private final File dataDir; + private final File snapDir; + + private ZooKeeperServer zooKeeperServer; + private ServerCnxnFactory connectionFactory; + private int port; + + public ZooKeeperTestServer(File dataDir, File snapDir) { + this.dataDir = Preconditions.checkNotNull(dataDir); + this.snapDir = Preconditions.checkNotNull(snapDir); + } + + /** + * Starts zookeeper up on an ephemeral port. + */ + public void startNetwork() throws IOException, InterruptedException { + zooKeeperServer = + new ZooKeeperServer( + new FileTxnSnapLog(dataDir, snapDir), + new BasicDataTreeBuilder()) { + + // TODO(John Sirois): Introduce a builder to configure the in-process server if and when + // some folks need JMX for in-process tests. + @Override protected void registerJMX() { + // noop + } + }; + + connectionFactory = new NIOServerCnxnFactory(); + connectionFactory.configure( + new InetSocketAddress(port), + 60 /* Semi-arbitrary, max 60 connections is the default used by NIOServerCnxnFactory */); + connectionFactory.startup(zooKeeperServer); + port = zooKeeperServer.getClientPort(); + } + + /** + * Stops the zookeeper server. + */ + public void stop() { + shutdownNetwork(); + } + + /** + * Starts zookeeper back up on the last used port. + */ + final void restartNetwork() throws IOException, InterruptedException { + checkEphemeralPortAssigned(); + Preconditions.checkState(connectionFactory == null); + startNetwork(); + } + + /** + * Shuts down the in-process zookeeper network server. + */ + final void shutdownNetwork() { + if (connectionFactory != null) { + connectionFactory.shutdown(); // Also shuts down zooKeeperServer. + connectionFactory = null; + } + } + + /** + * Expires the client session with the given {@code sessionId}. + * + * @param sessionId The id of the client session to expire. + */ + public final void expireClientSession(long sessionId) { + zooKeeperServer.closeSession(sessionId); + } + + /** + * Returns the current port to connect to the in-process zookeeper instance. + */ + public final int getPort() { + checkEphemeralPortAssigned(); + return port; + } + + private void checkEphemeralPortAssigned() { + Preconditions.checkState(port > 0, "startNetwork must be called first"); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java new file mode 100644 index 0000000..9c0cebe --- /dev/null +++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java @@ -0,0 +1,165 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.zookeeper; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingDeque; + +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; + +import org.apache.aurora.common.base.ExceptionalCommand; +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +public class CandidateImplTest extends BaseZooKeeperClientTest { + private static final List<ACL> ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE; + private static final String SERVICE = "/twitter/services/puffin_linkhose/leader"; + private static final Amount<Integer, Time> TIMEOUT = Amount.of(1, Time.MINUTES); + + private LinkedBlockingDeque<CandidateImpl> candidateBuffer; + + @Before + public void mySetUp() throws IOException { + candidateBuffer = new LinkedBlockingDeque<>(); + } + + private Group createGroup(ZooKeeperClient zkClient) throws IOException { + return new Group(zkClient, ACL, SERVICE); + } + + private class Reign implements Candidate.Leader { + private ExceptionalCommand<Group.JoinException> abdicate; + private final CandidateImpl candidate; + private final String id; + private CountDownLatch defeated = new CountDownLatch(1); + + Reign(String id, CandidateImpl candidate) { + this.id = id; + this.candidate = candidate; + } + + @Override + public void onElected(ExceptionalCommand<Group.JoinException> abdicate) { + candidateBuffer.offerFirst(candidate); + this.abdicate = abdicate; + } + + @Override + public void onDefeated() { + defeated.countDown(); + } + + public void abdicate() throws Group.JoinException { + Preconditions.checkState(abdicate != null); + abdicate.execute(); + } + + public void expectDefeated() throws InterruptedException { + defeated.await(); + } + + @Override + public String toString() { + return id; + } + } + + @Test + public void testOfferLeadership() throws Exception { + ZooKeeperClient zkClient1 = createZkClient(TIMEOUT); + final CandidateImpl candidate1 = new CandidateImpl(createGroup(zkClient1)) { + @Override public String toString() { + return "Leader1"; + } + }; + ZooKeeperClient zkClient2 = createZkClient(TIMEOUT); + final CandidateImpl candidate2 = new CandidateImpl(createGroup(zkClient2)) { + @Override public String toString() { + return "Leader2"; + } + }; + ZooKeeperClient zkClient3 = createZkClient(TIMEOUT); + final CandidateImpl candidate3 = new CandidateImpl(createGroup(zkClient3)) { + @Override public String toString() { + return "Leader3"; + } + }; + + Reign candidate1Reign = new Reign("1", candidate1); + Reign candidate2Reign = new Reign("2", candidate2); + Reign candidate3Reign = new Reign("3", candidate3); + + Supplier<Boolean> candidate1Leader = candidate1.offerLeadership(candidate1Reign); + Supplier<Boolean> candidate2Leader = candidate2.offerLeadership(candidate2Reign); + Supplier<Boolean> candidate3Leader = candidate3.offerLeadership(candidate3Reign); + + assertTrue("Since initial group join is synchronous, candidate 1 should be the first leader", + candidate1Leader.get()); + + shutdownNetwork(); + restartNetwork(); + + assertTrue("A re-connect without a session expiration should leave the leader elected", + candidate1Leader.get()); + + candidate1Reign.abdicate(); + assertSame(candidate1, candidateBuffer.takeLast()); + assertFalse(candidate1Leader.get()); + // Active abdication should trigger defeat. + candidate1Reign.expectDefeated(); + + CandidateImpl secondCandidate = candidateBuffer.takeLast(); + assertTrue("exactly 1 remaining candidate should now be leader: " + secondCandidate + " " + + candidateBuffer, + candidate2Leader.get() ^ candidate3Leader.get()); + + if (secondCandidate == candidate2) { + expireSession(zkClient2); + assertSame(candidate3, candidateBuffer.takeLast()); + assertTrue(candidate3Leader.get()); + // Passive expiration should trigger defeat. + candidate2Reign.expectDefeated(); + } else { + expireSession(zkClient3); + assertSame(candidate2, candidateBuffer.takeLast()); + assertTrue(candidate2Leader.get()); + // Passive expiration should trigger defeat. + candidate3Reign.expectDefeated(); + } + } + + @Test + public void testEmptyMembership() throws Exception { + ZooKeeperClient zkClient1 = createZkClient(TIMEOUT); + final CandidateImpl candidate1 = new CandidateImpl(createGroup(zkClient1)); + Reign candidate1Reign = new Reign("1", candidate1); + + candidate1.offerLeadership(candidate1Reign); + assertSame(candidate1, candidateBuffer.takeLast()); + candidate1Reign.abdicate(); + assertFalse(candidate1.getLeaderData().isPresent()); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java new file mode 100644 index 0000000..97a42d1 --- /dev/null +++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java @@ -0,0 +1,321 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.zookeeper; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; + +import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; + +import org.apache.aurora.common.base.Command; +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.common.testing.easymock.EasyMockTest; +import org.apache.aurora.common.zookeeper.Group.GroupChangeListener; +import org.apache.aurora.common.zookeeper.Group.JoinException; +import org.apache.aurora.common.zookeeper.Group.Membership; +import org.apache.aurora.common.zookeeper.Group.NodeScheme; +import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest; +import org.apache.zookeeper.ZooDefs.Ids; +import org.junit.Before; +import org.junit.Test; + +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.reset; +import static org.easymock.EasyMock.verify; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.fail; + +public class GroupTest extends BaseZooKeeperClientTest { + + private ZooKeeperClient zkClient; + private Group joinGroup; + private Group watchGroup; + private Command stopWatching; + private Command onLoseMembership; + + private RecordingListener listener; + + public GroupTest() { + super(Amount.of(1, Time.DAYS)); + } + + @Before + public void mySetUp() throws Exception { + onLoseMembership = createMock(Command.class); + + zkClient = createZkClient("group", "test"); + joinGroup = new Group(zkClient, ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, "/a/group"); + watchGroup = new Group(zkClient, ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, "/a/group"); + + listener = new RecordingListener(); + stopWatching = watchGroup.watch(listener); + } + + private static class RecordingListener implements GroupChangeListener { + private final LinkedBlockingQueue<Iterable<String>> membershipChanges = + new LinkedBlockingQueue<Iterable<String>>(); + + @Override + public void onGroupChange(Iterable<String> memberIds) { + membershipChanges.add(memberIds); + } + + public Iterable<String> take() throws InterruptedException { + return membershipChanges.take(); + } + + public void assertEmpty() { + assertEquals(ImmutableList.<Iterable<String>>of(), ImmutableList.copyOf(membershipChanges)); + } + + @Override + public String toString() { + return membershipChanges.toString(); + } + } + + private static class CustomScheme implements NodeScheme { + static final String NODE_NAME = "custom_name"; + + @Override + public boolean isMember(String nodeName) { + return NODE_NAME.equals(nodeName); + } + + @Override + public String createName(byte[] membershipData) { + return NODE_NAME; + } + + @Override + public boolean isSequential() { + return false; + } + } + + @Test + public void testSessionExpirationTriggersOnLoseMembership() throws Exception { + final CountDownLatch lostMembership = new CountDownLatch(1); + Command onLoseMembership = lostMembership::countDown; + assertEmptyMembershipObserved(); + + Membership membership = joinGroup.join(onLoseMembership); + assertMembershipObserved(membership.getMemberId()); + expireSession(zkClient); + + lostMembership.await(); // Will hang this test if onLoseMembership event is not propagated. + } + + @Test + public void testNodeDeleteTriggersOnLoseMembership() throws Exception { + final CountDownLatch lostMembership = new CountDownLatch(1); + Command onLoseMembership = lostMembership::countDown; + assertEmptyMembershipObserved(); + + Membership membership = joinGroup.join(onLoseMembership); + assertMembershipObserved(membership.getMemberId()); + membership.cancel(); + + lostMembership.await(); // Will hang this test if onLoseMembership event is not propagated. + } + + @Test + public void testJoinsAndWatchesSurviveDisconnect() throws Exception { + replay(onLoseMembership); + + assertEmptyMembershipObserved(); + + Membership membership = joinGroup.join(); + String originalMemberId = membership.getMemberId(); + assertMembershipObserved(originalMemberId); + + shutdownNetwork(); + restartNetwork(); + + // The member should still be present under existing ephemeral node since session did not + // expire. + watchGroup.watch(listener); + assertMembershipObserved(originalMemberId); + + membership.cancel(); + + assertEmptyMembershipObserved(); + assertEmptyMembershipObserved(); // and again for 2nd listener + + listener.assertEmpty(); + + verify(onLoseMembership); + reset(onLoseMembership); // Turn off expectations during ZK server shutdown. + } + + @Test + public void testJoinsAndWatchesSurviveExpiredSession() throws Exception { + onLoseMembership.execute(); + replay(onLoseMembership); + + assertEmptyMembershipObserved(); + + Membership membership = joinGroup.join(onLoseMembership); + String originalMemberId = membership.getMemberId(); + assertMembershipObserved(originalMemberId); + + expireSession(zkClient); + + // We should have lost our group membership and then re-gained it with a new ephemeral node. + // We may or may-not see the intermediate state change but we must see the final state + Iterable<String> members = listener.take(); + if (Iterables.isEmpty(members)) { + members = listener.take(); + } + assertEquals(1, Iterables.size(members)); + assertNotEquals(originalMemberId, Iterables.getOnlyElement(members)); + assertNotEquals(originalMemberId, membership.getMemberId()); + + listener.assertEmpty(); + + verify(onLoseMembership); + reset(onLoseMembership); // Turn off expectations during ZK server shutdown. + } + + @Test + public void testJoinCustomNamingScheme() throws Exception { + Group group = new Group(zkClient, ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, "/a/group", + new CustomScheme()); + + listener = new RecordingListener(); + group.watch(listener); + assertEmptyMembershipObserved(); + + Membership membership = group.join(); + String memberId = membership.getMemberId(); + + assertEquals("Wrong member ID.", CustomScheme.NODE_NAME, memberId); + assertMembershipObserved(memberId); + + expireSession(zkClient); + } + + @Test + public void testUpdateMembershipData() throws Exception { + Supplier<byte[]> dataSupplier = new EasyMockTest.Clazz<Supplier<byte[]>>() {}.createMock(); + + byte[] initial = "start".getBytes(); + expect(dataSupplier.get()).andReturn(initial); + + byte[] second = "update".getBytes(); + expect(dataSupplier.get()).andReturn(second); + + replay(dataSupplier); + + Membership membership = joinGroup.join(dataSupplier, onLoseMembership); + assertArrayEquals("Initial setting is incorrect.", initial, zkClient.get() + .getData(membership.getMemberPath(), false, null)); + + assertArrayEquals("Updating supplier should not change membership data", + initial, zkClient.get().getData(membership.getMemberPath(), false, null)); + + membership.updateMemberData(); + assertArrayEquals("Updating membership should change data", + second, zkClient.get().getData(membership.getMemberPath(), false, null)); + + verify(dataSupplier); + } + + @Test + public void testAcls() throws Exception { + Group securedMembership = + new Group(createZkClient("secured", "group"), ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, + "/secured/group/membership"); + + String memberId = securedMembership.join().getMemberId(); + + Group unauthenticatedObserver = + new Group(createZkClient(), + Ids.READ_ACL_UNSAFE, + "/secured/group/membership"); + RecordingListener unauthenticatedListener = new RecordingListener(); + unauthenticatedObserver.watch(unauthenticatedListener); + + assertMembershipObserved(unauthenticatedListener, memberId); + + try { + unauthenticatedObserver.join(); + fail("Expected join exception for unauthenticated observer"); + } catch (JoinException e) { + // expected + } + + Group unauthorizedObserver = + new Group(createZkClient("joe", "schmoe"), + Ids.READ_ACL_UNSAFE, + "/secured/group/membership"); + RecordingListener unauthorizedListener = new RecordingListener(); + unauthorizedObserver.watch(unauthorizedListener); + + assertMembershipObserved(unauthorizedListener, memberId); + + try { + unauthorizedObserver.join(); + fail("Expected join exception for unauthorized observer"); + } catch (JoinException e) { + // expected + } + } + + @Test + public void testStopWatching() throws Exception { + replay(onLoseMembership); + + assertEmptyMembershipObserved(); + + Membership member1 = joinGroup.join(); + String memberId1 = member1.getMemberId(); + assertMembershipObserved(memberId1); + + Membership member2 = joinGroup.join(); + String memberId2 = member2.getMemberId(); + assertMembershipObserved(memberId1, memberId2); + + stopWatching.execute(); + + member1.cancel(); + Membership member3 = joinGroup.join(); + member2.cancel(); + member3.cancel(); + + listener.assertEmpty(); + } + + private void assertEmptyMembershipObserved() throws InterruptedException { + assertMembershipObserved(); + } + + private void assertMembershipObserved(String... expectedMemberIds) throws InterruptedException { + assertMembershipObserved(listener, expectedMemberIds); + } + + private void assertMembershipObserved(RecordingListener listener, String... expectedMemberIds) + throws InterruptedException { + + assertEquals(ImmutableSet.copyOf(expectedMemberIds), ImmutableSet.copyOf(listener.take())); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java new file mode 100644 index 0000000..2166123 --- /dev/null +++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java @@ -0,0 +1,151 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.zookeeper; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +import com.google.common.base.Charsets; +import com.google.common.collect.ImmutableMap; +import com.google.gson.Gson; +import com.google.gson.JsonIOException; + +import org.apache.aurora.common.io.Codec; +import org.apache.aurora.common.thrift.Endpoint; +import org.apache.aurora.common.thrift.ServiceInstance; +import org.apache.aurora.common.thrift.Status; +import org.easymock.EasyMock; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.easymock.PowerMock; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(Gson.class) +public class JsonCodecTest { + + private static final Codec<ServiceInstance> STANDARD_JSON_CODEC = new JsonCodec(); + + @Test + public void testJsonCodecRoundtrip() throws Exception { + Codec<ServiceInstance> codec = STANDARD_JSON_CODEC; + ServiceInstance instance1 = new ServiceInstance( + new Endpoint("foo", 1000), + ImmutableMap.of("http", new Endpoint("foo", 8080)), + Status.ALIVE) + .setShard(0); + byte[] data = ServerSets.serializeServiceInstance(instance1, codec); + assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort()); + assertTrue(ServerSets.deserializeServiceInstance(data, codec).isSetShard()); + + ServiceInstance instance2 = new ServiceInstance( + new Endpoint("foo", 1000), + ImmutableMap.of("http-admin1", new Endpoint("foo", 8080)), + Status.ALIVE); + data = ServerSets.serializeServiceInstance(instance2, codec); + assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort()); + assertFalse(ServerSets.deserializeServiceInstance(data, codec).isSetShard()); + + ServiceInstance instance3 = new ServiceInstance( + new Endpoint("foo", 1000), + ImmutableMap.<String, Endpoint>of(), + Status.ALIVE); + data = ServerSets.serializeServiceInstance(instance3, codec); + assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort()); + assertFalse(ServerSets.deserializeServiceInstance(data, codec).isSetShard()); + } + + @Test + public void testJsonCompatibility() throws IOException { + ServiceInstance instance = new ServiceInstance( + new Endpoint("foo", 1000), + ImmutableMap.of("http", new Endpoint("foo", 8080)), + Status.ALIVE).setShard(42); + + ByteArrayOutputStream results = new ByteArrayOutputStream(); + STANDARD_JSON_CODEC.serialize(instance, results); + assertEquals( + "{\"serviceEndpoint\":{\"host\":\"foo\",\"port\":1000}," + + "\"additionalEndpoints\":{\"http\":{\"host\":\"foo\",\"port\":8080}}," + + "\"status\":\"ALIVE\"," + + "\"shard\":42}", + results.toString()); + } + + @Test + public void testInvalidSerialize() { + // Gson is final so we need to call on PowerMock here. + Gson gson = PowerMock.createMock(Gson.class); + gson.toJson(EasyMock.isA(Object.class), EasyMock.isA(Appendable.class)); + EasyMock.expectLastCall().andThrow(new JsonIOException("error")); + PowerMock.replay(gson); + + ServiceInstance instance = + new ServiceInstance(new Endpoint("foo", 1000), ImmutableMap.of(), Status.ALIVE); + + try { + new JsonCodec(gson).serialize(instance, new ByteArrayOutputStream()); + fail(); + } catch (IOException e) { + // Expected. + } + + PowerMock.verify(gson); + } + + @Test + public void testDeserializeMinimal() throws IOException { + String minimal = "{\"serviceEndpoint\":{\"host\":\"foo\",\"port\":1000},\"status\":\"ALIVE\"}"; + ByteArrayInputStream source = new ByteArrayInputStream(minimal.getBytes(Charsets.UTF_8)); + ServiceInstance actual = STANDARD_JSON_CODEC.deserialize(source); + ServiceInstance expected = + new ServiceInstance(new Endpoint("foo", 1000), ImmutableMap.of(), Status.ALIVE); + assertEquals(expected, actual); + } + + @Test + public void testInvalidDeserialize() { + // Not JSON. + assertInvalidDeserialize(new byte[] {0xC, 0xA, 0xF, 0xE}); + + // No JSON object. + assertInvalidDeserialize(""); + assertInvalidDeserialize("[]"); + + // Missing required fields. + assertInvalidDeserialize("{}"); + assertInvalidDeserialize("{\"serviceEndpoint\":{\"host\":\"foo\",\"port\":1000}}"); + assertInvalidDeserialize("{\"status\":\"ALIVE\"}"); + } + + private void assertInvalidDeserialize(String data) { + assertInvalidDeserialize(data.getBytes(Charsets.UTF_8)); + } + + private void assertInvalidDeserialize(byte[] data) { + try { + STANDARD_JSON_CODEC.deserialize(new ByteArrayInputStream(data)); + fail(); + } catch (IOException e) { + // Expected. + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java new file mode 100644 index 0000000..f0c0cb4 --- /dev/null +++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java @@ -0,0 +1,258 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.zookeeper; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; + +import org.apache.aurora.common.base.Command; +import org.apache.aurora.common.net.pool.DynamicHostSet; +import org.apache.aurora.common.thrift.Endpoint; +import org.apache.aurora.common.thrift.ServiceInstance; +import org.apache.aurora.common.thrift.Status; +import org.apache.aurora.common.zookeeper.Group.JoinException; +import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.easymock.IMocksControl; +import org.junit.Before; +import org.junit.Test; + +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.createControl; +import static org.easymock.EasyMock.expect; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * + * TODO(William Farner): Change this to remove thrift dependency. + */ +public class ServerSetImplTest extends BaseZooKeeperClientTest { + private static final List<ACL> ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE; + private static final String SERVICE = "/twitter/services/puffin_hosebird"; + + private LinkedBlockingQueue<ImmutableSet<ServiceInstance>> serverSetBuffer; + private DynamicHostSet.HostChangeMonitor<ServiceInstance> serverSetMonitor; + + @Before + public void mySetUp() throws IOException { + serverSetBuffer = new LinkedBlockingQueue<>(); + serverSetMonitor = serverSetBuffer::offer; + } + + private ServerSetImpl createServerSet() throws IOException { + return new ServerSetImpl(createZkClient(), ACL, SERVICE); + } + + @Test + public void testLifecycle() throws Exception { + ServerSetImpl client = createServerSet(); + client.watch(serverSetMonitor); + assertChangeFiredEmpty(); + + ServerSetImpl server = createServerSet(); + ServerSet.EndpointStatus status = server.join( + InetSocketAddress.createUnresolved("foo", 1234), makePortMap("http-admin", 8080)); + + ServiceInstance serviceInstance = new ServiceInstance( + new Endpoint("foo", 1234), + ImmutableMap.of("http-admin", new Endpoint("foo", 8080)), + Status.ALIVE); + + assertChangeFired(serviceInstance); + + status.leave(); + assertChangeFiredEmpty(); + assertTrue(serverSetBuffer.isEmpty()); + } + + @Test + public void testMembershipChanges() throws Exception { + ServerSetImpl client = createServerSet(); + client.watch(serverSetMonitor); + assertChangeFiredEmpty(); + + ServerSetImpl server = createServerSet(); + + ServerSet.EndpointStatus foo = join(server, "foo"); + assertChangeFired("foo"); + + expireSession(client.getZkClient()); + + ServerSet.EndpointStatus bar = join(server, "bar"); + + // We should've auto re-monitored membership, but not been notifed of "foo" since this was not a + // change, just "foo", "bar" since this was an addition. + assertChangeFired("foo", "bar"); + + foo.leave(); + assertChangeFired("bar"); + + ServerSet.EndpointStatus baz = join(server, "baz"); + assertChangeFired("bar", "baz"); + + baz.leave(); + assertChangeFired("bar"); + + bar.leave(); + assertChangeFiredEmpty(); + + assertTrue(serverSetBuffer.isEmpty()); + } + + @Test + public void testStopMonitoring() throws Exception { + ServerSetImpl client = createServerSet(); + Command stopMonitoring = client.watch(serverSetMonitor); + assertChangeFiredEmpty(); + + ServerSetImpl server = createServerSet(); + + ServerSet.EndpointStatus foo = join(server, "foo"); + assertChangeFired("foo"); + ServerSet.EndpointStatus bar = join(server, "bar"); + assertChangeFired("foo", "bar"); + + stopMonitoring.execute(); + + // No new updates should be received since monitoring has stopped. + foo.leave(); + assertTrue(serverSetBuffer.isEmpty()); + + // Expiration event. + assertTrue(serverSetBuffer.isEmpty()); + } + + @Test + public void testOrdering() throws Exception { + ServerSetImpl client = createServerSet(); + client.watch(serverSetMonitor); + assertChangeFiredEmpty(); + + Map<String, InetSocketAddress> server1Ports = makePortMap("http-admin1", 8080); + Map<String, InetSocketAddress> server2Ports = makePortMap("http-admin2", 8081); + Map<String, InetSocketAddress> server3Ports = makePortMap("http-admin3", 8082); + + ServerSetImpl server1 = createServerSet(); + ServerSetImpl server2 = createServerSet(); + ServerSetImpl server3 = createServerSet(); + + ServiceInstance instance1 = new ServiceInstance( + new Endpoint("foo", 1000), + ImmutableMap.of("http-admin1", new Endpoint("foo", 8080)), + Status.ALIVE); + ServiceInstance instance2 = new ServiceInstance( + new Endpoint("foo", 1001), + ImmutableMap.of("http-admin2", new Endpoint("foo", 8081)), + Status.ALIVE); + ServiceInstance instance3 = new ServiceInstance( + new Endpoint("foo", 1002), + ImmutableMap.of("http-admin3", new Endpoint("foo", 8082)), + Status.ALIVE); + + server1.join(InetSocketAddress.createUnresolved("foo", 1000), server1Ports); + assertEquals(ImmutableList.of(instance1), ImmutableList.copyOf(serverSetBuffer.take())); + + ServerSet.EndpointStatus status2 = server2.join( + InetSocketAddress.createUnresolved("foo", 1001), + server2Ports); + assertEquals(ImmutableList.of(instance1, instance2), + ImmutableList.copyOf(serverSetBuffer.take())); + + server3.join(InetSocketAddress.createUnresolved("foo", 1002), server3Ports); + assertEquals(ImmutableList.of(instance1, instance2, instance3), + ImmutableList.copyOf(serverSetBuffer.take())); + + status2.leave(); + assertEquals(ImmutableList.of(instance1, instance3), + ImmutableList.copyOf(serverSetBuffer.take())); + } + + @Test + public void testUnwatchOnException() throws Exception { + IMocksControl control = createControl(); + + ZooKeeperClient zkClient = control.createMock(ZooKeeperClient.class); + Watcher onExpirationWatcher = control.createMock(Watcher.class); + + expect(zkClient.registerExpirationHandler(anyObject(Command.class))) + .andReturn(onExpirationWatcher); + + expect(zkClient.get()).andThrow(new InterruptedException()); // See interrupted() note below. + expect(zkClient.unregister(onExpirationWatcher)).andReturn(true); + control.replay(); + + Group group = new Group(zkClient, ZooDefs.Ids.OPEN_ACL_UNSAFE, "/blabla"); + ServerSetImpl serverset = new ServerSetImpl(zkClient, group); + + try { + serverset.watch(hostSet -> {}); + fail("Expected MonitorException"); + } catch (DynamicHostSet.MonitorException e) { + // NB: The assert is not important to this test, but the call to `Thread.interrupted()` is. + // That call both returns the current interrupted status as well as clearing it. The clearing + // is crucial depending on the order tests are run in this class. If this test runs before + // one of the tests above that uses a `ZooKeeperClient` for example, those tests will fail + // executing `ZooKeeperClient.get` which internally blocks on s sync-point that takes part in + // the interruption mechanism and so immediately throws `InterruptedException` based on the + // un-cleared interrupted bit. + assertTrue(Thread.interrupted()); + } + control.verify(); + } + + private static Map<String, InetSocketAddress> makePortMap(String name, int port) { + return ImmutableMap.of(name, InetSocketAddress.createUnresolved("foo", port)); + } + + private ServerSet.EndpointStatus join(ServerSet serverSet, String host) + throws JoinException, InterruptedException { + + return serverSet.join( + InetSocketAddress.createUnresolved(host, 42), ImmutableMap.<String, InetSocketAddress>of()); + } + + private void assertChangeFired(String... serviceHosts) + throws InterruptedException { + + assertChangeFired(ImmutableSet.copyOf(Iterables.transform(ImmutableSet.copyOf(serviceHosts), + serviceHost -> new ServiceInstance(new Endpoint(serviceHost, 42), + ImmutableMap.<String, Endpoint>of(), Status.ALIVE)))); + } + + protected void assertChangeFiredEmpty() throws InterruptedException { + assertChangeFired(ImmutableSet.<ServiceInstance>of()); + } + + protected void assertChangeFired(ServiceInstance... serviceInstances) + throws InterruptedException { + assertChangeFired(ImmutableSet.copyOf(serviceInstances)); + } + + protected void assertChangeFired(ImmutableSet<ServiceInstance> serviceInstances) + throws InterruptedException { + assertEquals(serviceInstances, serverSetBuffer.take()); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java new file mode 100644 index 0000000..0e67191 --- /dev/null +++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java @@ -0,0 +1,44 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.zookeeper; + +import java.net.InetSocketAddress; +import java.util.Map; + +import com.google.common.collect.ImmutableMap; + +import org.apache.aurora.common.thrift.Endpoint; +import org.apache.aurora.common.thrift.ServiceInstance; +import org.apache.aurora.common.thrift.Status; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class ServerSetsTest { + @Test + public void testSimpleSerialization() throws Exception { + InetSocketAddress endpoint = new InetSocketAddress(12345); + Map<String, Endpoint > additionalEndpoints = ImmutableMap.of(); + Status status = Status.ALIVE; + + byte[] data = ServerSets.serializeServiceInstance( + endpoint, additionalEndpoints, status, ServerSet.JSON_CODEC); + + ServiceInstance instance = ServerSets.deserializeServiceInstance(data, ServerSet.JSON_CODEC); + + assertEquals(endpoint.getPort(), instance.getServiceEndpoint().getPort()); + assertEquals(additionalEndpoints, instance.getAdditionalEndpoints()); + assertEquals(Status.ALIVE, instance.getStatus()); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java new file mode 100644 index 0000000..5f6cdd8 --- /dev/null +++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java @@ -0,0 +1,243 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.zookeeper; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Map; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; + +import org.apache.aurora.common.base.ExceptionalCommand; +import org.apache.aurora.common.zookeeper.Candidate.Leader; +import org.apache.aurora.common.zookeeper.SingletonService.LeaderControl; +import org.apache.aurora.common.zookeeper.SingletonService.LeadershipListener; +import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest; +import org.easymock.Capture; +import org.easymock.IExpectationSetters; +import org.easymock.IMocksControl; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.aurora.common.testing.easymock.EasyMockTest.createCapture; +import static org.easymock.EasyMock.capture; +import static org.easymock.EasyMock.createControl; +import static org.easymock.EasyMock.expect; +import static org.junit.Assert.fail; + +public class SingletonServiceImplTest extends BaseZooKeeperClientTest { + private static final int PORT_A = 1234; + private static final int PORT_B = 8080; + private static final InetSocketAddress PRIMARY_ENDPOINT = + InetSocketAddress.createUnresolved("foo", PORT_A); + private static final Map<String, InetSocketAddress> AUX_ENDPOINTS = + ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved("foo", PORT_B)); + + private IMocksControl control; + private SingletonServiceImpl.LeadershipListener listener; + private ServerSet serverSet; + private ServerSet.EndpointStatus endpointStatus; + private Candidate candidate; + private ExceptionalCommand<Group.JoinException> abdicate; + + private SingletonService service; + + @Before + @SuppressWarnings("unchecked") + public void mySetUp() throws IOException { + control = createControl(); + addTearDown(control::verify); + listener = control.createMock(SingletonServiceImpl.LeadershipListener.class); + serverSet = control.createMock(ServerSet.class); + candidate = control.createMock(Candidate.class); + endpointStatus = control.createMock(ServerSet.EndpointStatus.class); + abdicate = control.createMock(ExceptionalCommand.class); + + service = new SingletonServiceImpl(serverSet, candidate); + } + + private void newLeader( + final String hostName, + Capture<Leader> leader, + LeadershipListener listener) throws Exception { + + service.lead(InetSocketAddress.createUnresolved(hostName, PORT_A), + ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved(hostName, PORT_B)), + listener); + + // This actually elects the leader. + leader.getValue().onElected(abdicate); + } + + private void newLeader(String hostName, Capture<Leader> leader) throws Exception { + newLeader(hostName, leader, listener); + } + + private IExpectationSetters<ServerSet.EndpointStatus> expectJoin() throws Exception { + return expect(serverSet.join(PRIMARY_ENDPOINT, AUX_ENDPOINTS)); + } + + @Test + public void testLeadAdvertise() throws Exception { + Capture<Leader> leaderCapture = createCapture(); + + expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null); + Capture<LeaderControl> controlCapture = createCapture(); + listener.onLeading(capture(controlCapture)); + + expectJoin().andReturn(endpointStatus); + endpointStatus.leave(); + abdicate.execute(); + + control.replay(); + + newLeader("foo", leaderCapture); + controlCapture.getValue().advertise(); + controlCapture.getValue().leave(); + } + + @Test + public void teatLeadLeaveNoAdvertise() throws Exception { + Capture<Leader> leaderCapture = createCapture(); + + expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null); + abdicate.execute(); + + Capture<LeaderControl> controlCapture = createCapture(); + listener.onLeading(capture(controlCapture)); + + control.replay(); + + newLeader("foo", leaderCapture); + controlCapture.getValue().leave(); + } + + @Test + public void testLeadJoinFailure() throws Exception { + Capture<Leader> leaderCapture = new Capture<Leader>(); + + expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null); + Capture<LeaderControl> controlCapture = createCapture(); + listener.onLeading(capture(controlCapture)); + + expectJoin().andThrow(new Group.JoinException("Injected join failure.", new Exception())); + abdicate.execute(); + + control.replay(); + + newLeader("foo", leaderCapture); + + try { + controlCapture.getValue().advertise(); + fail("Join should have failed."); + } catch (SingletonService.AdvertiseException e) { + // Expected. + } + + controlCapture.getValue().leave(); + } + + @Test(expected = IllegalStateException.class) + public void testMultipleAdvertise() throws Exception { + Capture<Leader> leaderCapture = createCapture(); + + expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null); + Capture<LeaderControl> controlCapture = createCapture(); + listener.onLeading(capture(controlCapture)); + + expectJoin().andReturn(endpointStatus); + + control.replay(); + + newLeader("foo", leaderCapture); + controlCapture.getValue().advertise(); + controlCapture.getValue().advertise(); + } + + @Test(expected = IllegalStateException.class) + public void testMultipleLeave() throws Exception { + Capture<Leader> leaderCapture = createCapture(); + + expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null); + Capture<LeaderControl> controlCapture = createCapture(); + listener.onLeading(capture(controlCapture)); + + expectJoin().andReturn(endpointStatus); + endpointStatus.leave(); + abdicate.execute(); + + control.replay(); + + newLeader("foo", leaderCapture); + controlCapture.getValue().advertise(); + controlCapture.getValue().leave(); + controlCapture.getValue().leave(); + } + + @Test(expected = IllegalStateException.class) + public void testAdvertiseAfterLeave() throws Exception { + Capture<Leader> leaderCapture = createCapture(); + + expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null); + Capture<LeaderControl> controlCapture = createCapture(); + listener.onLeading(capture(controlCapture)); + + abdicate.execute(); + + control.replay(); + + newLeader("foo", leaderCapture); + controlCapture.getValue().leave(); + controlCapture.getValue().advertise(); + } + + @Test + public void testLeadMulti() throws Exception { + List<Capture<Leader>> leaderCaptures = Lists.newArrayList(); + List<Capture<LeaderControl>> leaderControlCaptures = Lists.newArrayList(); + + for (int i = 0; i < 5; i++) { + Capture<Leader> leaderCapture = new Capture<Leader>(); + leaderCaptures.add(leaderCapture); + Capture<LeaderControl> controlCapture = createCapture(); + leaderControlCaptures.add(controlCapture); + + expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null); + listener.onLeading(capture(controlCapture)); + InetSocketAddress primary = InetSocketAddress.createUnresolved("foo" + i, PORT_A); + Map<String, InetSocketAddress> aux = + ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved("foo" + i, PORT_B)); + expect(serverSet.join(primary, aux)).andReturn(endpointStatus); + endpointStatus.leave(); + abdicate.execute(); + } + + control.replay(); + + for (int i = 0; i < 5; i++) { + final String leaderName = "foo" + i; + newLeader(leaderName, leaderCaptures.get(i)); + leaderControlCaptures.get(i).getValue().advertise(); + leaderControlCaptures.get(i).getValue().leave(); + } + } + + @Test + public void testLeaderLeaves() throws Exception { + control.replay(); + shutdownNetwork(); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperClientTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperClientTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperClientTest.java new file mode 100644 index 0000000..5eee235 --- /dev/null +++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperClientTest.java @@ -0,0 +1,210 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.zookeeper; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException; +import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException.ConnectionLossException; +import org.apache.zookeeper.KeeperException.NoAuthException; +import org.apache.zookeeper.ZooKeeper; +import org.junit.Test; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * @author John Sirois + */ +public class ZooKeeperClientTest extends BaseZooKeeperClientTest { + + public ZooKeeperClientTest() { + super(Amount.of(1, Time.DAYS)); + } + + @Test + public void testGet() throws Exception { + final ZooKeeperClient zkClient = createZkClient(); + shutdownNetwork(); + try { + zkClient.get(Amount.of(50L, Time.MILLISECONDS)); + fail("Expected client connection to timeout while network down"); + } catch (TimeoutException e) { + assertTrue(zkClient.isClosed()); + } + assertNull(zkClient.getZooKeeperClientForTests()); + + final CountDownLatch blockingGetComplete = new CountDownLatch(1); + final AtomicReference<ZooKeeper> client = new AtomicReference<ZooKeeper>(); + new Thread(() -> { + try { + client.set(zkClient.get()); + } catch (ZooKeeperConnectionException e) { + throw new RuntimeException(e); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + blockingGetComplete.countDown(); + } + }).start(); + + restartNetwork(); + + // Hung blocking connects should succeed when server connection comes up + blockingGetComplete.await(); + assertNotNull(client.get()); + + // New connections should succeed now that network is back up + long sessionId = zkClient.get().getSessionId(); + + // While connected the same client should be reused (no new connections while healthy) + assertSame(client.get(), zkClient.get()); + + shutdownNetwork(); + // Our client doesn't know the network is down yet so we should be able to get() + ZooKeeper zooKeeper = zkClient.get(); + try { + zooKeeper.exists("/", false); + fail("Expected client operation to fail while network down"); + } catch (ConnectionLossException e) { + // expected + } + + restartNetwork(); + assertEquals("Expected connection to be re-established with existing session", + sessionId, zkClient.get().getSessionId()); + } + + /** + * Test that if a blocking get() call gets interrupted, after a connection has been created + * but before it's connected, the zk connection gets closed. + */ + @Test + public void testGetInterrupted() throws Exception { + final ZooKeeperClient zkClient = createZkClient(); + shutdownNetwork(); + + final CountDownLatch blockingGetComplete = new CountDownLatch(1); + final AtomicBoolean interrupted = new AtomicBoolean(); + final AtomicReference<ZooKeeper> client = new AtomicReference<ZooKeeper>(); + Thread getThread = new Thread(() -> { + try { + client.set(zkClient.get()); + } catch (ZooKeeperConnectionException e) { + throw new RuntimeException(e); + } catch (InterruptedException e) { + interrupted.set(true); + throw new RuntimeException(e); + } finally { + blockingGetComplete.countDown(); + } + }); + getThread.start(); + + while (zkClient.getZooKeeperClientForTests() == null) { + Thread.sleep(100); + } + + getThread.interrupt(); + blockingGetComplete.await(); + + assertNull("The zk connection should have been closed", zkClient.getZooKeeperClientForTests()); + assertTrue("The waiter thread should have been interrupted", interrupted.get()); + assertTrue(zkClient.isClosed()); + } + + @Test + public void testClose() throws Exception { + ZooKeeperClient zkClient = createZkClient(); + zkClient.close(); + + // Close should be idempotent + zkClient.close(); + + long firstSessionId = zkClient.get().getSessionId(); + + // Close on an open client should force session re-establishment + zkClient.close(); + + assertNotEquals(firstSessionId, zkClient.get().getSessionId()); + } + + @Test + public void testCredentials() throws Exception { + String path = "/test"; + ZooKeeperClient authenticatedClient = createZkClient("creator", "creator"); + assertEquals(path, + authenticatedClient.get().create(path, "42".getBytes(), + ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, CreateMode.PERSISTENT)); + + ZooKeeperClient unauthenticatedClient = createZkClient(); + assertEquals("42", getData(unauthenticatedClient, path)); + try { + setData(unauthenticatedClient, path, "37"); + fail("Expected unauthenticated write attempt to fail"); + } catch (NoAuthException e) { + assertEquals("42", getData(unauthenticatedClient, path)); + } + + ZooKeeperClient nonOwnerClient = createZkClient("nonowner", "nonowner"); + assertEquals("42", getData(nonOwnerClient, path)); + try { + setData(nonOwnerClient, path, "37"); + fail("Expected non owner write attempt to fail"); + } catch (NoAuthException e) { + assertEquals("42", getData(nonOwnerClient, path)); + } + + ZooKeeperClient authenticatedClient2 = createZkClient("creator", "creator"); + setData(authenticatedClient2, path, "37"); + assertEquals("37", getData(authenticatedClient2, path)); + } + + @Test + public void testChrootPath() throws Exception { + ZooKeeperClient rootClient = createZkClient(); + String rootPath = "/test"; + String subPath = "/test/subtest"; + assertEquals(rootPath, + rootClient.get().create(rootPath, "42".getBytes(), + ZooKeeperUtils.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); + assertEquals(subPath, + rootClient.get().create(subPath, "37".getBytes(), + ZooKeeperUtils.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); + + ZooKeeperClient chrootedClient = createZkClient(rootPath); + assertArrayEquals("37".getBytes(), chrootedClient.get().getData("/subtest", false, null)); + } + + private void setData(ZooKeeperClient zkClient, String path, String data) throws Exception { + zkClient.get().setData(path, data.getBytes(), ZooKeeperUtils.ANY_VERSION); + } + + private String getData(ZooKeeperClient zkClient, String path) throws Exception { + return new String(zkClient.get().getData(path, false, null)); + } +}