Author: jdcryans
Date: Mon Oct 11 18:00:48 2010
New Revision: 1021447
URL: http://svn.apache.org/viewvc?rev=1021447&view=rev
Log:
HBASE-3060 [replication] Reenable replication on trunk with unit tests
Added:
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
- copied, changed from r1003174,
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/DISABLEDTestReplicationSink.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
Removed:
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/DISABLEDTestReplicationSink.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/DISABLEDTestReplicationSourceManager.java
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
Modified: hbase/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1021447&r1=1021446&r2=1021447&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Mon Oct 11 18:00:48 2010
@@ -580,7 +580,7 @@ Release 0.21.0 - Unreleased
HBASE-3080 TestAdmin hanging on hudson
HBASE-3063 TestThriftServer failing in TRUNK
HBASE-3094 Fixes for miscellaneous broken tests
-
+ HBASE-3060 [replication] Reenable replication on trunk with unit tests
IMPROVEMENTS
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java?rev=1021447&r1=1021446&r2=1021447&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java Mon Oct 11 18:00:48 2010
@@ -39,11 +39,11 @@ import org.apache.hadoop.hbase.HServerAd
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Watcher;
/**
* This class serves as a helper for all things related to zookeeper
@@ -81,31 +81,39 @@ public class ReplicationZookeeper {
// Our handle on zookeeper
private final ZooKeeperWatcher zookeeper;
// Map of addresses of peer clusters with their ZKW
- private final Map<String, ReplicationZookeeper> peerClusters;
+ private Map<String, ZooKeeperWatcher> peerClusters;
// Path to the root replication znode
- private final String replicationZNode;
+ private String replicationZNode;
// Path to the peer clusters znode
- private final String peersZNode;
+ private String peersZNode;
// Path to the znode that contains all RS that replicates
- private final String rsZNode;
+ private String rsZNode;
// Path to this region server's name under rsZNode
- private final String rsServerNameZnode;
+ private String rsServerNameZnode;
// Name node if the replicationState znode
- private final String replicationStateNodeName;
+ private String replicationStateNodeName;
// If this RS is part of a master cluster
- private final boolean replicationMaster;
+ private boolean replicationMaster;
private final Configuration conf;
// Is this cluster replicating at the moment?
- private final AtomicBoolean replicating;
+ private AtomicBoolean replicating;
// Byte (stored as string here) that identifies this cluster
- private final String clusterId;
+ private String clusterId;
// Abortable
- private final Abortable abortable;
+ private Abortable abortable;
+
+ public ReplicationZookeeper(final Configuration conf, final ZooKeeperWatcher
zk)
+ throws KeeperException {
+
+ this.conf = conf;
+ this.zookeeper = zk;
+ setZNodes();
+ }
/**
* Constructor used by region servers, connects to the peer cluster right
away.
*
- * @param zookeeper
+ * @param server
* @param replicating atomic boolean to start/stop replication
* @throws IOException
* @throws KeeperException
@@ -115,6 +123,27 @@ public class ReplicationZookeeper {
this.abortable = server;
this.zookeeper = server.getZooKeeper();
this.conf = server.getConfiguration();
+ setZNodes();
+
+ this.peerClusters = new HashMap<String, ZooKeeperWatcher>();
+ this.replicating = replicating;
+ setReplicating();
+ this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode, server.getServerName());
+ ZKUtil.createWithParents(this.zookeeper, this.rsServerNameZnode);
+ // Set a tracker on replicationStateNodeNode
+ ReplicationStatusTracker tracker =
+ new ReplicationStatusTracker(this.zookeeper, server);
+ tracker.start();
+
+ List<String> znodes = ZKUtil.listChildrenNoWatch(this.zookeeper,
this.peersZNode);
+ if (znodes != null) {
+ for (String z : znodes) {
+ connectToPeer(z);
+ }
+ }
+ }
+
+ private void setZNodes() throws KeeperException {
String replicationZNodeName =
conf.get("zookeeper.znode.replication", "replication");
String peersZNodeName =
@@ -130,15 +159,11 @@ public class ReplicationZookeeper {
String thisCluster = this.conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" +
this.conf.get("hbase.zookeeper.property.clientPort") + ":" +
this.conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
-
- this.peerClusters = new HashMap<String, ReplicationZookeeper>();
this.replicationZNode =
ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
this.rsZNode = ZKUtil.joinZNode(replicationZNode, rsZNodeName);
- this.replicating = replicating;
- setReplicating();
String znode = ZKUtil.joinZNode(this.replicationZNode, clusterIdZNodeName);
byte [] data = ZKUtil.getData(this.zookeeper, znode);
String idResult = Bytes.toString(data);
@@ -152,24 +177,6 @@ public class ReplicationZookeeper {
LOG.info("This cluster (" + thisCluster + ") is a " +
(this.replicationMaster ? "master" : "slave") + " for replication" +
", compared with (" + address + ")");
-
- if (server.getServerName() != null) {
- this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode,
server.getServerName());
- // Set a tracker on replicationStateNodeNode
- ReplicationStatusTracker tracker =
- new ReplicationStatusTracker(this.zookeeper, getRepStateNode(),
server);
- tracker.start();
-
- List<String> znodes = ZKUtil.listChildrenNoWatch(this.zookeeper,
this.peersZNode);
- if (znodes != null) {
- for (String z : znodes) {
- connectToPeer(z);
- }
- }
- } else {
- this.rsServerNameZnode = null;
- }
-
}
/**
@@ -178,47 +185,16 @@ public class ReplicationZookeeper {
* @param peerClusterId (byte) the cluster to interrogate
* @return addresses of all region servers
*/
- public List<HServerAddress> getPeersAddresses(String peerClusterId) {
+ public List<HServerAddress> getPeersAddresses(String peerClusterId)
+ throws KeeperException {
if (this.peerClusters.size() == 0) {
return new ArrayList<HServerAddress>(0);
}
- ReplicationZookeeper zkw = this.peerClusters.get(peerClusterId);
+ ZooKeeperWatcher zkw = this.peerClusters.get(peerClusterId);
+
return zkw == null?
new ArrayList<HServerAddress>(0):
- zkw.scanAddressDirectory(this.zookeeper.rsZNode);
- }
-
- /**
- * Scan a directory of address data.
- * @param znode The parent node
- * @return The directory contents as HServerAddresses
- */
- public List<HServerAddress> scanAddressDirectory(String znode) {
- List<HServerAddress> list = new ArrayList<HServerAddress>();
- List<String> nodes = null;
- try {
- nodes = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
- } catch (KeeperException e) {
- this.abortable.abort("Scanning " + znode, e);
- }
- if (nodes == null) {
- return list;
- }
- for (String node : nodes) {
- String path = ZKUtil.joinZNode(znode, node);
- list.add(readAddress(path));
- }
- return list;
- }
-
- private HServerAddress readAddress(String znode) {
- byte [] data = null;
- try {
- data = ZKUtil.getData(this.zookeeper, znode);
- } catch (KeeperException e) {
- this.abortable.abort("Getting address", e);
- }
- return new HServerAddress(Bytes.toString(data));
+ ZKUtil.listChildrenAndGetAsAddresses(zkw, zkw.rsZNode);
}
/**
@@ -239,15 +215,11 @@ public class ReplicationZookeeper {
otherConf.set(HConstants.ZOOKEEPER_QUORUM, ensemble[0]);
otherConf.set("hbase.zookeeper.property.clientPort", ensemble[1]);
otherConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, ensemble[2]);
- // REENABLE -- FIX!!!!
- /*
- ZooKeeperWrapper zkw = ZooKeeperWrapper.createInstance(otherConf,
- "connection to cluster: " + peerId);
- zkw.registerListener(new ReplicationStatusWatcher());
+ ZooKeeperWatcher zkw = new ZooKeeperWatcher(otherConf,
+ "connection to cluster: " + peerId, this.abortable);
this.peerClusters.put(peerId, zkw);
- this.zookeeperWrapper.ensureExists(this.zookeeperWrapper.getZNode(
+ ZKUtil.createWithParents(this.zookeeper, ZKUtil.joinZNode(
this.rsServerNameZnode, peerId));
- */
LOG.info("Added new peer cluster " + StringUtils.arrayToString(ensemble));
}
@@ -282,7 +254,7 @@ public class ReplicationZookeeper {
try {
String znode = ZKUtil.joinZNode(this.rsServerNameZnode, clusterId);
znode = ZKUtil.joinZNode(znode, filename);
- ZKUtil.createAndWatch(this.zookeeper, znode, Bytes.toBytes(""));
+ ZKUtil.createWithParents(this.zookeeper, znode);
} catch (KeeperException e) {
this.abortable.abort("Failed add log to list", e);
}
@@ -297,7 +269,7 @@ public class ReplicationZookeeper {
try {
String znode = ZKUtil.joinZNode(rsServerNameZnode, clusterId);
znode = ZKUtil.joinZNode(znode, filename);
- ZKUtil.deleteChildrenRecursively(this.zookeeper, znode);
+ ZKUtil.deleteNode(this.zookeeper, znode);
} catch (KeeperException e) {
this.abortable.abort("Failed remove from list", e);
}
@@ -316,7 +288,7 @@ public class ReplicationZookeeper {
String znode = ZKUtil.joinZNode(this.rsServerNameZnode, clusterId);
znode = ZKUtil.joinZNode(znode, filename);
// Why serialize String of Long and note Long as bytes?
- ZKUtil.createAndWatch(this.zookeeper, znode,
+ ZKUtil.setData(this.zookeeper, znode,
Bytes.toBytes(Long.toString(position)));
} catch (KeeperException e) {
this.abortable.abort("Writing replication status", e);
@@ -326,15 +298,18 @@ public class ReplicationZookeeper {
/**
* Get a list of all the other region servers in this cluster
* and set a watch
- * @param watch the watch to set
* @return a list of server nanes
*/
- public List<String> getRegisteredRegionServers(Watcher watch) {
+ public List<String> getRegisteredRegionServers() {
List<String> result = null;
try {
- // TODO: This is rsZNode from zk which is like getListOfReplicators
- // but maybe these are from different zk instances?
- result = ZKUtil.listChildrenNoWatch(this.zookeeper, rsZNode);
+ List<ZKUtil.NodeAndData> nads =
+ ZKUtil.watchAndGetNewChildren(this.zookeeper,
this.zookeeper.rsZNode);
+ result = new ArrayList<String>(nads.size());
+ for (ZKUtil.NodeAndData nad : nads) {
+ String[] fullPath = nad.getNode().split("/");
+ result.add(fullPath[fullPath.length - 1]);
+ }
} catch (KeeperException e) {
this.abortable.abort("Get list of registered region servers", e);
}
@@ -344,7 +319,6 @@ public class ReplicationZookeeper {
/**
* Get the list of the replicators that have queues, they can be alive, dead
* or simply from a previous run
- * @param watch the watche to set
* @return a list of server names
*/
public List<String> getListOfReplicators() {
@@ -360,7 +334,6 @@ public class ReplicationZookeeper {
/**
* Get the list of peer clusters for the specified server names
* @param rs server names of the rs
- * @param watch the watch to set
* @return a list of peer cluster
*/
public List<String> getListPeersForRS(String rs) {
@@ -378,7 +351,6 @@ public class ReplicationZookeeper {
* Get the list of hlogs for the specified region server and peer cluster
* @param rs server names of the rs
* @param id peer cluster
- * @param watch the watch to set
* @return a list of hlogs
*/
public List<String> getListHLogsForPeerForRS(String rs, String id) {
@@ -401,10 +373,14 @@ public class ReplicationZookeeper {
public boolean lockOtherRS(String znode) {
try {
String parent = ZKUtil.joinZNode(this.rsZNode, znode);
+ if (parent.equals(rsServerNameZnode)) {
+ LOG.warn("Won't lock because this is us, we're dead!");
+ return false;
+ }
String p = ZKUtil.joinZNode(parent, RS_LOCK_ZNODE);
ZKUtil.createAndWatch(this.zookeeper, p,
Bytes.toBytes(rsServerNameZnode));
} catch (KeeperException e) {
- this.abortable.abort("Failed lock other rs", e);
+ LOG.info("Failed lock other rs", e);
}
return true;
}
@@ -468,7 +444,7 @@ public class ReplicationZookeeper {
*/
public void deleteSource(String peerZnode) {
try {
- ZKUtil.deleteChildrenRecursively(this.zookeeper,
+ ZKUtil.deleteNodeRecursively(this.zookeeper,
ZKUtil.joinZNode(rsServerNameZnode, peerZnode));
} catch (KeeperException e) {
this.abortable.abort("Failed delete of " + peerZnode, e);
@@ -481,7 +457,7 @@ public class ReplicationZookeeper {
*/
public void deleteRsQueues(String znode) {
try {
- ZKUtil.deleteChildrenRecursively(this.zookeeper,
+ ZKUtil.deleteNodeRecursively(this.zookeeper,
ZKUtil.joinZNode(rsZNode, znode));
} catch (KeeperException e) {
this.abortable.abort("Failed delete of " + znode, e);
@@ -492,7 +468,12 @@ public class ReplicationZookeeper {
* Delete this cluster's queues
*/
public void deleteOwnRSZNode() {
- deleteRsQueues(this.rsServerNameZnode);
+ try {
+ ZKUtil.deleteNodeRecursively(this.zookeeper,
+ this.rsServerNameZnode);
+ } catch (KeeperException e) {
+ this.abortable.abort("Failed delete of " + this.rsServerNameZnode, e);
+ }
}
/**
@@ -510,13 +491,8 @@ public class ReplicationZookeeper {
return data == null || data.length() == 0 ? 0 : Long.parseLong(data);
}
- /**
- * Tells if this cluster replicates or not
- *
- * @return if this is a master
- */
- public boolean isReplicationMaster() {
- return this.replicationMaster;
+ public void registerRegionServerListener(ZooKeeperListener listener) {
+ this.zookeeper.registerListener(listener);
}
/**
@@ -532,17 +508,29 @@ public class ReplicationZookeeper {
* Get a map of all peer clusters
* @return map of peer cluster, zk address to ZKW
*/
- public Map<String, ReplicationZookeeper> getPeerClusters() {
+ public Map<String, ZooKeeperWatcher> getPeerClusters() {
return this.peerClusters;
}
+ public String getRSZNode() {
+ return rsZNode;
+ }
+
+ /**
+ *
+ * @return
+ */
+ public ZooKeeperWatcher getZookeeperWatcher() {
+ return this.zookeeper;
+ }
+
/**
* Tracker for status of the replication
*/
public class ReplicationStatusTracker extends ZooKeeperNodeTracker {
- public ReplicationStatusTracker(ZooKeeperWatcher watcher, String node,
+ public ReplicationStatusTracker(ZooKeeperWatcher watcher,
Abortable abortable) {
- super(watcher, node, abortable);
+ super(watcher, getRepStateNode(), abortable);
}
@Override
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java?rev=1021447&r1=1021446&r2=1021447&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java Mon Oct 11 18:00:48 2010
@@ -23,24 +23,21 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.LogCleanerDelegate;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
-// REENALBE import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
/**
* Implementation of a log cleaner that checks if a log is still scheduled for
* replication before deleting it when its TTL is over.
*/
-public class ReplicationLogCleaner implements LogCleanerDelegate, Watcher {
+public class ReplicationLogCleaner implements LogCleanerDelegate {
private static final Log LOG =
LogFactory.getLog(ReplicationLogCleaner.class);
@@ -78,31 +75,30 @@ public class ReplicationLogCleaner imple
private boolean refreshHLogsAndSearch(String searchedLog) {
this.hlogs.clear();
final boolean lookForLog = searchedLog != null;
-// REENALBE
-// List<String> rss = zkHelper.getListOfReplicators(this);
-// if (rss == null) {
-// LOG.debug("Didn't find any region server that replicates, deleting: " +
-// searchedLog);
-// return false;
-// }
-// for (String rs: rss) {
-// List<String> listOfPeers = zkHelper.getListPeersForRS(rs, this);
-// // if rs just died, this will be null
-// if (listOfPeers == null) {
-// continue;
-// }
-// for (String id : listOfPeers) {
-// List<String> peersHlogs = zkHelper.getListHLogsForPeerForRS(rs, id,
this);
-// if (peersHlogs != null) {
-// this.hlogs.addAll(peersHlogs);
-// }
-// // early exit if we found the log
-// if(lookForLog && this.hlogs.contains(searchedLog)) {
-// LOG.debug("Found log in ZK, keeping: " + searchedLog);
-// return true;
-// }
-// }
-// }
+ List<String> rss = zkHelper.getListOfReplicators();
+ if (rss == null) {
+ LOG.debug("Didn't find any region server that replicates, deleting: " +
+ searchedLog);
+ return false;
+ }
+ for (String rs: rss) {
+ List<String> listOfPeers = zkHelper.getListPeersForRS(rs);
+ // if rs just died, this will be null
+ if (listOfPeers == null) {
+ continue;
+ }
+ for (String id : listOfPeers) {
+ List<String> peersHlogs = zkHelper.getListHLogsForPeerForRS(rs, id);
+ if (peersHlogs != null) {
+ this.hlogs.addAll(peersHlogs);
+ }
+ // early exit if we found the log
+ if(lookForLog && this.hlogs.contains(searchedLog)) {
+ LOG.debug("Found log in ZK, keeping: " + searchedLog);
+ return true;
+ }
+ }
+ }
LOG.debug("Didn't find this log in ZK, deleting: " + searchedLog);
return false;
}
@@ -110,15 +106,15 @@ public class ReplicationLogCleaner imple
@Override
public void setConf(Configuration conf) {
this.conf = conf;
-// try {
- // REENABLE
-// this.zkHelper = new ReplicationZookeeperWrapper(
-// ZooKeeperWrapper.createInstance(this.conf,
-// HMaster.class.getName()),
-// this.conf, new AtomicBoolean(true), null);
-// } catch (IOException e) {
-// LOG.error(e);
-// }
+ try {
+ ZooKeeperWatcher zkw =
+ new ZooKeeperWatcher(conf, this.getClass().getName(), null);
+ this.zkHelper = new ReplicationZookeeper(conf, zkw);
+ } catch (KeeperException e) {
+ LOG.error("Error while configuring " + this.getClass().getName(), e);
+ } catch (IOException e) {
+ LOG.error("Error while configuring " + this.getClass().getName(), e);
+ }
refreshHLogsAndSearch(null);
}
@@ -126,7 +122,4 @@ public class ReplicationLogCleaner imple
public Configuration getConf() {
return conf;
}
-
- @Override
- public void process(WatchedEvent watchedEvent) {}
}
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java?rev=1021447&r1=1021446&r2=1021447&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java Mon Oct 11 18:00:48 2010
@@ -45,7 +45,6 @@ import org.apache.zookeeper.KeeperExcept
public class Replication implements WALObserver {
private final boolean replication;
private final ReplicationSourceManager replicationManager;
- private boolean replicationMaster;
private final AtomicBoolean replicating = new AtomicBoolean(true);
private final ReplicationZookeeper zkHelper;
private final Configuration conf;
@@ -70,10 +69,8 @@ public class Replication implements WALO
this.replication = isReplication(this.conf);
if (replication) {
this.zkHelper = new ReplicationZookeeper(server, this.replicating);
- this.replicationMaster = zkHelper.isReplicationMaster();
- this.replicationManager = this.replicationMaster ?
- new ReplicationSourceManager(zkHelper, conf, this.server,
- fs, this.replicating, logDir, oldLogDir) : null;
+ this.replicationManager = new ReplicationSourceManager(zkHelper, conf,
+ this.server, fs, this.replicating, logDir, oldLogDir) ;
} else {
this.replicationManager = null;
this.zkHelper = null;
@@ -93,10 +90,8 @@ public class Replication implements WALO
*/
public void join() {
if (this.replication) {
- if (this.replicationMaster) {
- this.replicationManager.join();
- }
- this.zkHelper.deleteOwnRSZNode();
+ this.replicationManager.join();
+ this.zkHelper.deleteOwnRSZNode();
}
}
@@ -106,7 +101,7 @@ public class Replication implements WALO
* @throws IOException
*/
public void replicateLogEntries(HLog.Entry[] entries) throws IOException {
- if (this.replication && !this.replicationMaster) {
+ if (this.replication) {
this.replicationSink.replicateEntries(entries);
}
}
@@ -118,11 +113,8 @@ public class Replication implements WALO
*/
public void startReplicationServices() throws IOException {
if (this.replication) {
- if (this.replicationMaster) {
- this.replicationManager.init();
- } else {
- this.replicationSink = new ReplicationSink(this.conf, this.server);
- }
+ this.replicationManager.init();
+ this.replicationSink = new ReplicationSink(this.conf, this.server);
}
}
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java?rev=1021447&r1=1021446&r2=1021447&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java Mon Oct 11 18:00:48 2010
@@ -38,7 +38,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicBoolean;
/**
* This class is responsible for replicating the edits coming
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1021447&r1=1021446&r2=1021447&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Mon Oct 11 18:00:48 2010
@@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
+import org.apache.zookeeper.KeeperException;
/**
* Class that handles the source of a replication stream.
@@ -195,7 +196,7 @@ public class ReplicationSource extends T
/**
* Select a number of peers at random using the ratio. Mininum 1.
*/
- private void chooseSinks() {
+ private void chooseSinks() throws KeeperException {
this.currentPeers.clear();
List<HServerAddress> addresses =
this.zkHelper.getPeersAddresses(peerClusterId);
@@ -231,8 +232,14 @@ public class ReplicationSource extends T
// If this is recovered, the queue is already full and the first log
// normally has a position (unless the RS failed between 2 logs)
if (this.queueRecovered) {
-// this.position = this.zkHelper.getHLogRepPosition(
-// this.peerClusterZnode, this.queue.peek().getName());
+ try {
+ this.position = this.zkHelper.getHLogRepPosition(
+ this.peerClusterZnode, this.queue.peek().getName());
+ } catch (KeeperException e) {
+ LOG.error("Couldn't get the position of this recovered queue " +
+ peerClusterZnode, e);
+ this.abort();
+ }
}
int sleepMultiplier = 1;
// Loop until we close down
@@ -380,6 +387,8 @@ public class ReplicationSource extends T
Thread.sleep(this.sleepForRetries);
} catch (InterruptedException e) {
LOG.error("Interrupted while trying to connect to sinks", e);
+ } catch (KeeperException e) {
+ LOG.error("Error talking to zookeeper, retrying", e);
}
}
}
@@ -553,6 +562,8 @@ public class ReplicationSource extends T
} while (!this.stopper.isStopped() && down);
} catch (InterruptedException e) {
LOG.debug("Interrupted while trying to contact the peer cluster");
+ } catch (KeeperException e) {
+ LOG.error("Error talking to zookeeper, retrying", e);
}
}
@@ -589,7 +600,7 @@ public class ReplicationSource extends T
}
};
Threads.setDaemonThreadRunning(
- this, n + ".replicationSource," + clusterId, handler);
+ this, n + ".replicationSource," + peerClusterZnode, handler);
}
/**
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java?rev=1021447&r1=1021446&r2=1021447&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java Mon Oct 11 18:00:48 2010
@@ -36,8 +36,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
/**
* This class is responsible to manage all the replication
@@ -104,8 +104,10 @@ public class ReplicationSourceManager {
this.fs = fs;
this.logDir = logDir;
this.oldLogDir = oldLogDir;
+ this.zkHelper.registerRegionServerListener(
+ new OtherRegionServerWatcher(this.zkHelper.getZookeeperWatcher()));
List<String> otherRSs =
- this.zkHelper.getRegisteredRegionServers(new
OtherRegionServerWatcher());
+ this.zkHelper.getRegisteredRegionServers();
this.otherRegionServers = otherRSs == null ? new ArrayList<String>() :
otherRSs;
}
@@ -145,7 +147,10 @@ public class ReplicationSourceManager {
ReplicationSourceInterface src = addSource(id);
src.startup();
}
- List<String> currentReplicators = this.zkHelper.getListOfReplicators();
+ List<String> currentReplicators =
this.zkHelper.getRegisteredRegionServers();
+ if (currentReplicators == null || currentReplicators.size() == 0) {
+ return;
+ }
synchronized (otherRegionServers) {
LOG.info("Current list of replicators: " + currentReplicators
+ " other RSs: " + otherRegionServers);
@@ -322,29 +327,59 @@ public class ReplicationSourceManager {
* in the local cluster. It initiates the process to transfer the queues
* if it is able to grab the lock.
*/
- public class OtherRegionServerWatcher implements Watcher {
- @Override
- public void process(WatchedEvent watchedEvent) {
- LOG.info(" event " + watchedEvent);
- if (watchedEvent.getType().equals(Event.KeeperState.Expired) ||
- watchedEvent.getType().equals(Event.KeeperState.Disconnected)) {
+ public class OtherRegionServerWatcher extends ZooKeeperListener {
+
+ /**
+ * Construct a ZooKeeper event listener.
+ */
+ public OtherRegionServerWatcher(ZooKeeperWatcher watcher) {
+ super(watcher);
+ }
+
+ /**
+ * Called when a new node has been created.
+ * @param path full path of the new node
+ */
+ public void nodeCreated(String path) {
+ refreshRegionServersList(path);
+ }
+
+ /**
+ * Called when a node has been deleted
+ * @param path full path of the deleted node
+ */
+ public void nodeDeleted(String path) {
+ boolean cont = refreshRegionServersList(path);
+ if (!cont) {
return;
}
-
- List<String> newRsList = (zkHelper.getRegisteredRegionServers(this));
+ LOG.info(path + " znode expired, trying to lock it");
+ String[] rsZnodeParts = path.split("/");
+ transferQueues(rsZnodeParts[rsZnodeParts.length-1]);
+ }
+
+ /**
+ * Called when an existing node has a child node added or removed.
+ * @param path full path of the node whose children have changed
+ */
+ public void nodeChildrenChanged(String path) {
+ refreshRegionServersList(path);
+ }
+
+ private boolean refreshRegionServersList(String path) {
+ if (!path.startsWith(zkHelper.getZookeeperWatcher().rsZNode)) {
+ return false;
+ }
+ List<String> newRsList = (zkHelper.getRegisteredRegionServers());
if (newRsList == null) {
- return;
+ return false;
} else {
synchronized (otherRegionServers) {
otherRegionServers.clear();
otherRegionServers.addAll(newRsList);
}
}
- if (watchedEvent.getType().equals(Event.EventType.NodeDeleted)) {
- LOG.info(watchedEvent.getPath() + " znode expired, trying to lock it");
- String[] rsZnodeParts = watchedEvent.getPath().split("/");
- transferQueues(rsZnodeParts[rsZnodeParts.length-1]);
- }
+ return true;
}
}
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java?rev=1021447&r1=1021446&r2=1021447&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java Mon Oct 11 18:00:48 2010
@@ -916,7 +916,7 @@ public class ZKUtil {
public static void deleteChildrenRecursively(ZooKeeperWatcher zkw, String
node)
throws KeeperException {
List<String> children = ZKUtil.listChildrenNoWatch(zkw, node);
- if(!children.isEmpty()) {
+ if(children != null || !children.isEmpty()) {
for(String child : children) {
deleteNodeRecursively(zkw, joinZNode(node, child));
}
Modified:
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java?rev=1021447&r1=1021446&r2=1021447&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java Mon Oct 11 18:00:48 2010
@@ -42,6 +42,8 @@ import org.apache.hadoop.hbase.client.Re
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -54,10 +56,9 @@ public class TestReplication {
private static Configuration conf1;
private static Configuration conf2;
-/*
- private static ZooKeeperWrapper zkw1;
- private static ZooKeeperWrapper zkw2;
- */
+
+ private static ZooKeeperWatcher zkw1;
+ private static ZooKeeperWatcher zkw2;
private static HTable htable1;
private static HTable htable2;
@@ -92,15 +93,15 @@ public class TestReplication {
conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
conf1.setBoolean("dfs.support.append", true);
conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
- /* REENALBE
utility1 = new HBaseTestingUtility(conf1);
utility1.startMiniZKCluster();
MiniZooKeeperCluster miniZK = utility1.getZkCluster();
- zkw1 = ZooKeeperWrapper.createInstance(conf1, "cluster1");
- zkw1.writeZNode("/1", "replication", "");
- zkw1.writeZNode("/1/replication", "master",
+ zkw1 = new ZooKeeperWatcher(conf1, "cluster1", null);
+ ZKUtil.createWithParents(zkw1, "/1/replication/master");
+ ZKUtil.createWithParents(zkw1, "/1/replication/state");
+ ZKUtil.setData(zkw1, "/1/replication/master", Bytes.toBytes(
conf1.get(HConstants.ZOOKEEPER_QUORUM)+":" +
- conf1.get("hbase.zookeeper.property.clientPort")+":/1");
+ conf1.get("hbase.zookeeper.property.clientPort")+":/1"));
setIsReplication(true);
LOG.info("Setup first Zk");
@@ -113,15 +114,13 @@ public class TestReplication {
utility2 = new HBaseTestingUtility(conf2);
utility2.setZkCluster(miniZK);
- zkw2 = ZooKeeperWrapper.createInstance(conf2, "cluster2");
- zkw2.writeZNode("/2", "replication", "");
- zkw2.writeZNode("/2/replication", "master",
- conf1.get(HConstants.ZOOKEEPER_QUORUM)+":" +
- conf1.get("hbase.zookeeper.property.clientPort")+":/1");
+ zkw2 = new ZooKeeperWatcher(conf2, "cluster2", null);
+ ZKUtil.createWithParents(zkw2, "/2/replication");
- zkw1.writeZNode("/1/replication/peers", "1",
+ ZKUtil.createWithParents(zkw1, "/1/replication/peers/2");
+ ZKUtil.setData(zkw1, "/1/replication/peers/2", Bytes.toBytes(
conf2.get(HConstants.ZOOKEEPER_QUORUM)+":" +
- conf2.get("hbase.zookeeper.property.clientPort")+":/2");
+ conf2.get("hbase.zookeeper.property.clientPort")+":/2"));
LOG.info("Setup second Zk");
@@ -143,12 +142,12 @@ public class TestReplication {
htable1 = new HTable(conf1, tableName);
htable1.setWriteBufferSize(1024);
htable2 = new HTable(conf2, tableName);
- */
}
private static void setIsReplication(boolean rep) throws Exception {
LOG.info("Set rep " + rep);
- // REENALBE zkw1.writeZNode("/1/replication", "state",
Boolean.toString(rep));
+ ZKUtil.setData(zkw1,"/1/replication/state",
+ Bytes.toBytes(Boolean.toString(rep)));
// Takes some ms for ZK to fire the watcher
Thread.sleep(SLEEP_TIME);
}
@@ -181,7 +180,7 @@ public class TestReplication {
* Add a row, check it's replicated, delete it, check's gone
* @throws Exception
*/
- @Ignore @Test
+ @Test
public void testSimplePutDelete() throws Exception {
LOG.info("testSimplePutDelete");
Put put = new Put(row);
@@ -229,7 +228,7 @@ public class TestReplication {
* Try a small batch upload using the write buffer, check it's replicated
* @throws Exception
*/
- @Ignore @Test
+ @Test
public void testSmallBatch() throws Exception {
LOG.info("testSmallBatch");
Put put;
@@ -273,7 +272,7 @@ public class TestReplication {
* replicated, enable it, try replicating and it should work
* @throws Exception
*/
- @Ignore @Test
+ @Test
public void testStartStop() throws Exception {
// Test stopping replication
@@ -342,7 +341,7 @@ public class TestReplication {
* hlog rolling and other non-trivial code paths
* @throws Exception
*/
- @Ignore @Test
+ @Test
public void loadTesting() throws Exception {
htable1.setWriteBufferSize(1024);
htable1.setAutoFlush(false);
@@ -396,7 +395,7 @@ public class TestReplication {
* the upload. The failover happens internally.
* @throws Exception
*/
- @Ignore @Test
+ @Test
public void queueFailover() throws Exception {
utility1.createMultiRegions(htable1, famName);
Modified:
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java?rev=1021447&r1=1021446&r2=1021447&view=diff
==============================================================================
---
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
(original)
+++
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
Mon Oct 11 18:00:48 2010
@@ -69,7 +69,7 @@ public class TestReplicationSource {
* time reading logs that are being archived.
* @throws Exception
*/
- @Ignore @Test
+ @Test
public void testLogMoving() throws Exception{
Path logPath = new Path(logDir, "log");
HLog.Writer writer = HLog.createWriter(fs, logPath, conf);
Copied:
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
(from r1003174,
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/DISABLEDTestReplicationSink.java)
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java?p2=hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java&p1=hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/DISABLEDTestReplicationSink.java&r1=1003174&r2=1021447&rev=1021447&view=diff
==============================================================================
---
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/DISABLEDTestReplicationSink.java
(original)
+++
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
Mon Oct 11 18:00:48 2010
@@ -47,10 +47,10 @@ import java.util.concurrent.atomic.Atomi
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-public class DISABLEDTestReplicationSink {
+public class TestReplicationSink {
private static final Log LOG =
- LogFactory.getLog(DISABLEDTestReplicationSink.class);
+ LogFactory.getLog(TestReplicationSink.class);
private static final int BATCH_SIZE = 10;
@@ -128,7 +128,7 @@ public class DISABLEDTestReplicationSink
* Insert a whole batch of entries
* @throws Exception
*/
- @Ignore @Test
+ @Test
public void testBatchSink() throws Exception {
HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE];
for(int i = 0; i < BATCH_SIZE; i++) {
@@ -144,7 +144,7 @@ public class DISABLEDTestReplicationSink
* Insert a mix of puts and deletes
* @throws Exception
*/
- @Ignore @Test
+ @Test
public void testMixedPutDelete() throws Exception {
HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE/2];
for(int i = 0; i < BATCH_SIZE/2; i++) {
@@ -168,7 +168,7 @@ public class DISABLEDTestReplicationSink
* Insert to 2 different tables
* @throws Exception
*/
- @Ignore @Test
+ @Test
public void testMixedPutTables() throws Exception {
HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE];
for(int i = 0; i < BATCH_SIZE; i++) {
@@ -189,7 +189,7 @@ public class DISABLEDTestReplicationSink
* Insert then do different types of deletes
* @throws Exception
*/
- @Ignore @Test
+ @Test
public void testMixedDeletes() throws Exception {
HLog.Entry[] entries = new HLog.Entry[3];
for(int i = 0; i < 3; i++) {
@@ -214,7 +214,7 @@ public class DISABLEDTestReplicationSink
* before the actual Put that creates it.
* @throws Exception
*/
- @Ignore @Test
+ @Test
public void testApplyDeleteBeforePut() throws Exception {
HLog.Entry[] entries = new HLog.Entry[5];
for(int i = 0; i < 2; i++) {
Added:
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java?rev=1021447&view=auto
==============================================================================
---
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
(added)
+++
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
Mon Oct 11 18:00:48 2010
@@ -0,0 +1,252 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.catalog.CatalogTracker;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.regionserver.wal.WALObserver;
+import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestReplicationSourceManager {
+
+ private static final Log LOG =
+ LogFactory.getLog(TestReplicationSourceManager.class);
+
+ private static Configuration conf;
+
+ private static HBaseTestingUtility utility;
+
+ private static final AtomicBoolean REPLICATING = new AtomicBoolean(false);
+
+ private static Replication replication;
+
+ private static ReplicationSourceManager manager;
+
+ private static ZooKeeperWatcher zkw;
+
+ private static HTableDescriptor htd;
+
+ private static HRegionInfo hri;
+
+ private static final byte[] r1 = Bytes.toBytes("r1");
+
+ private static final byte[] r2 = Bytes.toBytes("r2");
+
+ private static final byte[] f1 = Bytes.toBytes("f1");
+
+ private static final byte[] f2 = Bytes.toBytes("f2");
+
+ private static final byte[] test = Bytes.toBytes("test");
+
+ private static FileSystem fs;
+
+ private static Path oldLogDir;
+
+ private static Path logDir;
+
+
+ /**
+ * One-time fixture: starts a mini ZooKeeper cluster, creates the replication
+ * znodes under /hbase/replication, builds the Replication instance under
+ * test and registers peer "1" with its manager, then prepares the table
+ * descriptor and region info used by the tests.
+ * @throws Exception
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+
+ conf = HBaseConfiguration.create();
+ conf.set("replication.replicationsource.implementation",
+ ReplicationSourceDummy.class.getCanonicalName());
+ conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
+ utility = new HBaseTestingUtility(conf);
+ utility.startMiniZKCluster();
+
+ zkw = new ZooKeeperWatcher(conf, "test", null);
+ ZKUtil.createWithParents(zkw, "/hbase/replication");
+ ZKUtil.createWithParents(zkw, "/hbase/replication/master");
+ ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1");
+ ZKUtil.setData(zkw, "/hbase/replication/master",
+ Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM)+":" +
+ conf.get("hbase.zookeeper.property.clientPort")+":/1"));
+ ZKUtil.setData(zkw, "/hbase/replication/peers/1",Bytes.toBytes(
+ conf.get(HConstants.ZOOKEEPER_QUORUM)+":" +
+ conf.get("hbase.zookeeper.property.clientPort")+":/1"));
+
+ // The filesystem and log directories must be resolved before they are
+ // handed to Replication; otherwise it is constructed with three nulls.
+ fs = FileSystem.get(conf);
+ oldLogDir = new Path(utility.getTestDir(),
+ HConstants.HREGION_OLDLOGDIR_NAME);
+ logDir = new Path(utility.getTestDir(),
+ HConstants.HREGION_LOGDIR_NAME);
+ replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
+ manager = replication.getReplicationManager();
+
+ manager.addSource("1");
+
+ // Table "test" with one globally-scoped family (f1) and one
+ // locally-scoped family (f2).
+ htd = new HTableDescriptor(test);
+ HColumnDescriptor col = new HColumnDescriptor("f1");
+ col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+ htd.addFamily(col);
+ col = new HColumnDescriptor("f2");
+ col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
+ htd.addFamily(col);
+
+ hri = new HRegionInfo(htd, r1, r2);
+
+
+ }
+
+ /**
+ * Runs once after all tests: calls manager.join() and then shuts down the
+ * testing utility's mini cluster.
+ * @throws Exception
+ */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ manager.join();
+ utility.shutdownMiniCluster();
+ }
+
+ /**
+ * Recursively deletes logDir and oldLogDir before each test.
+ * @throws Exception
+ */
+ @Before
+ public void setUp() throws Exception {
+ fs.delete(logDir, true);
+ fs.delete(oldLogDir, true);
+ }
+
+ /**
+ * Re-runs setUp() after each test so the log directories are deleted again.
+ * @throws Exception
+ */
+ @After
+ public void tearDown() throws Exception {
+ setUp();
+ }
+
+ /**
+ * Appends edits to an HLog that has the Replication instance registered as
+ * a WAL observer, rolling the writer periodically, and checks the number of
+ * hlogs the manager reports before and after
+ * logPositionAndCleanOldLogs is called for peer "1".
+ * @throws Exception
+ */
+ @Test
+ public void testLogRoll() throws Exception {
+ long seq = 0;
+ // baseline and time are only used in the LOG.info messages below
+ long baseline = 1000;
+ long time = baseline;
+ KeyValue kv = new KeyValue(r1, f1, r1);
+ WALEdit edit = new WALEdit();
+ edit.add(kv);
+
+ List<WALObserver> listeners = new ArrayList<WALObserver>();
+ listeners.add(replication);
+ HLog hlog = new HLog(fs, logDir, oldLogDir, conf, listeners,
+ URLEncoder.encode("regionserver:60020", "UTF8"));
+
+ manager.init();
+
+ // Testing normal log rolling every 20
+ // (i = 20, 40, 60, 80 and 100 each trigger a roll, five in total)
+ for(long i = 1; i < 101; i++) {
+ if(i > 1 && i % 20 == 0) {
+ hlog.rollWriter();
+ }
+ LOG.info(i);
+ HLogKey key = new HLogKey(hri.getRegionName(),
+ test, seq++, System.currentTimeMillis());
+ hlog.append(hri, key, edit);
+ }
+
+ // Simulate a rapid insert that's followed
+ // by a report that's still not totally complete (missing last one)
+ LOG.info(baseline + " and " + time);
+ baseline += 101;
+ time = baseline;
+ LOG.info(baseline + " and " + time);
+
+ for (int i = 0; i < 3; i++) {
+ HLogKey key = new HLogKey(hri.getRegionName(),
+ test, seq++, System.currentTimeMillis());
+ hlog.append(hri, key, edit);
+ }
+
+ assertEquals(6, manager.getHLogs().size());
+
+ hlog.rollWriter();
+
+ // Report a position on the first source's current log for peer "1"
+ manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
+ "1", 0, false);
+
+ HLogKey key = new HLogKey(hri.getRegionName(),
+ test, seq++, System.currentTimeMillis());
+ hlog.append(hri, key, edit);
+
+ assertEquals(1, manager.getHLogs().size());
+
+
+ // TODO Need a case with only 2 HLogs and we only want to delete the first one
+ }
+
+ /**
+ * Minimal Server implementation handed to Replication in this test. It
+ * exposes the test's static conf and zkw; every other method is a stub.
+ */
+ static class DummyServer implements Server {
+
+ @Override
+ public Configuration getConfiguration() {
+ return conf;
+ }
+
+ @Override
+ public ZooKeeperWatcher getZooKeeper() {
+ return zkw;
+ }
+
+ @Override
+ public CatalogTracker getCatalogTracker() {
+ return null; // stub: no catalog tracker is provided
+ }
+
+ @Override
+ public String getServerName() {
+ return null; // stub: no server name is provided
+ }
+
+ @Override
+ public void abort(String why, Throwable e) {
+ // no-op stub
+ }
+
+ @Override
+ public void stop(String why) {
+ // no-op stub
+ }
+
+ @Override
+ public boolean isStopped() {
+ return false; // stub: always reports not stopped
+ }
+ }
+
+}