Thanks for posting, Robert!

On Nov 11, 2010, at 2:46 PM, Robert Crocombe wrote:
> On Tue, Nov 9, 2010 at 12:34 PM, Jeremy Hanna > <jeremy.hanna1...@gmail.com> wrote: > >> Anyone know of a good blog post or docs anywhere that gives a simple >> example of Watchers in action? I saw the one on: >> >> http://hadoop.apache.org/zookeeper/docs/current/javaExample.html#ch_Introduction >> >> but it seems kind of overly complicated for an intro to Watchers. I >> appreciate the example but wondered if there were other examples out there. >> > > Appended is a Java example of using a Watcher simply to wait for the client > to actually be connected to a server. I used it when I was confirming to my > satisfaction that there was a bug in the ZooKeeper recipe for WriteLock > a while ago. I think this use is slightly unusual in that it is more > interested in KeeperState than the event type. A more conventional Watcher > might be like the following sketch (uhm, this is Groovy), though really > you'd have to look at both: > > @Override > public void process(WatchedEvent event) { > switch (event?.getType()) { > case EventType.NodeDeleted: > // TODO: what should we do if the node being watched is itself > // deleted? 
> LOG.error("The node being watched '" + event.getPath + "' has been deleted: > that's not good") > break > case EventType.NodeChildrenChanged: > childrenChanged(event) > break > default: > LOG.debug("Ignoring event type '" + event?.getType() + "'") > break > } > } > > -- > Robert Crocombe > > package derp; > > import java.io.IOException; > import java.util.concurrent.atomic.AtomicBoolean; > import java.util.concurrent.locks.Condition; > import java.util.concurrent.locks.Lock; > import java.util.concurrent.locks.ReentrantLock; > > import org.apache.commons.logging.Log; > import org.apache.commons.logging.LogFactory; > import org.apache.zookeeper.KeeperException; > import org.apache.zookeeper.WatchedEvent; > import org.apache.zookeeper.Watcher; > import org.apache.zookeeper.ZooKeeper; > import org.apache.zookeeper.Watcher.Event.KeeperState; > import org.apache.zookeeper.recipes.lock.WriteLock; > > public class Test { > private static final Log LOG = LogFactory.getLog(Test.class); > > private static final String ZOO_CONFIG = "10.2.1.54:2181/test"; > private static final String LOCK_DIR = "/locking-test"; > private static final int TIMEOUT_MILLIS = 10000; > > private static class ConnectWatcher implements Watcher { > > private final Lock connectedLock = new ReentrantLock(); > private final Condition connectedCondition = connectedLock.newCondition(); > > private final AtomicBoolean connected = new AtomicBoolean(false); > > @Override > public void process(WatchedEvent event) { > LOG.debug("Event: " + event); > > KeeperState keeperState = event.getState(); > switch (keeperState) { > case SyncConnected: > if (!connected.get()) { > connected.set(true); > signal(); > } > break; > case Expired: > case Disconnected: > if (connected.get()) { > connected.set(false); > signal(); > } > } > } > > public void waitForConnection() throws InterruptedException { > connectedLock.lock(); > try { > while (!connected.get()) { > LOG.debug("Waiting for condition to be signalled"); > 
connectedCondition.await(); > LOG.debug("Woken up on condition signalled"); > } > } finally { > connectedLock.unlock(); > } > LOG.debug("After signalling, we are connected"); > } > > @Override > public String toString() { > StringBuilder b = new StringBuilder("["); > b.append("connectedLock:").append(connectedLock); > b.append(",connectedCondition:").append(connectedCondition); > b.append(",connected:").append(connected); > b.append("]"); > return b.toString(); > } > > private void signal() { > LOG.debug("Signaling after event"); > connectedLock.lock(); > try { > connectedCondition.signal(); > } finally { > connectedLock.unlock(); > } > } > } > > private static final void fine(ZooKeeper lowerId, ZooKeeper higherId) throws > KeeperException, > InterruptedException { > WriteLock lower = new WriteLock(lowerId, LOCK_DIR, null); > WriteLock higher = new WriteLock(higherId, LOCK_DIR, null); > > boolean lowerAcquired = lower.lock(); > assert lowerAcquired; > > LOG.debug("Lower acquired lock successfully, so higher should fail"); > > boolean higherAcquired = higher.lock(); > assert !higherAcquired; > > LOG.debug("Correct: higher session fails to acquire lock"); > > lower.unlock(); > // Now that lower has unlocked, higher will acquire. Really should use > // the version of WriteLock with the LockListener, but a short sleep > // should do. > Thread.sleep(2000); > higher.unlock(); > // make sure we let go. > assert !higher.isOwner(); > } > > /* > * Using recipes from ZooKeeper 3.2.1. > * > * This bug occurs because the sort in ZooKeeperLockOperation.execute > * (beginning @ line 221) orders the paths, but the paths contain the > * session ID (lines 206-207), so that sorting the paths places all paths > * with a lower session ID before those with a higher, i.e. the sorting is > * not just by sequence number. I think this is a bug. 
> * > * The result is that a lock acquisition by a WriteLock coming from a > * ZooKeeper with a high session ID will not be seen by another attempt to > * lock coming from ZooKeeper with a lower session ID, because the sorting > * will make the assumption: > * > * ownerId = sortedNames.first().getName(); > * > * line 226 false, and also the test > * > * if (!lessThanMe.isEmpty()) { > * > * line 228 will not reveal the presence of the other node that is already > * locked. > */ > private static final void bug(ZooKeeper lowerId, ZooKeeper higherId) throws > KeeperException, > InterruptedException { > WriteLock lower = new WriteLock(lowerId, LOCK_DIR, null); > WriteLock higher = new WriteLock(higherId, LOCK_DIR, null); > > boolean higherAcquired = higher.lock(); > assert higherAcquired; > boolean lowerAcquired = false; > try { > LOG.debug("Higher acquired lock successfully, so lower should fail"); > LOG.debug("Is higher owner (should be): " + higher.isOwner()); > > lowerAcquired = lower.lock(); > > LOG.debug("Is lower lock owner? " + lower.isOwner()); > > if (!lowerAcquired) { > LOG.info("Okay: bug not triggered: lower lock did not acquire"); > } else { > LOG > .error("BUG! Even though the higher had the lock, the lower managed to > acquire it as well!"); > } > } finally { > if (lowerAcquired) > lower.unlock(); > if (higherAcquired) > higher.unlock(); > } > assert !lower.isOwner(); > assert !higher.isOwner(); > } > > public static void main(String[] args) throws IOException, KeeperException, > InterruptedException { > // Two ZooKeepers so we get two sessions: the bug only shows up for > // two different session IDs. 
> ConnectWatcher connectA = new ConnectWatcher(); > ConnectWatcher connectB = new ConnectWatcher(); > > ZooKeeper zooA = new ZooKeeper(ZOO_CONFIG, TIMEOUT_MILLIS, connectA); > ZooKeeper zooB = new ZooKeeper(ZOO_CONFIG, TIMEOUT_MILLIS, connectB); > > connectA.waitForConnection(); > connectB.waitForConnection(); > LOG.debug("Both connections connected"); > > final long idA = zooA.getSessionId(); > final long idB = zooB.getSessionId(); > LOG.debug("For A -> " + zooA + " session ID is " + idA + " -> " + > Long.toHexString(idA)); > LOG.debug("For B -> " + zooB + " session ID is " + idB + " -> " + > Long.toHexString(idB)); > > // verify different sessions > assert idA != idB; > > try { > if (idA < idB) { > fine(zooA, zooB); > bug(zooA, zooB); > } else { > fine(zooB, zooA); > bug(zooB, zooA); > } > } catch (Exception e) { > e.printStackTrace(); > } finally { > try { > LOG.debug("Closing session A -> " + Long.toHexString(idA)); > zooA.close(); > } catch (Exception e) { > } > try { > LOG.debug("Closing session A -> " + Long.toHexString(idB)); > zooB.close(); > } catch (Exception e) { > } > } > } > }