The following page has been changed by PatrickHunt:
http://wiki.apache.org/hadoop/ZooKeeper/ZooKeeperRecipes

New page:
= ZooKeeper Recipes =

Although ZooKeeper uses asynchronous notifications, it can be used to build 
synchronous consistency primitives such as queues and locks. This is possible 
because ZooKeeper imposes a total order on updates and has mechanisms for 
exposing this ordering. To implement these primitives efficiently we cannot use 
polling, timers, or anything that would generate the "herd effect" since these 
cause large bursts of traffic and limit scalability.

Name service and configuration are two of the primary applications of 
ZooKeeper. These two functions are provided directly by the ZooKeeper API. 
Another function directly provided by ZooKeeper is group membership. The group 
is represented by a node. Members of the group create ephemeral nodes under the 
group node. Nodes of the members that fail abnormally will be removed 
automatically when ZooKeeper detects the failure.

In the following section we describe conventions that can be used with 
ZooKeeper to implement higher-order functions. All of them are conventions 
implemented at the client and do not require special support from ZooKeeper. We 
imagine that eventually these conventions will be captured in client-side 
libraries to ease use and encourage standard application of conventions. These 
examples are to stimulate thought. There are many more functions that can be 
imagined, for example revocable read-write priority locks.

Some of these constructs, locks in particular, are here for illustrative 
purposes. In general, applications usually find more directly applicable 
constructs, such as event handles or queues, to use.

== Event Handle ==

Distributed systems use barriers to block processing of a set of nodes until a 
condition is met, at which time all the nodes are allowed to proceed. Barriers 
are implemented in ZooKeeper by designating a barrier node. The barrier is in 
place if the barrier node exists. Distributed machines call exists() on the 
barrier node with watch set to true. If exists() returns false, the barrier is 
gone and the machines proceed. Otherwise, if exists() returns true, the 
machines wait for a watch event from ZooKeeper for the barrier node. When the 
watch event is triggered, the machines reissue the exists() call, again 
waiting until the barrier node is removed.
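
The wait loop above can be sketched as follows. This is only an illustration 
in Python against a hypothetical in-memory stand-in (FakeBarrierStore), not a 
real ZooKeeper client; the class, its methods and wait_at_barrier() are names 
assumed for this example. Unlike a real exists() watch, the stand-in registers 
a watch only when the node is present, which is all the barrier needs.

```python
import threading


class FakeBarrierStore:
    """Hypothetical in-memory stand-in for a node store (illustration only)."""

    def __init__(self):
        self._nodes = set()
        self._watches = {}  # path -> list of one-shot threading.Event objects
        self._lock = threading.Lock()

    def create(self, path):
        with self._lock:
            self._nodes.add(path)

    def exists(self, path, watch_event=None):
        """Return True if the node exists; if so, register the one-shot watch."""
        with self._lock:
            present = path in self._nodes
            if present and watch_event is not None:
                self._watches.setdefault(path, []).append(watch_event)
            return present

    def delete(self, path):
        with self._lock:
            self._nodes.discard(path)
            watchers = self._watches.pop(path, [])
        for event in watchers:
            event.set()  # each registered watch fires exactly once


def wait_at_barrier(store, barrier_path):
    """Block until the barrier node is gone; return how many exists() calls it took."""
    checks = 0
    while True:
        event = threading.Event()
        checks += 1
        if not store.exists(barrier_path, watch_event=event):
            return checks  # barrier is gone, proceed
        event.wait()  # no polling: sleep until the watch fires, then re-check
```

The loop re-checks after every notification rather than assuming the node is 
gone, which mirrors the "reissue exists()" step in the text.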

== Queue ==

Distributed queues are a rather common data structure. We first designate a 
ZooKeeper node to hold the queue, the queue node. A distributed client puts 
something into the queue by calling create() with a pathname ending in "queue-" 
and the sequence and ephemeral flags set to true. Because the sequence flag is 
set, the new pathnames will have the form _path-to-queue-node_/queue-XXXX, 
where XXXX is a monotonically increasing number. A client that wants to remove 
from the queue does a getChildren with watch set to true on the queue node and 
starts processing nodes with the lowest number. The client does not need to 
issue another getChildren until it exhausts the list obtained from the first 
getChildren. If there are no children in the queue node, the reader waits for 
a watch notification to check the queue again.
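
Since a consumer should not rely on the order in which getChildren returns 
names, it has to sort them by sequence number itself. A minimal sketch in 
Python of that ordering step, assuming the suffix after "queue-" consists only 
of decimal digits; sequence_number() and consumption_order() are hypothetical 
helper names, and no ZooKeeper client is involved:

```python
def sequence_number(child_name):
    """Extract the numeric suffix that the sequence flag appends after 'queue-'."""
    prefix = "queue-"
    if not child_name.startswith(prefix):
        raise ValueError(f"not a queue element: {child_name!r}")
    return int(child_name[len(prefix):])


def consumption_order(children):
    """Return queue element names sorted from lowest to highest sequence number."""
    return sorted(children, key=sequence_number)
```

A consumer would work through the returned list front to back and only call 
getChildren again once the list is exhausted.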

== Priority Queue ==

To implement a priority queue we need to make two simple changes to the queuing 
protocol described above. First, to add to a queue, the pathname ends with 
"queue-YY", where YY is the priority of the element, with lower numbers 
representing higher priority (just like UNIX). Second, when removing from the 
queue, a client uses an up-to-date children list, meaning that the client will 
invalidate previously obtained children lists if a watch notification triggers 
for the queue node.
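
Picking the next element then means ordering by priority first and by sequence 
number second. The sketch below, in Python, assumes that YY is always exactly 
two digits and that the sequence number is appended directly after it (for 
example "queue-050000000012"); both the layout and the helper names 
priority_key() and next_element() are assumptions for illustration.

```python
def priority_key(child_name, priority_digits=2):
    """Split 'queue-YY<sequence>' into a (priority, sequence) sort key."""
    prefix = "queue-"
    if not child_name.startswith(prefix):
        raise ValueError(f"not a queue element: {child_name!r}")
    rest = child_name[len(prefix):]
    priority = int(rest[:priority_digits])   # lower number = higher priority
    sequence = int(rest[priority_digits:])   # breaks ties in arrival order
    return (priority, sequence)


def next_element(children):
    """Return the element to remove next, or None if the queue is empty."""
    if not children:
        return None
    return min(children, key=priority_key)
```

Because the children list must be up to date, a consumer would recompute 
next_element() on a fresh getChildren result whenever the watch fires.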

== Locks ==

Fully distributed locks that are globally synchronous, meaning that at any 
snapshot in time no two clients think they hold the same lock, can be 
implemented using ZooKeeper. As with priority queues we define a lock node.

Clients wishing to obtain a lock do the following:

 1. call create() with a pathname of "_locknode_/lock-" and the sequence and 
ephemeral flags set.
 1. do getChildren on the lock node WITHOUT setting the watch flag (this is 
important to avoid the herd effect).
 1. if the pathname created in step 1 has the lowest sequence number suffix, 
the client has the lock and the client exits the protocol.
 1. the client does an exists() with the watch flag set on the path in the lock 
directory with the next lowest sequence number.
 1. if exists() returns false goto step 2. Otherwise, wait for a notification 
for the pathname from the previous step before going to step 2.

Clients wishing to release a lock simply delete the node they created in step 1.
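
The decision made in steps 3 and 4 can be sketched as a pure function over the 
children list. This Python sketch reads "next lowest sequence number" as the 
node immediately preceding the client's own node; that reading, and the helper 
names sequence_of() and lock_step(), are assumptions for illustration.

```python
def sequence_of(name):
    """Return the numeric sequence suffix of a name such as 'lock-0000000003'."""
    return int(name.rsplit("-", 1)[1])


def lock_step(children, my_node):
    """Return None if my_node holds the lock, else the name of the node to watch."""
    my_seq = sequence_of(my_node)
    lower = [c for c in children if sequence_of(c) < my_seq]
    if not lower:
        return None  # lowest sequence number: this client has the lock
    # Watch only the immediate predecessor, so each node has a single watcher.
    return max(lower, key=sequence_of)
```

A client would call exists() with the watch flag set on the returned name and 
rerun lock_step() on a fresh getChildren result after the notification.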

There are a couple of observations to make:

 1. The removal of a node will only cause one client to wake up since each node 
is watched by exactly one client, so we do not have the herd effect.
 1. There is no polling and there are no timeouts.
 1. Because of the way we have implemented locking it is easy to see the amount 
of lock contention, break locks, debug locking problems, etc.

Since ZooKeeper is often compared to Chubby it is interesting to see how we can 
implement Chubby in ZooKeeper: //(Note: this is written using only the 
published information on Chubby. There are subtleties that are not taken into 
account, so until Google tests and says otherwise, this is more a rough 
comparison than a drop-in replacement.)//

 * Open(), Close(), Poison() - These are NOOPs in ZooKeeper since we don't use 
file handles.
 * GetContentsAndStat() = getData().
 * GetStat() = exists().
 * ReadDir() = getChildren().
 * SetContents() = setData().
 * Delete() = delete().
 * Acquire() - This is the obtain lock protocol outlined above.
 * TryAcquire() - This is the obtain lock protocol that replaces step 4 with 
"failed to obtain lock, delete the node created in step 1 and exit".
 * Release() - This is the release lock protocol.
 * GetSequencer() - This is the pathname and version (0) of the lock file 
created in step 1 of the obtain lock protocol.
 * SetSequencer() - The client calls exists() on the pathname of the sequencer 
with watch set and invalidates the "sequencer" if the version has changed or if 
it receives a notification and the version changes.
 * CheckSequencer() - exists() on the pathname of the sequencer to ensure the 
node exists and the version has not changed.

Although you could implement a Chubby client using ZooKeeper, more powerful 
APIs could also be implemented. Read on!

=== Shared Locks ===

To implement shared locks we change the lock protocol slightly:

To obtain a read lock:
 1. call create() with a pathname of "_locknode_/read-" and the sequence and 
ephemeral flags set.
 1. do getChildren on the lock node WITHOUT setting the watch flag (this is 
important to avoid the herd effect).
 1. if there are no children with pathname that start with "write-" and have a 
lower sequence number than the node created in step 1, the client has the lock 
and the client exits the protocol.
 1. the client does an exists() with the watch flag set on node with the 
pathname that starts with "write-" in the lock directory and has the next 
lowest sequence number.
 1. if exists() returns false, goto step 2. Otherwise, wait for a notification 
for the pathname from the previous step before going to step 2.

To obtain a write lock:
 1. call create() with a pathname of "_locknode_/write-" and the sequence and 
ephemeral flags set.
 1. do getChildren on the lock node WITHOUT setting the watch flag (this is 
important to avoid the herd effect).
 1. if there are no children with a lower sequence number than the node created 
in step 1, the client has the lock and the client exits the protocol.
 1. the client does an exists() with the watch flag set on node with the 
pathname that has the next lowest sequence number.
 1. if exists() returns false, goto step 2. Otherwise, wait for a notification 
for the pathname from the previous step before going to step 2.
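
Steps 3 and 4 of both protocols differ only in which lower-numbered nodes 
block the client. A minimal Python sketch of that difference, again reading 
"next lowest" as the closest preceding node; split_name(), read_lock_step() 
and write_lock_step() are hypothetical helper names, not part of any ZooKeeper 
API.

```python
def split_name(name):
    """Split 'read-0000000004' into ('read', 4)."""
    kind, _, seq = name.rpartition("-")
    return kind, int(seq)


def _closest_lower(candidates):
    """Return the candidate with the highest sequence number, or None."""
    if not candidates:
        return None
    return max(candidates, key=lambda c: split_name(c)[1])


def read_lock_step(children, my_node):
    """None if the read lock is held, else the 'write-' node to watch."""
    my_seq = split_name(my_node)[1]
    blockers = [c for c in children
                if split_name(c)[0] == "write" and split_name(c)[1] < my_seq]
    return _closest_lower(blockers)


def write_lock_step(children, my_node):
    """None if the write lock is held, else the preceding node to watch."""
    my_seq = split_name(my_node)[1]
    blockers = [c for c in children if split_name(c)[1] < my_seq]
    return _closest_lower(blockers)
```

Readers are blocked only by earlier writers, while a writer is blocked by any 
earlier node, which is why several readers can hold the lock at once.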

Note that it may appear that we have a "herd effect" when a bunch of clients 
waiting for a read lock all get notified as the "write-" node with the lower 
sequence number gets deleted. In fact that is valid behavior: all those read 
clients should be released since they have the lock. The "herd effect" refers 
to releasing a "herd" when in fact only a single or a small number of machines 
can proceed.

=== Revocable Shared Locks ===

We can make shared locks revocable by slightly modifying the shared lock 
protocol. In step 1 of both obtain lock protocols, after the create(), the 
client does a getData() with watch set. If a client gets a notification for the 
node created in step 1, it does a getData() with watch set and looks for the 
string "unlock", which signals the client that it must release the lock.

Locks are revoked by doing a setData() on the node of the lock to revoke with 
the string "unlock" as the data.

Note that this protocol requires the lock holder to consent to releasing the 
lock. Such consent is important if the lock holder needs to do some processing 
before releasing the lock. Revocable Shared Locks with Freaking Laser Beams can 
be implemented by simply deleting the nodes of the locks to be revoked if they 
do not get removed after a revoker-determined period of time.

== Two-Phase commit ==

We can implement two-phase commit by having a coordinator create a 
transaction node, say "/app/Tx", and one child node per participating site, say 
"/app/Tx/s_i". The content of each child node is initially undefined. Once each 
site involved in the transaction receives the transaction from the coordinator, 
the site reads each child node and sets a watch. Each site then processes the 
query and votes commit or abort by writing to its respective node. Once the 
write completes, the other sites are notified, and as soon as all sites have 
all votes, they can decide either "abort" or "commit". Note that a node can 
decide "abort" earlier if some site votes for "abort".
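
The rule each site applies to the votes it has read so far can be sketched in 
Python as below. The vote strings "commit" and "abort", the use of None for a 
child node that has not been written yet, and the function name decide() are 
assumptions for illustration.

```python
def decide(votes, site_count):
    """Return 'abort', 'commit', or None while the outcome is still open.

    votes maps each site's child node name to 'commit', 'abort', or None
    (None meaning that site has not written its vote yet).
    """
    cast = [v for v in votes.values() if v is not None]
    if "abort" in cast:
        return "abort"   # a single abort vote settles the outcome early
    if len(cast) == site_count:
        return "commit"  # every site voted and none voted abort
    return None          # keep waiting for watch notifications
```

A site would rerun decide() each time a watch on one of the child nodes fires.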

An interesting aspect of this implementation is that the only role of the 
coordinator is to decide upon the group of sites, to create the ZooKeeper 
nodes, and to propagate the transaction to the corresponding sites. In fact, 
even propagating the transaction can be done through ZooKeeper by writing it in 
the transaction node.

There are two important drawbacks of the approach described above. One is the 
message complexity, which is O(n²). The second is the impossibility of 
detecting failures of sites through ephemeral nodes: the coordinator creates 
the child nodes, but to detect the failure of a site using ephemeral nodes, the 
site itself must create the node.

To solve the first problem, we can have only the coordinator notified of 
changes to the transaction nodes; the coordinator then notifies the sites once 
it reaches a decision. Note that this approach, although more scalable, is 
slower as it requires all communication to go through the coordinator. For the 
second problem, we can have the coordinator propagate the transaction to the 
sites and have each site create its own ephemeral node.
