On Tue, Nov 9, 2010 at 12:34 PM, Jeremy Hanna <jeremy.hanna1...@gmail.com>wrote:

> Anyone know of a good blog post or docs anywhere that gives a simple
> example of Watchers in action?  I saw the one on:
>
> http://hadoop.apache.org/zookeeper/docs/current/javaExample.html#ch_Introduction
>
> but it seems kind of overly complicated for an intro to Watchers.  I
> appreciate the example but wondered if there were other examples out there.
>

Appended is a Java example of using a Watcher simply to wait for the client
to actually be connected to a server.  I used it when I was confirming to my
satisfaction that there was a bug in the ZooKeeper recipe for WriteLock
awhile ago.  I think this use is slightly unusual in that it is more
interested in KeeperState than the event type.  A more conventional Watcher
might be like the following sketch (uhm, this is Groovy), though really
you'd have to look at both:

@Override
public void process(WatchedEvent event) {
switch (event?.getType()) {
case EventType.NodeDeleted:
// TODO: what should we do if the node being watched is itself
// deleted?
LOG.error("The node being watched '" + event.getPath + "' has been deleted:
that's not good")
break
case EventType.NodeChildrenChanged:
childrenChanged(event)
break
default:
LOG.debug("Ignoring event type '" + event?.getType() + "'")
break
}
}

-- 
Robert Crocombe

package derp;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.recipes.lock.WriteLock;

public class Test {
private static final Log LOG = LogFactory.getLog(Test.class);

private static final String ZOO_CONFIG = "10.2.1.54:2181/test";
private static final String LOCK_DIR = "/locking-test";
private static final int TIMEOUT_MILLIS = 10000;

private static class ConnectWatcher implements Watcher {

private final Lock connectedLock = new ReentrantLock();
private final Condition connectedCondition = connectedLock.newCondition();

private final AtomicBoolean connected = new AtomicBoolean(false);

@Override
public void process(WatchedEvent event) {
LOG.debug("Event: " + event);

KeeperState keeperState = event.getState();
switch (keeperState) {
case SyncConnected:
if (!connected.get()) {
connected.set(true);
signal();
}
break;
case Expired:
case Disconnected:
if (connected.get()) {
connected.set(false);
signal();
}
}
}

public void waitForConnection() throws InterruptedException {
connectedLock.lock();
try {
while (!connected.get()) {
LOG.debug("Waiting for condition to be signalled");
connectedCondition.await();
LOG.debug("Woken up on condition signalled");
}
} finally {
connectedLock.unlock();
}
LOG.debug("After signalling, we are connected");
}

@Override
public String toString() {
StringBuilder b = new StringBuilder("[");
b.append("connectedLock:").append(connectedLock);
b.append(",connectedCondition:").append(connectedCondition);
b.append(",connected:").append(connected);
b.append("]");
return b.toString();
}

private void signal() {
LOG.debug("Signaling after event");
connectedLock.lock();
try {
connectedCondition.signal();
} finally {
connectedLock.unlock();
}
}
}

private static final void fine(ZooKeeper lowerId, ZooKeeper higherId) throws
KeeperException,
InterruptedException {
WriteLock lower = new WriteLock(lowerId, LOCK_DIR, null);
WriteLock higher = new WriteLock(higherId, LOCK_DIR, null);

boolean lowerAcquired = lower.lock();
assert lowerAcquired;

LOG.debug("Lower acquired lock successfully, so higher should fail");

boolean higherAcquired = higher.lock();
assert !higherAcquired;

LOG.debug("Correct: higher session fails to acquire lock");

lower.unlock();
// Now that lower has unlocked, higher will acquire. Really should use
// the version of WriteLock with the LockListener, but a short sleep
// should do.
Thread.sleep(2000);
higher.unlock();
// make sure we let go.
assert !higher.isOwner();
}

/*
 * Using recipes from ZooKeeper 3.2.1.
 *
 * This bug occurs because the sort in ZooKeeperLockOperation.execute
 * (beginning @ line 221) orders the paths, but the paths contain the
 * session ID (lines 206-207), so that sorting the paths places all paths
 * with a lower session ID before those with a higher, i.e. the sorting is
 * not just by sequence number. I think this is a bug.
 *
 * The result is that a lock acquisition by a WriteLock coming from a
 * ZooKeeper with a high session ID will not be seen by another attempt to
 * lock coming from ZooKeeper with a lower session ID, because the sorting
 * will make the assumption:
 *
 * ownerId = sortedNames.first().getName();
 *
 * line 226 false, and also the test
 *
 * if (!lessThanMe.isEmpty()) {
 *
 * line 228 will not reveal the presence of the other node that is already
 * locked.
 */
private static final void bug(ZooKeeper lowerId, ZooKeeper higherId) throws
KeeperException,
InterruptedException {
WriteLock lower = new WriteLock(lowerId, LOCK_DIR, null);
WriteLock higher = new WriteLock(higherId, LOCK_DIR, null);

boolean higherAcquired = higher.lock();
assert higherAcquired;
boolean lowerAcquired = false;
try {
LOG.debug("Higher acquired lock successfully, so lower should fail");
LOG.debug("Is higher owner (should be): " + higher.isOwner());

lowerAcquired = lower.lock();

LOG.debug("Is lower lock owner? " + lower.isOwner());

if (!lowerAcquired) {
LOG.info("Okay: bug not triggered: lower lock did not acquire");
} else {
LOG
.error("BUG!  Even though the higher had the lock, the lower managed to
acquire it as well!");
}
} finally {
if (lowerAcquired)
lower.unlock();
if (higherAcquired)
higher.unlock();
}
assert !lower.isOwner();
assert !higher.isOwner();
}

public static void main(String[] args) throws IOException, KeeperException,
InterruptedException {
// Two ZooKeepers so we get two sessions: the bug only shows up for
// two different session IDs.
ConnectWatcher connectA = new ConnectWatcher();
ConnectWatcher connectB = new ConnectWatcher();

ZooKeeper zooA = new ZooKeeper(ZOO_CONFIG, TIMEOUT_MILLIS, connectA);
ZooKeeper zooB = new ZooKeeper(ZOO_CONFIG, TIMEOUT_MILLIS, connectB);

connectA.waitForConnection();
connectB.waitForConnection();
LOG.debug("Both connections connected");

final long idA = zooA.getSessionId();
final long idB = zooB.getSessionId();
LOG.debug("For A -> " + zooA + " session ID is " + idA + " -> " +
Long.toHexString(idA));
LOG.debug("For B -> " + zooB + " session ID is " + idB + " -> " +
Long.toHexString(idB));

// verify different sessions
assert idA != idB;

try {
if (idA < idB) {
fine(zooA, zooB);
bug(zooA, zooB);
} else {
fine(zooB, zooA);
bug(zooB, zooA);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
LOG.debug("Closing session A -> " + Long.toHexString(idA));
zooA.close();
} catch (Exception e) {
}
try {
LOG.debug("Closing session A -> " + Long.toHexString(idB));
zooB.close();
} catch (Exception e) {
}
}
}
}

Reply via email to