Author: jdcryans
Date: Mon Oct 11 18:00:48 2010
New Revision: 1021447

URL: http://svn.apache.org/viewvc?rev=1021447&view=rev
Log:
HBASE-3060  [replication] Reenable replication on trunk with unit tests

Added:
    
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
      - copied, changed from r1003174, 
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/DISABLEDTestReplicationSink.java
    
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
Removed:
    
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/DISABLEDTestReplicationSink.java
    
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/DISABLEDTestReplicationSourceManager.java
Modified:
    hbase/trunk/CHANGES.txt
    
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
    
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
    
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
    
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
    
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
    
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
    
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
    
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java

Modified: hbase/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1021447&r1=1021446&r2=1021447&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Mon Oct 11 18:00:48 2010
@@ -580,7 +580,7 @@ Release 0.21.0 - Unreleased
    HBASE-3080  TestAdmin hanging on hudson
    HBASE-3063  TestThriftServer failing in TRUNK
    HBASE-3094  Fixes for miscellaneous broken tests
-  
+   HBASE-3060  [replication] Reenable replication on trunk with unit tests
 
 
   IMPROVEMENTS

Modified: 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java?rev=1021447&r1=1021446&r2=1021447&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java Mon Oct 11 18:00:48 2010
@@ -39,11 +39,11 @@ import org.apache.hadoop.hbase.HServerAd
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Watcher;
 
 /**
  * This class serves as a helper for all things related to zookeeper
@@ -81,31 +81,39 @@ public class ReplicationZookeeper {
   // Our handle on zookeeper
   private final ZooKeeperWatcher zookeeper;
   // Map of addresses of peer clusters with their ZKW
-  private final Map<String, ReplicationZookeeper> peerClusters;
+  private Map<String, ZooKeeperWatcher> peerClusters;
   // Path to the root replication znode
-  private final String replicationZNode;
+  private String replicationZNode;
   // Path to the peer clusters znode
-  private final String peersZNode;
+  private String peersZNode;
   // Path to the znode that contains all RS that replicates
-  private final String rsZNode;
+  private String rsZNode;
   // Path to this region server's name under rsZNode
-  private final String rsServerNameZnode;
+  private String rsServerNameZnode;
   // Name node if the replicationState znode
-  private final String replicationStateNodeName;
+  private String replicationStateNodeName;
   // If this RS is part of a master cluster
-  private final boolean replicationMaster;
+  private boolean replicationMaster;
   private final Configuration conf;
   // Is this cluster replicating at the moment?
-  private final AtomicBoolean replicating;
+  private AtomicBoolean replicating;
   // Byte (stored as string here) that identifies this cluster
-  private final String clusterId;
+  private String clusterId;
   // Abortable
-  private final Abortable abortable;
+  private Abortable abortable;
+
+  public ReplicationZookeeper(final Configuration conf, final ZooKeeperWatcher zk)
+    throws KeeperException {
+
+    this.conf = conf;
+    this.zookeeper = zk;
+    setZNodes();
+  }
 
   /**
   * Constructor used by region servers, connects to the peer cluster right away.
    *
-   * @param zookeeper
+   * @param server
    * @param replicating    atomic boolean to start/stop replication
    * @throws IOException
    * @throws KeeperException 
@@ -115,6 +123,27 @@ public class ReplicationZookeeper {
     this.abortable = server;
     this.zookeeper = server.getZooKeeper();
     this.conf = server.getConfiguration();
+    setZNodes();
+
+    this.peerClusters = new HashMap<String, ZooKeeperWatcher>();
+    this.replicating = replicating;
+    setReplicating();
+    this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode, server.getServerName());
+    ZKUtil.createWithParents(this.zookeeper, this.rsServerNameZnode);
+    // Set a tracker on the replication state znode
+    ReplicationStatusTracker tracker =
+        new ReplicationStatusTracker(this.zookeeper, server);
+    tracker.start();
+
+    List<String> znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
+    if (znodes != null) {
+      for (String z : znodes) {
+        connectToPeer(z);
+      }
+    }
+  }
+
+  private void setZNodes() throws KeeperException {
     String replicationZNodeName =
         conf.get("zookeeper.znode.replication", "replication");
     String peersZNodeName =
@@ -130,15 +159,11 @@ public class ReplicationZookeeper {
     String thisCluster = this.conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" +
           this.conf.get("hbase.zookeeper.property.clientPort") + ":" +
           this.conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
-
-    this.peerClusters = new HashMap<String, ReplicationZookeeper>();
     this.replicationZNode =
       ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
     this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
     this.rsZNode = ZKUtil.joinZNode(replicationZNode, rsZNodeName);
 
-    this.replicating = replicating;
-    setReplicating();
     String znode = ZKUtil.joinZNode(this.replicationZNode, clusterIdZNodeName);
     byte [] data = ZKUtil.getData(this.zookeeper, znode);
     String idResult = Bytes.toString(data);
@@ -152,24 +177,6 @@ public class ReplicationZookeeper {
     LOG.info("This cluster (" + thisCluster + ") is a " +
       (this.replicationMaster ? "master" : "slave") + " for replication" +
         ", compared with (" + address + ")");
-
-    if (server.getServerName() != null) {
-      this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode, server.getServerName());
-      // Set a tracker on replicationStateNodeNode
-      ReplicationStatusTracker tracker =
-        new ReplicationStatusTracker(this.zookeeper, getRepStateNode(), server);
-      tracker.start();
-
-      List<String> znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
-      if (znodes != null) {
-        for (String z : znodes) {
-          connectToPeer(z);
-        }
-      }
-    } else {
-      this.rsServerNameZnode = null;
-    }
-
   }
 
   /**
@@ -178,47 +185,16 @@ public class ReplicationZookeeper {
    * @param peerClusterId (byte) the cluster to interrogate
    * @return addresses of all region servers
    */
-  public List<HServerAddress> getPeersAddresses(String peerClusterId) {
+  public List<HServerAddress> getPeersAddresses(String peerClusterId)
+      throws KeeperException {
     if (this.peerClusters.size() == 0) {
       return new ArrayList<HServerAddress>(0);
     }
-    ReplicationZookeeper zkw = this.peerClusters.get(peerClusterId);
+    ZooKeeperWatcher zkw = this.peerClusters.get(peerClusterId);
+    
     return zkw == null?
       new ArrayList<HServerAddress>(0):
-      zkw.scanAddressDirectory(this.zookeeper.rsZNode);
-  }
-
-  /**
-   * Scan a directory of address data.
-   * @param znode The parent node
-   * @return The directory contents as HServerAddresses
-   */
-  public List<HServerAddress> scanAddressDirectory(String znode) {
-    List<HServerAddress> list = new ArrayList<HServerAddress>();
-    List<String> nodes = null;
-    try {
-      nodes = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
-    } catch (KeeperException e) {
-      this.abortable.abort("Scanning " + znode, e);
-    }
-    if (nodes == null) {
-      return list;
-    }
-    for (String node : nodes) {
-      String path = ZKUtil.joinZNode(znode, node);
-      list.add(readAddress(path));
-    }
-    return list;
-  }
-
-  private HServerAddress readAddress(String znode) {
-    byte [] data = null;
-    try {
-      data = ZKUtil.getData(this.zookeeper, znode);
-    } catch (KeeperException e) {
-      this.abortable.abort("Getting address", e);
-    }
-    return new HServerAddress(Bytes.toString(data));
+      ZKUtil.listChildrenAndGetAsAddresses(zkw, zkw.rsZNode);
   }
 
   /**
@@ -239,15 +215,11 @@ public class ReplicationZookeeper {
     otherConf.set(HConstants.ZOOKEEPER_QUORUM, ensemble[0]);
     otherConf.set("hbase.zookeeper.property.clientPort", ensemble[1]);
     otherConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, ensemble[2]);
-    // REENABLE -- FIX!!!!
-    /*
-    ZooKeeperWrapper zkw = ZooKeeperWrapper.createInstance(otherConf,
-        "connection to cluster: " + peerId);
-    zkw.registerListener(new ReplicationStatusWatcher());
+    ZooKeeperWatcher zkw = new ZooKeeperWatcher(otherConf,
+        "connection to cluster: " + peerId, this.abortable);
     this.peerClusters.put(peerId, zkw);
-    this.zookeeperWrapper.ensureExists(this.zookeeperWrapper.getZNode(
+    ZKUtil.createWithParents(this.zookeeper, ZKUtil.joinZNode(
         this.rsServerNameZnode, peerId));
-        */
     LOG.info("Added new peer cluster " + StringUtils.arrayToString(ensemble));
   }
 
@@ -282,7 +254,7 @@ public class ReplicationZookeeper {
     try {
       String znode = ZKUtil.joinZNode(this.rsServerNameZnode, clusterId);
       znode = ZKUtil.joinZNode(znode, filename);
-      ZKUtil.createAndWatch(this.zookeeper, znode, Bytes.toBytes(""));
+      ZKUtil.createWithParents(this.zookeeper, znode);
     } catch (KeeperException e) {
       this.abortable.abort("Failed add log to list", e);
     }
@@ -297,7 +269,7 @@ public class ReplicationZookeeper {
     try {
       String znode = ZKUtil.joinZNode(rsServerNameZnode, clusterId);
       znode = ZKUtil.joinZNode(znode, filename);
-      ZKUtil.deleteChildrenRecursively(this.zookeeper, znode);
+      ZKUtil.deleteNode(this.zookeeper, znode);
     } catch (KeeperException e) {
       this.abortable.abort("Failed remove from list", e);
     }
@@ -316,7 +288,7 @@ public class ReplicationZookeeper {
       String znode = ZKUtil.joinZNode(this.rsServerNameZnode, clusterId);
       znode = ZKUtil.joinZNode(znode, filename);
       // Why serialize String of Long and note Long as bytes?
-      ZKUtil.createAndWatch(this.zookeeper, znode,
+      ZKUtil.setData(this.zookeeper, znode,
         Bytes.toBytes(Long.toString(position)));
     } catch (KeeperException e) {
       this.abortable.abort("Writing replication status", e);
@@ -326,15 +298,18 @@ public class ReplicationZookeeper {
   /**
    * Get a list of all the other region servers in this cluster
    * and set a watch
-   * @param watch the watch to set
    * @return a list of server nanes
    */
-  public List<String> getRegisteredRegionServers(Watcher watch) {
+  public List<String> getRegisteredRegionServers() {
     List<String> result = null;
     try {
-      // TODO: This is rsZNode from zk which is like getListOfReplicators
-      // but maybe these are from different zk instances?
-      result = ZKUtil.listChildrenNoWatch(this.zookeeper, rsZNode);
+      List<ZKUtil.NodeAndData> nads =
+          ZKUtil.watchAndGetNewChildren(this.zookeeper, this.zookeeper.rsZNode);
+      result = new ArrayList<String>(nads.size());
+      for (ZKUtil.NodeAndData nad : nads) {
+        String[] fullPath = nad.getNode().split("/");
+        result.add(fullPath[fullPath.length - 1]);
+      }
     } catch (KeeperException e) {
       this.abortable.abort("Get list of registered region servers", e);
     }
@@ -344,7 +319,6 @@ public class ReplicationZookeeper {
   /**
    * Get the list of the replicators that have queues, they can be alive, dead
    * or simply from a previous run
-   * @param watch the watche to set
    * @return a list of server names
    */
   public List<String> getListOfReplicators() {
@@ -360,7 +334,6 @@ public class ReplicationZookeeper {
   /**
    * Get the list of peer clusters for the specified server names
    * @param rs server names of the rs
-   * @param watch the watch to set
    * @return a list of peer cluster
    */
   public List<String> getListPeersForRS(String rs) {
@@ -378,7 +351,6 @@ public class ReplicationZookeeper {
    * Get the list of hlogs for the specified region server and peer cluster
    * @param rs server names of the rs
    * @param id peer cluster
-   * @param watch the watch to set
    * @return a list of hlogs
    */
   public List<String> getListHLogsForPeerForRS(String rs, String id) {
@@ -401,10 +373,14 @@ public class ReplicationZookeeper {
   public boolean lockOtherRS(String znode) {
     try {
       String parent = ZKUtil.joinZNode(this.rsZNode, znode);
+      if (parent.equals(rsServerNameZnode)) {
+        LOG.warn("Won't lock because this is us, we're dead!");
+        return false;
+      }
       String p = ZKUtil.joinZNode(parent, RS_LOCK_ZNODE);
       ZKUtil.createAndWatch(this.zookeeper, p, Bytes.toBytes(rsServerNameZnode));
     } catch (KeeperException e) {
-      this.abortable.abort("Failed lock other rs", e);
+      LOG.info("Failed lock other rs", e);
     }
     return true;
   }
@@ -468,7 +444,7 @@ public class ReplicationZookeeper {
    */
   public void deleteSource(String peerZnode) {
     try {
-      ZKUtil.deleteChildrenRecursively(this.zookeeper,
+      ZKUtil.deleteNodeRecursively(this.zookeeper,
           ZKUtil.joinZNode(rsServerNameZnode, peerZnode));
     } catch (KeeperException e) {
       this.abortable.abort("Failed delete of " + peerZnode, e);
@@ -481,7 +457,7 @@ public class ReplicationZookeeper {
    */
   public void deleteRsQueues(String znode) {
     try {
-      ZKUtil.deleteChildrenRecursively(this.zookeeper,
+      ZKUtil.deleteNodeRecursively(this.zookeeper,
           ZKUtil.joinZNode(rsZNode, znode));
     } catch (KeeperException e) {
       this.abortable.abort("Failed delete of " + znode, e);
@@ -492,7 +468,12 @@ public class ReplicationZookeeper {
    * Delete this cluster's queues
    */
   public void deleteOwnRSZNode() {
-    deleteRsQueues(this.rsServerNameZnode);
+    try {
+      ZKUtil.deleteNodeRecursively(this.zookeeper,
+          this.rsServerNameZnode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed delete of " + this.rsServerNameZnode, e);
+    }
   }
 
   /**
@@ -510,13 +491,8 @@ public class ReplicationZookeeper {
     return data == null || data.length() == 0 ? 0 : Long.parseLong(data);
   }
 
-  /**
-   * Tells if this cluster replicates or not
-   *
-   * @return if this is a master
-   */
-  public boolean isReplicationMaster() {
-    return this.replicationMaster;
+  public void registerRegionServerListener(ZooKeeperListener listener) {
+    this.zookeeper.registerListener(listener);
   }
 
   /**
@@ -532,17 +508,29 @@ public class ReplicationZookeeper {
    * Get a map of all peer clusters
    * @return map of peer cluster, zk address to ZKW
    */
-  public Map<String, ReplicationZookeeper> getPeerClusters() {
+  public Map<String, ZooKeeperWatcher> getPeerClusters() {
     return this.peerClusters;
   }
 
+  public String getRSZNode() {
+    return rsZNode;
+  }
+
+  /**
+   * Get this helper's handle on zookeeper.
+   * @return the ZooKeeperWatcher used by this helper
+   */
+  public ZooKeeperWatcher getZookeeperWatcher() {
+    return this.zookeeper;
+  }
+
   /**
    * Tracker for status of the replication
    */
   public class ReplicationStatusTracker extends ZooKeeperNodeTracker {
-    public ReplicationStatusTracker(ZooKeeperWatcher watcher, String node,
+    public ReplicationStatusTracker(ZooKeeperWatcher watcher,
         Abortable abortable) {
-      super(watcher, node, abortable);
+      super(watcher, getRepStateNode(), abortable);
     }
 
     @Override

Modified: 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java?rev=1021447&r1=1021446&r2=1021447&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java Mon Oct 11 18:00:48 2010
@@ -23,24 +23,21 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.LogCleanerDelegate;
 import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
-// REENALBE import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
 
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Implementation of a log cleaner that checks if a log is still scheduled for
  * replication before deleting it when its TTL is over.
  */
-public class ReplicationLogCleaner implements LogCleanerDelegate, Watcher {
+public class ReplicationLogCleaner implements LogCleanerDelegate {
 
   private static final Log LOG =
     LogFactory.getLog(ReplicationLogCleaner.class);
@@ -78,31 +75,30 @@ public class ReplicationLogCleaner imple
   private boolean refreshHLogsAndSearch(String searchedLog) {
     this.hlogs.clear();
     final boolean lookForLog = searchedLog != null;
-// REENALBE
-//    List<String> rss = zkHelper.getListOfReplicators(this);
-//    if (rss == null) {
-//      LOG.debug("Didn't find any region server that replicates, deleting: " +
-//          searchedLog);
-//      return false;
-//    }
-//    for (String rs: rss) {
-//      List<String> listOfPeers = zkHelper.getListPeersForRS(rs, this);
-//      // if rs just died, this will be null
-//      if (listOfPeers == null) {
-//        continue;
-//      }
-//      for (String id : listOfPeers) {
-//        List<String> peersHlogs = zkHelper.getListHLogsForPeerForRS(rs, id, this);
-//        if (peersHlogs != null) {
-//          this.hlogs.addAll(peersHlogs);
-//        }
-//        // early exit if we found the log
-//        if(lookForLog && this.hlogs.contains(searchedLog)) {
-//          LOG.debug("Found log in ZK, keeping: " + searchedLog);
-//          return true;
-//        }
-//      }
-//    }
+    List<String> rss = zkHelper.getListOfReplicators();
+    if (rss == null) {
+      LOG.debug("Didn't find any region server that replicates, deleting: " +
+          searchedLog);
+      return false;
+    }
+    for (String rs: rss) {
+      List<String> listOfPeers = zkHelper.getListPeersForRS(rs);
+      // if rs just died, this will be null
+      if (listOfPeers == null) {
+        continue;
+      }
+      for (String id : listOfPeers) {
+        List<String> peersHlogs = zkHelper.getListHLogsForPeerForRS(rs, id);
+        if (peersHlogs != null) {
+          this.hlogs.addAll(peersHlogs);
+        }
+        // early exit if we found the log
+        if(lookForLog && this.hlogs.contains(searchedLog)) {
+          LOG.debug("Found log in ZK, keeping: " + searchedLog);
+          return true;
+        }
+      }
+    }
     LOG.debug("Didn't find this log in ZK, deleting: " + searchedLog);
     return false;
   }
@@ -110,15 +106,15 @@ public class ReplicationLogCleaner imple
   @Override
   public void setConf(Configuration conf) {
     this.conf = conf;
-//    try {
-      // REENABLE
-//      this.zkHelper = new ReplicationZookeeperWrapper(
-//          ZooKeeperWrapper.createInstance(this.conf,
-//              HMaster.class.getName()),
-//          this.conf, new AtomicBoolean(true), null);
-//    } catch (IOException e) {
-//      LOG.error(e);
-//    }
+    try {
+      ZooKeeperWatcher zkw =
+          new ZooKeeperWatcher(conf, this.getClass().getName(), null);
+      this.zkHelper = new ReplicationZookeeper(conf, zkw);
+    } catch (KeeperException e) {
+      LOG.error("Error while configuring " + this.getClass().getName(), e);
+    } catch (IOException e) {
+      LOG.error("Error while configuring " + this.getClass().getName(), e);
+    }
     refreshHLogsAndSearch(null);
   }
 
@@ -126,7 +122,4 @@ public class ReplicationLogCleaner imple
   public Configuration getConf() {
     return conf;
   }
-
-  @Override
-  public void process(WatchedEvent watchedEvent) {}
 }

Modified: 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java?rev=1021447&r1=1021446&r2=1021447&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java Mon Oct 11 18:00:48 2010
@@ -45,7 +45,6 @@ import org.apache.zookeeper.KeeperExcept
 public class Replication implements WALObserver {
   private final boolean replication;
   private final ReplicationSourceManager replicationManager;
-  private boolean replicationMaster;
   private final AtomicBoolean replicating = new AtomicBoolean(true);
   private final ReplicationZookeeper zkHelper;
   private final Configuration conf;
@@ -70,10 +69,8 @@ public class Replication implements WALO
     this.replication = isReplication(this.conf);
     if (replication) {
       this.zkHelper = new ReplicationZookeeper(server, this.replicating);
-      this.replicationMaster = zkHelper.isReplicationMaster();
-      this.replicationManager = this.replicationMaster ?
-        new ReplicationSourceManager(zkHelper, conf, this.server,
-          fs, this.replicating, logDir, oldLogDir) : null;
+      this.replicationManager = new ReplicationSourceManager(zkHelper, conf,
+          this.server, fs, this.replicating, logDir, oldLogDir) ;
     } else {
       this.replicationManager = null;
       this.zkHelper = null;
@@ -93,10 +90,8 @@ public class Replication implements WALO
    */
   public void join() {
     if (this.replication) {
-      if (this.replicationMaster) {
-        this.replicationManager.join();
-      }
-        this.zkHelper.deleteOwnRSZNode();
+      this.replicationManager.join();
+      this.zkHelper.deleteOwnRSZNode();
     }
   }
 
@@ -106,7 +101,7 @@ public class Replication implements WALO
    * @throws IOException
    */
   public void replicateLogEntries(HLog.Entry[] entries) throws IOException {
-    if (this.replication && !this.replicationMaster) {
+    if (this.replication) {
       this.replicationSink.replicateEntries(entries);
     }
   }
@@ -118,11 +113,8 @@ public class Replication implements WALO
    */
   public void startReplicationServices() throws IOException {
     if (this.replication) {
-      if (this.replicationMaster) {
-        this.replicationManager.init();
-      } else {
-        this.replicationSink = new ReplicationSink(this.conf, this.server);
-      }
+      this.replicationManager.init();
+      this.replicationSink = new ReplicationSink(this.conf, this.server);
     }
   }
 

Modified: 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java?rev=1021447&r1=1021446&r2=1021447&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java Mon Oct 11 18:00:48 2010
@@ -38,7 +38,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * This class is responsible for replicating the edits coming

Modified: 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1021447&r1=1021446&r2=1021447&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Mon Oct 11 18:00:48 2010
@@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * Class that handles the source of a replication stream.
@@ -195,7 +196,7 @@ public class ReplicationSource extends T
   /**
    * Select a number of peers at random using the ratio. Mininum 1.
    */
-  private void chooseSinks() {
+  private void chooseSinks() throws KeeperException {
     this.currentPeers.clear();
     List<HServerAddress> addresses =
         this.zkHelper.getPeersAddresses(peerClusterId);
@@ -231,8 +232,14 @@ public class ReplicationSource extends T
     // If this is recovered, the queue is already full and the first log
     // normally has a position (unless the RS failed between 2 logs)
     if (this.queueRecovered) {
-//      this.position = this.zkHelper.getHLogRepPosition(
-//          this.peerClusterZnode, this.queue.peek().getName());
+      try {
+        this.position = this.zkHelper.getHLogRepPosition(
+            this.peerClusterZnode, this.queue.peek().getName());
+      } catch (KeeperException e) {
+        LOG.error("Couldn't get the position of this recovered queue " +
+            peerClusterZnode, e);
+        this.abort();
+      }
     }
     int sleepMultiplier = 1;
     // Loop until we close down
@@ -380,6 +387,8 @@ public class ReplicationSource extends T
         Thread.sleep(this.sleepForRetries);
       } catch (InterruptedException e) {
         LOG.error("Interrupted while trying to connect to sinks", e);
+      } catch (KeeperException e) {
+        LOG.error("Error talking to zookeeper, retrying", e);
       }
     }
   }
@@ -553,6 +562,8 @@ public class ReplicationSource extends T
           } while (!this.stopper.isStopped() && down);
         } catch (InterruptedException e) {
           LOG.debug("Interrupted while trying to contact the peer cluster");
+        } catch (KeeperException e) {
+          LOG.error("Error talking to zookeeper, retrying", e);
         }
 
       }
@@ -589,7 +600,7 @@ public class ReplicationSource extends T
           }
         };
     Threads.setDaemonThreadRunning(
-        this, n + ".replicationSource," + clusterId, handler);
+        this, n + ".replicationSource," + peerClusterZnode, handler);
   }
 
   /**

Modified: 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java?rev=1021447&r1=1021446&r2=1021447&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java Mon Oct 11 18:00:48 2010
@@ -36,8 +36,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 
 /**
  * This class is responsible to manage all the replication
@@ -104,8 +104,10 @@ public class ReplicationSourceManager {
     this.fs = fs;
     this.logDir = logDir;
     this.oldLogDir = oldLogDir;
+    this.zkHelper.registerRegionServerListener(
+        new OtherRegionServerWatcher(this.zkHelper.getZookeeperWatcher()));
     List<String> otherRSs =
-        this.zkHelper.getRegisteredRegionServers(new OtherRegionServerWatcher());
+        this.zkHelper.getRegisteredRegionServers();
     this.otherRegionServers = otherRSs == null ? new ArrayList<String>() : otherRSs;
   }
 
@@ -145,7 +147,10 @@ public class ReplicationSourceManager {
       ReplicationSourceInterface src = addSource(id);
       src.startup();
     }
-    List<String> currentReplicators = this.zkHelper.getListOfReplicators();
+    List<String> currentReplicators = this.zkHelper.getRegisteredRegionServers();
+    if (currentReplicators == null || currentReplicators.size() == 0) {
+      return;
+    }
     synchronized (otherRegionServers) {
       LOG.info("Current list of replicators: " + currentReplicators
           + " other RSs: " + otherRegionServers);
@@ -322,29 +327,59 @@ public class ReplicationSourceManager {
    * in the local cluster. It initiates the process to transfer the queues
    * if it is able to grab the lock.
    */
-  public class OtherRegionServerWatcher implements Watcher {
-    @Override
-    public void process(WatchedEvent watchedEvent) {
-      LOG.info(" event " + watchedEvent);
-      if (watchedEvent.getType().equals(Event.KeeperState.Expired) ||
-          watchedEvent.getType().equals(Event.KeeperState.Disconnected)) {
+  public class OtherRegionServerWatcher extends ZooKeeperListener {
+
+    /**
+     * Construct a ZooKeeper event listener.
+     */
+    public OtherRegionServerWatcher(ZooKeeperWatcher watcher) {
+      super(watcher);
+    }
+
+    /**
+     * Called when a new node has been created.
+     * @param path full path of the new node
+     */
+    public void nodeCreated(String path) {
+      refreshRegionServersList(path);
+    }
+
+    /**
+     * Called when a node has been deleted
+     * @param path full path of the deleted node
+     */
+    public void nodeDeleted(String path) {
+      boolean cont = refreshRegionServersList(path);
+      if (!cont) {
         return;
       }
-
-      List<String> newRsList = (zkHelper.getRegisteredRegionServers(this));
+      LOG.info(path + " znode expired, trying to lock it");
+      String[] rsZnodeParts = path.split("/");
+      transferQueues(rsZnodeParts[rsZnodeParts.length-1]);
+    }
+
+    /**
+     * Called when an existing node has a child node added or removed.
+     * @param path full path of the node whose children have changed
+     */
+    public void nodeChildrenChanged(String path) {
+      refreshRegionServersList(path);
+    }
+
+    private boolean refreshRegionServersList(String path) {
+      if (!path.startsWith(zkHelper.getZookeeperWatcher().rsZNode)) {
+        return false;
+      }
+      List<String> newRsList = (zkHelper.getRegisteredRegionServers());
       if (newRsList == null) {
-        return;
+        return false;
       } else {
         synchronized (otherRegionServers) {
           otherRegionServers.clear();
           otherRegionServers.addAll(newRsList);
         }
       }
-      if (watchedEvent.getType().equals(Event.EventType.NodeDeleted)) {
-        LOG.info(watchedEvent.getPath() + " znode expired, trying to lock it");
-        String[] rsZnodeParts = watchedEvent.getPath().split("/");
-        transferQueues(rsZnodeParts[rsZnodeParts.length-1]);
-      }
+      return true;
     }
   }
 

Modified: 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java?rev=1021447&r1=1021446&r2=1021447&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java 
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java Mon 
Oct 11 18:00:48 2010
@@ -916,7 +916,8 @@ public class ZKUtil {
  public static void deleteChildrenRecursively(ZooKeeperWatcher zkw, String 
node)
   throws KeeperException {
     List<String> children = ZKUtil.listChildrenNoWatch(zkw, node);
-    if(!children.isEmpty()) {
+    // Null must be ruled out first; with || a null list is still dereferenced
+    if(children != null && !children.isEmpty()) {
       for(String child : children) {
         deleteNodeRecursively(zkw, joinZNode(node, child));
       }

Modified: 
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java?rev=1021447&r1=1021446&r2=1021447&view=diff
==============================================================================
--- 
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
 (original)
+++ 
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
 Mon Oct 11 18:00:48 2010
@@ -42,6 +42,8 @@ import org.apache.hadoop.hbase.client.Re
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -54,10 +56,9 @@ public class TestReplication {
 
   private static Configuration conf1;
   private static Configuration conf2;
-/*
-  private static ZooKeeperWrapper zkw1;
-  private static ZooKeeperWrapper zkw2;
-  */
+
+  private static ZooKeeperWatcher zkw1;
+  private static ZooKeeperWatcher zkw2;
 
   private static HTable htable1;
   private static HTable htable2;
@@ -92,15 +93,15 @@ public class TestReplication {
     conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
     conf1.setBoolean("dfs.support.append", true);
     conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
-    /* REENALBE
     utility1 = new HBaseTestingUtility(conf1);
     utility1.startMiniZKCluster();
     MiniZooKeeperCluster miniZK = utility1.getZkCluster();
-    zkw1 = ZooKeeperWrapper.createInstance(conf1, "cluster1");
-    zkw1.writeZNode("/1", "replication", "");
-    zkw1.writeZNode("/1/replication", "master",
+    zkw1 = new ZooKeeperWatcher(conf1, "cluster1", null);
+    ZKUtil.createWithParents(zkw1, "/1/replication/master");
+    ZKUtil.createWithParents(zkw1, "/1/replication/state");
+    ZKUtil.setData(zkw1, "/1/replication/master", Bytes.toBytes(
         conf1.get(HConstants.ZOOKEEPER_QUORUM)+":" +
-            conf1.get("hbase.zookeeper.property.clientPort")+":/1");
+            conf1.get("hbase.zookeeper.property.clientPort")+":/1"));
     setIsReplication(true);
     LOG.info("Setup first Zk");
 
@@ -113,15 +114,13 @@ public class TestReplication {
 
     utility2 = new HBaseTestingUtility(conf2);
     utility2.setZkCluster(miniZK);
-    zkw2 = ZooKeeperWrapper.createInstance(conf2, "cluster2");
-    zkw2.writeZNode("/2", "replication", "");
-    zkw2.writeZNode("/2/replication", "master",
-        conf1.get(HConstants.ZOOKEEPER_QUORUM)+":" +
-            conf1.get("hbase.zookeeper.property.clientPort")+":/1");
+    zkw2 = new ZooKeeperWatcher(conf2, "cluster2", null);
+    ZKUtil.createWithParents(zkw2, "/2/replication");
 
-    zkw1.writeZNode("/1/replication/peers", "1",
+    ZKUtil.createWithParents(zkw1, "/1/replication/peers/2");
+    ZKUtil.setData(zkw1, "/1/replication/peers/2", Bytes.toBytes(
         conf2.get(HConstants.ZOOKEEPER_QUORUM)+":" +
-            conf2.get("hbase.zookeeper.property.clientPort")+":/2");
+            conf2.get("hbase.zookeeper.property.clientPort")+":/2"));
 
     LOG.info("Setup second Zk");
 
@@ -143,12 +142,12 @@ public class TestReplication {
     htable1 = new HTable(conf1, tableName);
     htable1.setWriteBufferSize(1024);
     htable2 = new HTable(conf2, tableName);
-    */
   }
 
   private static void setIsReplication(boolean rep) throws Exception {
     LOG.info("Set rep " + rep);
-   // REENALBE  zkw1.writeZNode("/1/replication", "state", 
Boolean.toString(rep));
+    ZKUtil.setData(zkw1,"/1/replication/state",
+        Bytes.toBytes(Boolean.toString(rep)));
     // Takes some ms for ZK to fire the watcher
     Thread.sleep(SLEEP_TIME);
   }
@@ -181,7 +180,7 @@ public class TestReplication {
    * Add a row, check it's replicated, delete it, check's gone
    * @throws Exception
    */
-  @Ignore @Test
+  @Test
   public void testSimplePutDelete() throws Exception {
     LOG.info("testSimplePutDelete");
     Put put = new Put(row);
@@ -229,7 +228,7 @@ public class TestReplication {
    * Try a small batch upload using the write buffer, check it's replicated
    * @throws Exception
    */
-  @Ignore @Test
+  @Test
   public void testSmallBatch() throws Exception {
     LOG.info("testSmallBatch");
     Put put;
@@ -273,7 +272,7 @@ public class TestReplication {
    * replicated, enable it, try replicating and it should work
    * @throws Exception
    */
-  @Ignore @Test
+  @Test
   public void testStartStop() throws Exception {
 
     // Test stopping replication
@@ -342,7 +341,7 @@ public class TestReplication {
    * hlog rolling and other non-trivial code paths
    * @throws Exception
    */
-  @Ignore @Test
+  @Test
   public void loadTesting() throws Exception {
     htable1.setWriteBufferSize(1024);
     htable1.setAutoFlush(false);
@@ -396,7 +395,7 @@ public class TestReplication {
    * the upload. The failover happens internally.
    * @throws Exception
    */
-  @Ignore @Test
+  @Test
   public void queueFailover() throws Exception {
     utility1.createMultiRegions(htable1, famName);
 

Modified: 
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java?rev=1021447&r1=1021446&r2=1021447&view=diff
==============================================================================
--- 
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
 (original)
+++ 
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
 Mon Oct 11 18:00:48 2010
@@ -69,7 +69,7 @@ public class TestReplicationSource {
    * time reading logs that are being archived.
    * @throws Exception
    */
-  @Ignore @Test
+  @Test
   public void testLogMoving() throws Exception{
     Path logPath = new Path(logDir, "log");
     HLog.Writer writer = HLog.createWriter(fs, logPath, conf);

Copied: 
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
 (from r1003174, 
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/DISABLEDTestReplicationSink.java)
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java?p2=hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java&p1=hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/DISABLEDTestReplicationSink.java&r1=1003174&r2=1021447&rev=1021447&view=diff
==============================================================================
--- 
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/DISABLEDTestReplicationSink.java
 (original)
+++ 
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
 Mon Oct 11 18:00:48 2010
@@ -47,10 +47,10 @@ import java.util.concurrent.atomic.Atomi
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-public class DISABLEDTestReplicationSink {
+public class TestReplicationSink {
 
   private static final Log LOG =
-      LogFactory.getLog(DISABLEDTestReplicationSink.class);
+      LogFactory.getLog(TestReplicationSink.class);
 
   private static final int BATCH_SIZE = 10;
 
@@ -128,7 +128,7 @@ public class DISABLEDTestReplicationSink
    * Insert a whole batch of entries
    * @throws Exception
    */
-  @Ignore @Test
+  @Test
   public void testBatchSink() throws Exception {
     HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE];
     for(int i = 0; i < BATCH_SIZE; i++) {
@@ -144,7 +144,7 @@ public class DISABLEDTestReplicationSink
    * Insert a mix of puts and deletes
    * @throws Exception
    */
-  @Ignore @Test
+  @Test
   public void testMixedPutDelete() throws Exception {
     HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE/2];
     for(int i = 0; i < BATCH_SIZE/2; i++) {
@@ -168,7 +168,7 @@ public class DISABLEDTestReplicationSink
    * Insert to 2 different tables
    * @throws Exception
    */
-  @Ignore @Test
+  @Test
   public void testMixedPutTables() throws Exception {
     HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE];
     for(int i = 0; i < BATCH_SIZE; i++) {
@@ -189,7 +189,7 @@ public class DISABLEDTestReplicationSink
    * Insert then do different types of deletes
    * @throws Exception
    */
-  @Ignore @Test
+  @Test
   public void testMixedDeletes() throws Exception {
     HLog.Entry[] entries = new HLog.Entry[3];
     for(int i = 0; i < 3; i++) {
@@ -214,7 +214,7 @@ public class DISABLEDTestReplicationSink
    * before the actual Put that creates it.
    * @throws Exception
    */
-  @Ignore @Test
+  @Test
   public void testApplyDeleteBeforePut() throws Exception {
     HLog.Entry[] entries = new HLog.Entry[5];
     for(int i = 0; i < 2; i++) {

Added: 
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java?rev=1021447&view=auto
==============================================================================
--- 
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
 (added)
+++ 
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
 Mon Oct 11 18:00:48 2010
@@ -0,0 +1,282 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.catalog.CatalogTracker;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.regionserver.wal.WALObserver;
+import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Exercises the HLog bookkeeping of {@link ReplicationSourceManager} across
+ * log rolls, using a mini ZooKeeper cluster and {@link ReplicationSourceDummy}
+ * as the configured replication source implementation.
+ */
+public class TestReplicationSourceManager {
+
+  private static final Log LOG =
+      LogFactory.getLog(TestReplicationSourceManager.class);
+
+  private static Configuration conf;
+
+  private static HBaseTestingUtility utility;
+
+  private static final AtomicBoolean REPLICATING = new AtomicBoolean(false);
+
+  private static Replication replication;
+
+  private static ReplicationSourceManager manager;
+
+  private static ZooKeeperWatcher zkw;
+
+  private static HTableDescriptor htd;
+
+  private static HRegionInfo hri;
+
+  private static final byte[] r1 = Bytes.toBytes("r1");
+
+  private static final byte[] r2 = Bytes.toBytes("r2");
+
+  private static final byte[] f1 = Bytes.toBytes("f1");
+
+  private static final byte[] f2 = Bytes.toBytes("f2");
+
+  private static final byte[] test = Bytes.toBytes("test");
+
+  private static FileSystem fs;
+
+  private static Path oldLogDir;
+
+  private static Path logDir;
+
+  /**
+   * Starts a mini ZooKeeper cluster, creates the replication znodes and
+   * builds the {@link Replication} instance whose manager is under test.
+   * @throws Exception
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+
+    conf = HBaseConfiguration.create();
+    conf.set("replication.replicationsource.implementation",
+        ReplicationSourceDummy.class.getCanonicalName());
+    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
+    utility = new HBaseTestingUtility(conf);
+    utility.startMiniZKCluster();
+
+    zkw = new ZooKeeperWatcher(conf, "test", null);
+    ZKUtil.createWithParents(zkw, "/hbase/replication");
+    ZKUtil.createWithParents(zkw, "/hbase/replication/master");
+    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1");
+    ZKUtil.setData(zkw, "/hbase/replication/master",
+        Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM)+":" +
+        conf.get("hbase.zookeeper.property.clientPort")+":/1"));
+    ZKUtil.setData(zkw, "/hbase/replication/peers/1",Bytes.toBytes(
+          conf.get(HConstants.ZOOKEEPER_QUORUM)+":" +
+          conf.get("hbase.zookeeper.property.clientPort")+":/1"));
+
+    // Resolve the file system and log directories before handing them to
+    // Replication; constructing it first would pass null for all three.
+    fs = FileSystem.get(conf);
+    oldLogDir = new Path(utility.getTestDir(),
+        HConstants.HREGION_OLDLOGDIR_NAME);
+    logDir = new Path(utility.getTestDir(),
+        HConstants.HREGION_LOGDIR_NAME);
+    replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
+    manager = replication.getReplicationManager();
+
+    manager.addSource("1");
+
+    htd = new HTableDescriptor(test);
+    HColumnDescriptor col = new HColumnDescriptor("f1");
+    col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+    htd.addFamily(col);
+    col = new HColumnDescriptor("f2");
+    col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
+    htd.addFamily(col);
+
+    hri = new HRegionInfo(htd, r1, r2);
+  }
+
+  /**
+   * Calls {@code join()} on the manager and shuts the test utility down.
+   * @throws Exception
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    manager.join();
+    utility.shutdownMiniCluster();
+  }
+
+  /**
+   * Recursively deletes the log and old-log directories before each test.
+   * @throws Exception
+   */
+  @Before
+  public void setUp() throws Exception {
+    fs.delete(logDir, true);
+    fs.delete(oldLogDir, true);
+  }
+
+  /**
+   * Performs the same directory cleanup as {@link #setUp()} after each test.
+   * @throws Exception
+   */
+  @After
+  public void tearDown() throws Exception {
+    setUp();
+  }
+
+  /**
+   * Appends edits while rolling the HLog and checks how many logs the manager
+   * reports before and after {@code logPositionAndCleanOldLogs} is called.
+   * @throws Exception
+   */
+  @Test
+  public void testLogRoll() throws Exception {
+    long seq = 0;
+    long baseline = 1000;
+    long time = baseline;
+    KeyValue kv = new KeyValue(r1, f1, r1);
+    WALEdit edit = new WALEdit();
+    edit.add(kv);
+
+    List<WALObserver> listeners = new ArrayList<WALObserver>();
+    listeners.add(replication);
+    HLog hlog = new HLog(fs, logDir, oldLogDir, conf, listeners,
+      URLEncoder.encode("regionserver:60020", "UTF8"));
+
+    manager.init();
+
+    // Testing normal log rolling every 20
+    for(long i = 1; i < 101; i++) {
+      if(i > 1 && i % 20 == 0) {
+        hlog.rollWriter();
+      }
+      LOG.info(i);
+      HLogKey key = new HLogKey(hri.getRegionName(),
+        test, seq++, System.currentTimeMillis());
+      hlog.append(hri, key, edit);
+    }
+
+    // Simulate a rapid insert that's followed
+    // by a report that's still not totally complete (missing last one)
+    LOG.info(baseline + " and " + time);
+    baseline += 101;
+    time = baseline;
+    LOG.info(baseline + " and " + time);
+
+    for (int i = 0; i < 3; i++) {
+      HLogKey key = new HLogKey(hri.getRegionName(),
+        test, seq++, System.currentTimeMillis());
+      hlog.append(hri, key, edit);
+    }
+
+    assertEquals(6, manager.getHLogs().size());
+
+    hlog.rollWriter();
+
+    manager.logPositionAndCleanOldLogs(
+        manager.getSources().get(0).getCurrentPath(), "1", 0, false);
+
+    HLogKey key = new HLogKey(hri.getRegionName(),
+          test, seq++, System.currentTimeMillis());
+    hlog.append(hri, key, edit);
+
+    assertEquals(1, manager.getHLogs().size());
+
+    // TODO Need a case with only 2 HLogs and we only want to delete the
+    // first one
+  }
+
+  /**
+   * Minimal {@link Server} stub that exposes this test's static
+   * configuration and ZooKeeper watcher; the remaining methods are stubs.
+   */
+  static class DummyServer implements Server {
+
+    @Override
+    public Configuration getConfiguration() {
+      return conf;
+    }
+
+    @Override
+    public ZooKeeperWatcher getZooKeeper() {
+      return zkw;
+    }
+
+    @Override
+    public CatalogTracker getCatalogTracker() {
+      return null;  // this stub provides no catalog tracker
+    }
+
+    @Override
+    public String getServerName() {
+      return null;  // this stub provides no server name
+    }
+
+    @Override
+    public void abort(String why, Throwable e) {
+      // intentionally a no-op in this stub
+    }
+
+    @Override
+    public void stop(String why) {
+      // intentionally a no-op in this stub
+    }
+
+    @Override
+    public boolean isStopped() {
+      return false;  // this stub never reports itself as stopped
+    }
+  }
+
+}


Reply via email to