Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java?rev=990266&r1=990265&r2=990266&view=diff ============================================================================== --- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (original) +++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java Fri Aug 27 20:53:15 2010 @@ -19,82 +19,84 @@ */ package org.apache.hadoop.hbase.replication.regionserver; +import java.io.IOException; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HServerInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Server; -import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; -import org.apache.hadoop.hbase.regionserver.wal.LogEntryVisitor; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.regionserver.wal.WALObserver; import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper; import org.apache.hadoop.hbase.util.Bytes; -// REENABLE import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper; - -import java.io.IOException; -import java.util.NavigableMap; -import java.util.TreeMap; -import java.util.concurrent.atomic.AtomicBoolean; /** * Replication serves as an umbrella over the setup of replication and * is used by HRS. */ -public class Replication implements LogEntryVisitor { - +public class Replication implements WALObserver { private final boolean replication; -// REENALBE private final ReplicationSourceManager replicationManager; + private final ReplicationSourceManager replicationManager; private boolean replicationMaster; private final AtomicBoolean replicating = new AtomicBoolean(true); -// REENALBE private final ReplicationZookeeperWrapper zkHelper; + private final ReplicationZookeeperWrapper zkHelper; private final Configuration conf; private ReplicationSink replicationSink; + // Hosting server private final Server server; /** * Instantiate the replication management (if rep is enabled). - * @param conf conf to use - * @param hsi the info if this region server + * @param server Hosting server * @param fs handle to the filesystem + * @param logDir * @param oldLogDir directory where logs are archived - * @param stopper This is set when we are to shut down. * @throws IOException */ - public Replication(final Server server, FileSystem fs, Path logDir, - Path oldLogDir) throws IOException { + public Replication(final Server server, final FileSystem fs, + final Path logDir, final Path oldLogDir) + throws IOException { this.server = server; this.conf = this.server.getConfiguration(); - this.replication = - conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false); + this.replication = isReplication(this.conf); if (replication) { - // REENALBE -// this.zkHelper = new ReplicationZookeeperWrapper( -// ZooKeeperWrapper.getInstance(conf, hsi.getServerName()), conf, -// this.replicating, hsi.getServerName()); -// this.replicationMaster = zkHelper.isReplicationMaster(); -// this.replicationManager = this.replicationMaster ? -// new ReplicationSourceManager(zkHelper, conf, stopRequested, -// fs, this.replicating, logDir, oldLogDir) : null; + this.zkHelper = new ReplicationZookeeperWrapper(server.getZooKeeper(), + this.conf, this.replicating, this.server.getServerName()); + this.replicationMaster = zkHelper.isReplicationMaster(); + this.replicationManager = this.replicationMaster ? + new ReplicationSourceManager(zkHelper, conf, this.server, + fs, this.replicating, logDir, oldLogDir) : null; } else { - // REENABLE replicationManager = null; - // REENALBE zkHelper = null; + this.replicationManager = null; + this.zkHelper = null; } } /** + * @param c Configuration to look at + * @return True if replication is enabled. + */ + public static boolean isReplication(final Configuration c) { + return c.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false); + } + + /** * Join with the replication threads */ public void join() { if (this.replication) { if (this.replicationMaster) { -// REENABLE this.replicationManager.join(); + this.replicationManager.join(); } -// REENABLE this.zkHelper.deleteOwnRSZNode(); + this.zkHelper.deleteOwnRSZNode(); } } @@ -117,10 +119,9 @@ public class Replication implements LogE public void startReplicationServices() throws IOException { if (this.replication) { if (this.replicationMaster) { - // REENALBE this.replicationManager.init(); + this.replicationManager.init(); } else { - this.replicationSink = - new ReplicationSink(this.conf, this.server); + this.replicationSink = new ReplicationSink(this.conf, this.server); } } } @@ -130,12 +131,12 @@ public class Replication implements LogE * @return the manager if replication is enabled, else returns false */ public ReplicationSourceManager getReplicationManager() { - return null; // REENALBE replicationManager; + return this.replicationManager; } @Override public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey, - WALEdit logEdit) { + WALEdit logEdit) { NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR); byte[] family; @@ -152,13 +153,13 @@ public class Replication implements LogE } } - /** - * Add this class as a log entry visitor for HLog if replication is enabled - * @param hlog log that was add ourselves on - */ - public void addLogEntryVisitor(HLog hlog) { - if (replication) { - hlog.addLogEntryVisitor(this); - } + @Override + public void logRolled(Path p) { + getReplicationManager().logRolled(p); + } + + @Override + public void logRollRequested() { + // Not interested } } \ No newline at end of file
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java?rev=990266&r1=990265&r2=990266&view=diff ============================================================================== --- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java (original) +++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java Fri Aug 27 20:53:15 2010 @@ -19,13 +19,13 @@ */ package org.apache.hadoop.hbase.replication.regionserver; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HConstants; - -import java.io.IOException; -import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.hadoop.hbase.Stoppable; /** * Interface that defines a replication source @@ -45,7 +45,7 @@ public interface ReplicationSourceInterf public void init(final Configuration conf, final FileSystem fs, final ReplicationSourceManager manager, - final AtomicBoolean stopper, + final Stoppable stopper, final AtomicBoolean replicating, final String peerClusterId) throws IOException; Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java?rev=990266&r1=990265&r2=990266&view=diff ============================================================================== --- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (original) +++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java Fri Aug 27 20:53:15 2010 @@ -25,7 +25,11 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.regionserver.wal.LogActionsListener; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.regionserver.wal.HLogKey; +import org.apache.hadoop.hbase.regionserver.wal.WALObserver; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; @@ -50,334 +54,324 @@ import java.util.concurrent.atomic.Atomi * tries to grab a lock in order to transfer all the queues in a local * old source. */ -public class ReplicationSourceManager implements LogActionsListener { +public class ReplicationSourceManager { + private static final Log LOG = + LogFactory.getLog(ReplicationSourceManager.class); + // List of all the sources that read this RS's logs + private final List<ReplicationSourceInterface> sources; + // List of all the sources we got from died RSs + private final List<ReplicationSourceInterface> oldsources; + // Indicates if we are currently replicating + private final AtomicBoolean replicating; + // Helper for zookeeper + private final ReplicationZookeeperWrapper zkHelper; + // All about stopping + private final Stoppable stopper; + // All logs we are currently trackign + private final SortedSet<String> hlogs; + private final Configuration conf; + private final FileSystem fs; + // The path to the latest log we saw, for new coming sources + private Path latestPath; + // List of all the other region servers in this cluster + private final List<String> otherRegionServers; + // Path to the hlogs directories + private final Path logDir; + // Path to the hlog archive + private final Path oldLogDir; - @Override - public void logRolled(Path newFile) { - // TODO Auto-generated method stub - - } - // REENABLE -// -// private static final Log LOG = -// LogFactory.getLog(ReplicationSourceManager.class); -// // List of all the sources that read this RS's logs -// private final List<ReplicationSourceInterface> sources; -// // List of all the sources we got from died RSs -// private final List<ReplicationSourceInterface> oldsources; -// // Indicates if we are currently replicating -// private final AtomicBoolean replicating; -// // Helper for zookeeper -// private final ReplicationZookeeperWrapper zkHelper; -// // Indicates if the region server is closing -// private final AtomicBoolean stopper; -// // All logs we are currently trackign -// private final SortedSet<String> hlogs; -// private final Configuration conf; -// private final FileSystem fs; -// // The path to the latest log we saw, for new coming sources -// private Path latestPath; -// // List of all the other region servers in this cluster -// private final List<String> otherRegionServers; -// // Path to the hlogs directories -// private final Path logDir; -// // Path to the hlog archive -// private final Path oldLogDir; -// -// /** -// * Creates a replication manager and sets the watch on all the other -// * registered region servers -// * @param zkHelper the zk helper for replication -// * @param conf the configuration to use -// * @param stopper the stopper object for this region server -// * @param fs the file system to use -// * @param replicating the status of the replication on this cluster -// * @param logDir the directory that contains all hlog directories of live RSs -// * @param oldLogDir the directory where old logs are archived -// */ -// public ReplicationSourceManager(final ReplicationZookeeperWrapper zkHelper, -// final Configuration conf, -// final AtomicBoolean stopper, -// final FileSystem fs, -// final AtomicBoolean replicating, -// final Path logDir, -// final Path oldLogDir) { -// this.sources = new ArrayList<ReplicationSourceInterface>(); -// this.replicating = replicating; -// this.zkHelper = zkHelper; -// this.stopper = stopper; -// this.hlogs = new TreeSet<String>(); -// this.oldsources = new ArrayList<ReplicationSourceInterface>(); -// this.conf = conf; -// this.fs = fs; -// this.logDir = logDir; -// this.oldLogDir = oldLogDir; -// List<String> otherRSs = -// this.zkHelper.getRegisteredRegionServers(new OtherRegionServerWatcher()); -// this.otherRegionServers = otherRSs == null ? new ArrayList<String>() : otherRSs; -// } -// -// /** -// * Provide the id of the peer and a log key and this method will figure which -// * hlog it belongs to and will log, for this region server, the current -// * position. It will also clean old logs from the queue. -// * @param log Path to the log currently being replicated from -// * replication status in zookeeper. It will also delete older entries. -// * @param id id of the peer cluster -// * @param position current location in the log -// * @param queueRecovered indicates if this queue comes from another region server -// */ -// public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered) { -// String key = log.getName(); -// LOG.info("Going to report log #" + key + " for position " + position + " in " + log); -// this.zkHelper.writeReplicationStatus(key.toString(), id, position); -// synchronized (this.hlogs) { -// if (!queueRecovered && this.hlogs.first() != key) { -// SortedSet<String> hlogSet = this.hlogs.headSet(key); -// LOG.info("Removing " + hlogSet.size() + -// " logs in the list: " + hlogSet); -// for (String hlog : hlogSet) { -// this.zkHelper.removeLogFromList(hlog.toString(), id); -// } -// hlogSet.clear(); -// } -// } -// } -// -// /** -// * Adds a normal source per registered peer cluster and tries to process all -// * old region server hlog queues -// */ -// public void init() throws IOException { -// for (String id : this.zkHelper.getPeerClusters().keySet()) { -// ReplicationSourceInterface src = addSource(id); -// src.startup(); -// } -// List<String> currentReplicators = this.zkHelper.getListOfReplicators(null); -// synchronized (otherRegionServers) { -// LOG.info("Current list of replicators: " + currentReplicators -// + " other RSs: " + otherRegionServers); -// } -// // Look if there's anything to process after a restart -// for (String rs : currentReplicators) { -// synchronized (otherRegionServers) { -// if (!this.otherRegionServers.contains(rs)) { -// transferQueues(rs); -// } -// } -// } -// } -// -// /** -// * Add a new normal source to this region server -// * @param id the id of the peer cluster -// * @return the created source -// * @throws IOException -// */ -// public ReplicationSourceInterface addSource(String id) throws IOException { -// ReplicationSourceInterface src = -// getReplicationSource(this.conf, this.fs, this, stopper, replicating, id); -// this.sources.add(src); -// synchronized (this.hlogs) { -// if (this.hlogs.size() > 0) { -// this.zkHelper.addLogToList(this.hlogs.first(), -// this.sources.get(0).getPeerClusterZnode()); -// src.enqueueLog(this.latestPath); -// } -// } -// return src; -// } -// -// /** -// * Terminate the replication on this region server -// */ -// public void join() { -// if (this.sources.size() == 0) { -// this.zkHelper.deleteOwnRSZNode(); -// } -// for (ReplicationSourceInterface source : this.sources) { -// source.terminate(); -// } -// } -// -// /** -// * Get a copy of the hlogs of the first source on this rs -// * @return a sorted set of hlog names -// */ -// protected SortedSet<String> getHLogs() { -// return new TreeSet(this.hlogs); -// } -// -// /** -// * Get a list of all the normal sources of this rs -// * @return lis of all sources -// */ -// public List<ReplicationSourceInterface> getSources() { -// return this.sources; -// } -// -// @Override -// public void logRolled(Path newLog) { -// if (this.sources.size() > 0) { -// this.zkHelper.addLogToList(newLog.getName(), -// this.sources.get(0).getPeerClusterZnode()); -// } -// synchronized (this.hlogs) { -// this.hlogs.add(newLog.getName()); -// } -// this.latestPath = newLog; -// // This only update the sources we own, not the recovered ones -// for (ReplicationSourceInterface source : this.sources) { -// source.enqueueLog(newLog); -// } -// } -// -// /** -// * Get the ZK help of this manager -// * @return the helper -// */ -// public ReplicationZookeeperWrapper getRepZkWrapper() { -// return zkHelper; -// } -// -// /** -// * Factory method to create a replication source -// * @param conf the configuration to use -// * @param fs the file system to use -// * @param manager the manager to use -// * @param stopper the stopper object for this region server -// * @param replicating the status of the replication on this cluster -// * @param peerClusterId the id of the peer cluster -// * @return the created source -// * @throws IOException -// */ -// public ReplicationSourceInterface getReplicationSource( -// final Configuration conf, -// final FileSystem fs, -// final ReplicationSourceManager manager, -// final AtomicBoolean stopper, -// final AtomicBoolean replicating, -// final String peerClusterId) throws IOException { -// ReplicationSourceInterface src; -// try { -// Class c = Class.forName(conf.get("replication.replicationsource.implementation", -// ReplicationSource.class.getCanonicalName())); -// src = (ReplicationSourceInterface) c.newInstance(); -// } catch (Exception e) { -// LOG.warn("Passed replication source implemention throws errors, " + -// "defaulting to ReplicationSource", e); -// src = new ReplicationSource(); -// -// } -// src.init(conf, fs, manager, stopper, replicating, peerClusterId); -// return src; -// } -// -// /** -// * Transfer all the queues of the specified to this region server. -// * First it tries to grab a lock and if it works it will move the -// * znodes and finally will delete the old znodes. -// * -// * It creates one old source for any type of source of the old rs. -// * @param rsZnode -// */ -// public void transferQueues(String rsZnode) { -// // We try to lock that rs' queue directory -// if (this.stopper.get()) { -// LOG.info("Not transferring queue since we are shutting down"); -// return; -// } -// if (!this.zkHelper.lockOtherRS(rsZnode)) { -// return; -// } -// LOG.info("Moving " + rsZnode + "'s hlogs to my queue"); -// SortedMap<String, SortedSet<String>> newQueues = -// this.zkHelper.copyQueuesFromRS(rsZnode); -// if (newQueues == null || newQueues.size() == 0) { -// return; -// } -// this.zkHelper.deleteRsQueues(rsZnode); -// -// for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) { -// String peerId = entry.getKey(); -// try { -// ReplicationSourceInterface src = getReplicationSource(this.conf, -// this.fs, this, this.stopper, this.replicating, peerId); -// this.oldsources.add(src); -// for (String hlog : entry.getValue()) { -// src.enqueueLog(new Path(this.oldLogDir, hlog)); -// } -// src.startup(); -// } catch (IOException e) { -// // TODO manage it -// LOG.error("Failed creating a source", e); -// } -// } -// } -// -// /** -// * Clear the references to the specified old source -// * @param src source to clear -// */ -// public void closeRecoveredQueue(ReplicationSourceInterface src) { -// LOG.info("Done with the recovered queue " + src.getPeerClusterZnode()); -// this.oldsources.remove(src); -// this.zkHelper.deleteSource(src.getPeerClusterZnode()); -// } -// -// /** -// * Watcher used to be notified of the other region server's death -// * in the local cluster. It initiates the process to transfer the queues -// * if it is able to grab the lock. -// */ -// public class OtherRegionServerWatcher implements Watcher { -// @Override -// public void process(WatchedEvent watchedEvent) { -// LOG.info(" event " + watchedEvent); -// if (watchedEvent.getType().equals(Event.KeeperState.Expired) || -// watchedEvent.getType().equals(Event.KeeperState.Disconnected)) { -// return; -// } -// -// List<String> newRsList = (zkHelper.getRegisteredRegionServers(this)); -// if (newRsList == null) { -// return; -// } else { -// synchronized (otherRegionServers) { -// otherRegionServers.clear(); -// otherRegionServers.addAll(newRsList); -// } -// } -// if (watchedEvent.getType().equals(Event.EventType.NodeDeleted)) { -// LOG.info(watchedEvent.getPath() + " znode expired, trying to lock it"); -// String[] rsZnodeParts = watchedEvent.getPath().split("/"); -// transferQueues(rsZnodeParts[rsZnodeParts.length-1]); -// } -// } -// } -// -// /** -// * Get the directory where hlogs are archived -// * @return the directory where hlogs are archived -// */ -// public Path getOldLogDir() { -// return this.oldLogDir; -// } -// -// /** -// * Get the directory where hlogs are stored by their RSs -// * @return the directory where hlogs are stored by their RSs -// */ -// public Path getLogDir() { -// return this.logDir; -// } -// -// /** -// * Get the handle on the local file system -// * @returnthe handle on the local file system -// */ -// public FileSystem getFs() { -// return this.fs; -// } + /** + * Creates a replication manager and sets the watch on all the other + * registered region servers + * @param zkHelper the zk helper for replication + * @param conf the configuration to use + * @param stopper the stopper object for this region server + * @param fs the file system to use + * @param replicating the status of the replication on this cluster + * @param logDir the directory that contains all hlog directories of live RSs + * @param oldLogDir the directory where old logs are archived + */ + public ReplicationSourceManager(final ReplicationZookeeperWrapper zkHelper, + final Configuration conf, + final Stoppable stopper, + final FileSystem fs, + final AtomicBoolean replicating, + final Path logDir, + final Path oldLogDir) { + this.sources = new ArrayList<ReplicationSourceInterface>(); + this.replicating = replicating; + this.zkHelper = zkHelper; + this.stopper = stopper; + this.hlogs = new TreeSet<String>(); + this.oldsources = new ArrayList<ReplicationSourceInterface>(); + this.conf = conf; + this.fs = fs; + this.logDir = logDir; + this.oldLogDir = oldLogDir; + List<String> otherRSs = + this.zkHelper.getRegisteredRegionServers(new OtherRegionServerWatcher()); + this.otherRegionServers = otherRSs == null ? new ArrayList<String>() : otherRSs; + } + + /** + * Provide the id of the peer and a log key and this method will figure which + * hlog it belongs to and will log, for this region server, the current + * position. It will also clean old logs from the queue. + * @param log Path to the log currently being replicated from + * replication status in zookeeper. It will also delete older entries. + * @param id id of the peer cluster + * @param position current location in the log + * @param queueRecovered indicates if this queue comes from another region server + */ + public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered) { + String key = log.getName(); + LOG.info("Going to report log #" + key + " for position " + position + " in " + log); + this.zkHelper.writeReplicationStatus(key.toString(), id, position); + synchronized (this.hlogs) { + if (!queueRecovered && this.hlogs.first() != key) { + SortedSet<String> hlogSet = this.hlogs.headSet(key); + LOG.info("Removing " + hlogSet.size() + + " logs in the list: " + hlogSet); + for (String hlog : hlogSet) { + this.zkHelper.removeLogFromList(hlog.toString(), id); + } + hlogSet.clear(); + } + } + } + + /** + * Adds a normal source per registered peer cluster and tries to process all + * old region server hlog queues + */ + public void init() throws IOException { + for (String id : this.zkHelper.getPeerClusters().keySet()) { + ReplicationSourceInterface src = addSource(id); + src.startup(); + } + List<String> currentReplicators = this.zkHelper.getListOfReplicators(null); + synchronized (otherRegionServers) { + LOG.info("Current list of replicators: " + currentReplicators + + " other RSs: " + otherRegionServers); + } + // Look if there's anything to process after a restart + for (String rs : currentReplicators) { + synchronized (otherRegionServers) { + if (!this.otherRegionServers.contains(rs)) { + transferQueues(rs); + } + } + } + } + + /** + * Add a new normal source to this region server + * @param id the id of the peer cluster + * @return the created source + * @throws IOException + */ + public ReplicationSourceInterface addSource(String id) throws IOException { + ReplicationSourceInterface src = + getReplicationSource(this.conf, this.fs, this, stopper, replicating, id); + this.sources.add(src); + synchronized (this.hlogs) { + if (this.hlogs.size() > 0) { + this.zkHelper.addLogToList(this.hlogs.first(), + this.sources.get(0).getPeerClusterZnode()); + src.enqueueLog(this.latestPath); + } + } + return src; + } + + /** + * Terminate the replication on this region server + */ + public void join() { + if (this.sources.size() == 0) { + this.zkHelper.deleteOwnRSZNode(); + } + for (ReplicationSourceInterface source : this.sources) { + source.terminate(); + } + } + + /** + * Get a copy of the hlogs of the first source on this rs + * @return a sorted set of hlog names + */ + protected SortedSet<String> getHLogs() { + return new TreeSet(this.hlogs); + } + + /** + * Get a list of all the normal sources of this rs + * @return lis of all sources + */ + public List<ReplicationSourceInterface> getSources() { + return this.sources; + } + + void logRolled(Path newLog) { + if (this.sources.size() > 0) { + this.zkHelper.addLogToList(newLog.getName(), + this.sources.get(0).getPeerClusterZnode()); + } + synchronized (this.hlogs) { + this.hlogs.add(newLog.getName()); + } + this.latestPath = newLog; + // This only update the sources we own, not the recovered ones + for (ReplicationSourceInterface source : this.sources) { + source.enqueueLog(newLog); + } + } + + /** + * Get the ZK help of this manager + * @return the helper + */ + public ReplicationZookeeperWrapper getRepZkWrapper() { + return zkHelper; + } + /** + * Factory method to create a replication source + * @param conf the configuration to use + * @param fs the file system to use + * @param manager the manager to use + * @param stopper the stopper object for this region server + * @param replicating the status of the replication on this cluster + * @param peerClusterId the id of the peer cluster + * @return the created source + * @throws IOException + */ + public ReplicationSourceInterface getReplicationSource( + final Configuration conf, + final FileSystem fs, + final ReplicationSourceManager manager, + final Stoppable stopper, + final AtomicBoolean replicating, + final String peerClusterId) throws IOException { + ReplicationSourceInterface src; + try { + Class c = Class.forName(conf.get("replication.replicationsource.implementation", + ReplicationSource.class.getCanonicalName())); + src = (ReplicationSourceInterface) c.newInstance(); + } catch (Exception e) { + LOG.warn("Passed replication source implemention throws errors, " + + "defaulting to ReplicationSource", e); + src = new ReplicationSource(); + + } + src.init(conf, fs, manager, stopper, replicating, peerClusterId); + return src; + } + + /** + * Transfer all the queues of the specified to this region server. + * First it tries to grab a lock and if it works it will move the + * znodes and finally will delete the old znodes. + * + * It creates one old source for any type of source of the old rs. + * @param rsZnode + */ + public void transferQueues(String rsZnode) { + // We try to lock that rs' queue directory + if (this.stopper.isStopped()) { + LOG.info("Not transferring queue since we are shutting down"); + return; + } + if (!this.zkHelper.lockOtherRS(rsZnode)) { + return; + } + LOG.info("Moving " + rsZnode + "'s hlogs to my queue"); + SortedMap<String, SortedSet<String>> newQueues = + this.zkHelper.copyQueuesFromRS(rsZnode); + if (newQueues == null || newQueues.size() == 0) { + return; + } + this.zkHelper.deleteRsQueues(rsZnode); + + for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) { + String peerId = entry.getKey(); + try { + ReplicationSourceInterface src = getReplicationSource(this.conf, + this.fs, this, this.stopper, this.replicating, peerId); + this.oldsources.add(src); + for (String hlog : entry.getValue()) { + src.enqueueLog(new Path(this.oldLogDir, hlog)); + } + src.startup(); + } catch (IOException e) { + // TODO manage it + LOG.error("Failed creating a source", e); + } + } + } + + /** + * Clear the references to the specified old source + * @param src source to clear + */ + public void closeRecoveredQueue(ReplicationSourceInterface src) { + LOG.info("Done with the recovered queue " + src.getPeerClusterZnode()); + this.oldsources.remove(src); + this.zkHelper.deleteSource(src.getPeerClusterZnode()); + } + + /** + * Watcher used to be notified of the other region server's death + * in the local cluster. It initiates the process to transfer the queues + * if it is able to grab the lock. + */ + public class OtherRegionServerWatcher implements Watcher { + @Override + public void process(WatchedEvent watchedEvent) { + LOG.info(" event " + watchedEvent); + if (watchedEvent.getType().equals(Event.KeeperState.Expired) || + watchedEvent.getType().equals(Event.KeeperState.Disconnected)) { + return; + } + + List<String> newRsList = (zkHelper.getRegisteredRegionServers(this)); + if (newRsList == null) { + return; + } else { + synchronized (otherRegionServers) { + otherRegionServers.clear(); + otherRegionServers.addAll(newRsList); + } + } + if (watchedEvent.getType().equals(Event.EventType.NodeDeleted)) { + LOG.info(watchedEvent.getPath() + " znode expired, trying to lock it"); + String[] rsZnodeParts = watchedEvent.getPath().split("/"); + transferQueues(rsZnodeParts[rsZnodeParts.length-1]); + } + } + } + + /** + * Get the directory where hlogs are archived + * @return the directory where hlogs are archived + */ + public Path getOldLogDir() { + return this.oldLogDir; + } + + /** + * Get the directory where hlogs are stored by their RSs + * @return the directory where hlogs are stored by their RSs + */ + public Path getLogDir() { + return this.logDir; + } + + /** + * Get the handle on the local file system + * @returnthe handle on the local file system + */ + public FileSystem getFs() { + return this.fs; + } } Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/util/HMerge.java URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/util/HMerge.java?rev=990266&r1=990265&r2=990266&view=diff ============================================================================== --- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/util/HMerge.java (original) +++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/util/HMerge.java Fri Aug 27 20:53:15 2010 @@ -149,8 +149,7 @@ class HMerge { Path logdir = new Path(tabledir, "merge_" + System.currentTimeMillis() + HConstants.HREGION_LOGDIR_NAME); Path oldLogDir = new Path(tabledir, HConstants.HREGION_OLDLOGDIR_NAME); - this.hlog = - new HLog(fs, logdir, oldLogDir, conf, null); + this.hlog = new HLog(fs, logdir, oldLogDir, conf); } void process() throws IOException { Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/util/MetaUtils.java URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/util/MetaUtils.java?rev=990266&r1=990265&r2=990266&view=diff ============================================================================== --- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/util/MetaUtils.java (original) +++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/util/MetaUtils.java Fri Aug 27 20:53:15 2010 @@ -103,7 +103,7 @@ public class MetaUtils { HConstants.HREGION_LOGDIR_NAME + "_" + System.currentTimeMillis()); Path oldLogDir = new Path(this.fs.getHomeDirectory(), HConstants.HREGION_OLDLOGDIR_NAME); - this.log = new HLog(this.fs, logdir, oldLogDir, this.conf, null); + this.log = new HLog(this.fs, logdir, oldLogDir, this.conf); } return this.log; } Modified: hbase/branches/0.90_master_rewrite/src/main/resources/hbase-webapps/regionserver/regionserver.jsp URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/resources/hbase-webapps/regionserver/regionserver.jsp?rev=990266&r1=990265&r2=990266&view=diff ============================================================================== --- hbase/branches/0.90_master_rewrite/src/main/resources/hbase-webapps/regionserver/regionserver.jsp (original) +++ hbase/branches/0.90_master_rewrite/src/main/resources/hbase-webapps/regionserver/regionserver.jsp Fri Aug 27 20:53:15 2010 @@ -50,7 +50,7 @@ <table> <tr><th>Region Name</th><th>Start Key</th><th>End Key</th><th>Metrics</th></tr> <% for (HRegionInfo r: onlineRegions) { - HServerLoad.RegionLoad load = regionServer.createRegionLoad(r.getRegionName()); + HServerLoad.RegionLoad load = regionServer.createRegionLoad(r.getEncodedName()); %> <tr><td><%= r.getRegionNameAsString() %></td> <td><%= Bytes.toStringBinary(r.getStartKey()) %></td><td><%= Bytes.toStringBinary(r.getEndKey()) %></td> Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestLogsCleaner.java URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestLogsCleaner.java?rev=990266&r1=990265&r2=990266&view=diff ============================================================================== --- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestLogsCleaner.java (original) +++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestLogsCleaner.java Fri Aug 27 20:53:15 2010 @@ -22,7 +22,6 @@ package org.apache.hadoop.hbase.master; import static org.junit.Assert.assertEquals; import java.net.URLEncoder; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -31,7 +30,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Stoppable; -import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper; import org.junit.After; import org.junit.AfterClass; @@ -102,7 +100,7 @@ public class TestLogsCleaner { return this.stopped; } }; - LogsCleaner cleaner = new LogsCleaner(1000, stoppable,c, fs, oldLogDir); + LogCleaner cleaner = new LogCleaner(1000, stoppable, c, fs, oldLogDir); // Create 2 invalid files, 1 "recent" file, 1 very new file and 30 old files long now = System.currentTimeMillis(); Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java?rev=990266&r1=990265&r2=990266&view=diff ============================================================================== --- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java (original) +++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java Fri Aug 27 20:53:15 2010 @@ -29,7 +29,7 @@ public class InstrumentedSequenceFileLog @Override public void append(HLog.Entry entry) throws IOException { super.append(entry); - if (activateFailure && Bytes.equals(entry.getKey().getRegionName(), "break".getBytes())) { + if (activateFailure && Bytes.equals(entry.getKey().getEncodedRegionName(), "break".getBytes())) { System.out.println(getClass().getName() + ": I will throw an exception now..."); throw(new IOException("This exception is instrumented and should only be thrown for testing")); } Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java?rev=990266&r1=990265&r2=990266&view=diff ============================================================================== --- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java (original) +++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java Fri Aug 27 20:53:15 2010 @@ -133,7 +133,7 @@ public class TestHLog { final byte [] tableName = Bytes.toBytes(getName()); final byte [] rowName = tableName; Path logdir = new Path(dir, HConstants.HREGION_LOGDIR_NAME); - HLog log = new HLog(fs, logdir, oldLogDir, conf, null); + HLog log = new HLog(fs, logdir, oldLogDir, conf); final int howmany = 3; HRegionInfo[] infos = new HRegionInfo[3]; for(int i = 0; i < howmany; i++) { @@ -192,7 +192,7 @@ public class TestHLog { out.close(); in.close(); Path subdir = new Path(dir, "hlogdir"); - HLog wal = new HLog(fs, subdir, oldLogDir, conf, null); + HLog wal = new HLog(fs, subdir, oldLogDir, conf); final int total = 20; HRegionInfo info = new HRegionInfo(new HTableDescriptor(bytes), @@ -295,7 +295,7 @@ public class TestHLog { HLog.Entry entry = new HLog.Entry(); while((entry = reader.next(entry)) != null) { HLogKey key = entry.getKey(); - String region = Bytes.toString(key.getRegionName()); + String region = Bytes.toString(key.getEncodedRegionName()); // Assert that all edits are for same region. if (previousRegion != null) { assertEquals(previousRegion, region); @@ -325,7 +325,7 @@ public class TestHLog { HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false); Path subdir = new Path(dir, "hlogdir"); Path archdir = new Path(dir, "hlogdir_archive"); - HLog wal = new HLog(fs, subdir, archdir, conf, null); + HLog wal = new HLog(fs, subdir, archdir, conf); final int total = 20; for (int i = 0; i < total; i++) { @@ -429,7 +429,7 @@ public class TestHLog { final byte [] tableName = Bytes.toBytes("tablename"); final byte [] row = Bytes.toBytes("row"); HLog.Reader reader = null; - HLog log = new HLog(fs, dir, oldLogDir, conf, null); + HLog log = new HLog(fs, dir, oldLogDir, conf); try { // Write columns named 1, 2, 3, etc. and then values of single byte // 1, 2, 3... @@ -442,10 +442,9 @@ public class TestHLog { } HRegionInfo info = new HRegionInfo(new HTableDescriptor(tableName), row,Bytes.toBytes(Bytes.toString(row) + "1"), false); - final byte [] regionName = info.getRegionName(); log.append(info, tableName, cols, System.currentTimeMillis()); long logSeqId = log.startCacheFlush(); - log.completeCacheFlush(regionName, tableName, logSeqId, info.isMetaRegion()); + log.completeCacheFlush(info.getEncodedNameAsBytes(), tableName, logSeqId, info.isMetaRegion()); log.close(); Path filename = log.computeFilename(); log = null; @@ -458,7 +457,7 @@ public class TestHLog { if (entry == null) break; HLogKey key = entry.getKey(); WALEdit val = entry.getEdit(); - assertTrue(Bytes.equals(regionName, key.getRegionName())); + assertTrue(Bytes.equals(info.getEncodedNameAsBytes(), key.getEncodedRegionName())); assertTrue(Bytes.equals(tableName, key.getTablename())); KeyValue kv = val.getKeyValues().get(0); assertTrue(Bytes.equals(row, kv.getRow())); @@ -470,7 +469,7 @@ public class TestHLog { HLogKey key = entry.getKey(); WALEdit val = entry.getEdit(); // Assert only one more row... the meta flushed row. - assertTrue(Bytes.equals(regionName, key.getRegionName())); + assertTrue(Bytes.equals(info.getEncodedNameAsBytes(), key.getEncodedRegionName())); assertTrue(Bytes.equals(tableName, key.getTablename())); KeyValue kv = val.getKeyValues().get(0); assertTrue(Bytes.equals(HLog.METAROW, kv.getRow())); @@ -498,7 +497,7 @@ public class TestHLog { final byte [] tableName = Bytes.toBytes("tablename"); final byte [] row = Bytes.toBytes("row"); Reader reader = null; - HLog log = new HLog(fs, dir, oldLogDir, conf, null); + HLog log = new HLog(fs, dir, oldLogDir, conf); try { // Write columns named 1, 2, 3, etc. and then values of single byte // 1, 2, 3... @@ -513,7 +512,7 @@ public class TestHLog { HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); log.append(hri, tableName, cols, System.currentTimeMillis()); long logSeqId = log.startCacheFlush(); - log.completeCacheFlush(hri.getRegionName(), tableName, logSeqId, false); + log.completeCacheFlush(hri.getEncodedNameAsBytes(), tableName, logSeqId, false); log.close(); Path filename = log.computeFilename(); log = null; @@ -524,7 +523,7 @@ public class TestHLog { int idx = 0; for (KeyValue val : entry.getEdit().getKeyValues()) { assertTrue(Bytes.equals(hri.getRegionName(), - entry.getKey().getRegionName())); + entry.getKey().getEncodedRegionName())); assertTrue(Bytes.equals(tableName, entry.getKey().getTablename())); assertTrue(Bytes.equals(row, val.getRow())); assertEquals((byte)(idx + '0'), val.getValue()[0]); @@ -537,7 +536,7 @@ public class TestHLog { assertEquals(1, entry.getEdit().size()); for (KeyValue val : entry.getEdit().getKeyValues()) { assertTrue(Bytes.equals(hri.getRegionName(), - entry.getKey().getRegionName())); + entry.getKey().getEncodedRegionName())); assertTrue(Bytes.equals(tableName, entry.getKey().getTablename())); assertTrue(Bytes.equals(HLog.METAROW, val.getRow())); assertTrue(Bytes.equals(HLog.METAFAMILY, val.getFamily())); @@ -564,9 +563,9 @@ public class TestHLog { final int COL_COUNT = 10; final byte [] tableName = Bytes.toBytes("tablename"); final byte [] row = Bytes.toBytes("row"); - HLog log = new HLog(fs, dir, oldLogDir, conf, null); - DumbLogEntriesVisitor visitor = new DumbLogEntriesVisitor(); - log.addLogEntryVisitor(visitor); + HLog log = new HLog(fs, dir, oldLogDir, conf); + DumbWALObserver visitor = new DumbWALObserver(); + log.registerWALActionsListener(visitor); long timestamp = System.currentTimeMillis(); HRegionInfo hri = new HRegionInfo(new HTableDescriptor(tableName), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); @@ -578,7 +577,7 @@ public class TestHLog { log.append(hri, tableName, cols, System.currentTimeMillis()); } assertEquals(COL_COUNT, visitor.increments); - log.removeLogEntryVisitor(visitor); + log.unregisterWALActionsListener(visitor); WALEdit cols = new WALEdit(); cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(11)), @@ -587,8 +586,7 @@ public class TestHLog { assertEquals(COL_COUNT, visitor.increments); } - static class DumbLogEntriesVisitor implements LogEntryVisitor { - + static class DumbWALObserver implements WALObserver { int increments = 0; @Override @@ -596,5 +594,17 @@ public class TestHLog { WALEdit logEdit) { increments++; } + + @Override + public void logRolled(Path newFile) { + // TODO Auto-generated method stub + + } + + @Override + public void logRollRequested() { + // TODO Auto-generated method stub + + } } } Added: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALObserver.java URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALObserver.java?rev=990266&view=auto ============================================================================== --- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALObserver.java (added) +++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALObserver.java Fri Aug 27 20:53:15 2010 @@ -0,0 +1,135 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import static org.junit.Assert.*; + +/** + * Test that the actions are called while playing with an HLog + */ +public class TestWALObserver { + protected static final Log LOG = LogFactory.getLog(TestWALObserver.class); + + private final static HBaseTestingUtility TEST_UTIL = + new HBaseTestingUtility(); + + private final static byte[] SOME_BYTES = Bytes.toBytes("t"); + private static FileSystem fs; + private static Path oldLogDir; + private static Path logDir; + private static Configuration conf; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + conf = TEST_UTIL.getConfiguration(); + conf.setInt("hbase.regionserver.maxlogs", 5); + fs = FileSystem.get(conf); + oldLogDir = new Path(HBaseTestingUtility.getTestDir(), + HConstants.HREGION_OLDLOGDIR_NAME); + logDir = new Path(HBaseTestingUtility.getTestDir(), + HConstants.HREGION_LOGDIR_NAME); + } + + @Before + public void setUp() throws Exception { + fs.delete(logDir, true); + fs.delete(oldLogDir, true); + } + + @After + public void tearDown() throws Exception { + setUp(); + } + + /** + * Add a bunch of dummy data and roll the logs every two insert. We + * should end up with 10 rolled files (plus the roll called in + * the constructor). Also test adding a listener while it's running. + */ + @Test + public void testActionListener() throws Exception { + DummyWALObserver observer = new DummyWALObserver(); + List<WALObserver> list = new ArrayList<WALObserver>(); + list.add(observer); + DummyWALObserver laterobserver = new DummyWALObserver(); + HLog hlog = new HLog(fs, logDir, oldLogDir, conf, list, null); + HRegionInfo hri = new HRegionInfo(new HTableDescriptor(SOME_BYTES), + SOME_BYTES, SOME_BYTES, false); + + for (int i = 0; i < 20; i++) { + byte[] b = Bytes.toBytes(i+""); + KeyValue kv = new KeyValue(b,b,b); + WALEdit edit = new WALEdit(); + edit.add(kv); + HLogKey key = new HLogKey(b,b, 0, 0); + hlog.append(hri, key, edit); + if (i == 10) { + hlog.registerWALActionsListener(laterobserver); + } + if (i % 2 == 0) { + hlog.rollWriter(); + } + } + assertEquals(11, observer.logRollCounter); + assertEquals(5, laterobserver.logRollCounter); + } + + /** + * Just counts when methods are called + */ + static class DummyWALObserver implements WALObserver { + public int logRollCounter = 0; + + @Override + public void logRolled(Path newFile) { + logRollCounter++; + } + + @Override + public void logRollRequested() { + // Not interested + } + + @Override + public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey, + WALEdit logEdit) { + // Not interested + + } + } +} \ No newline at end of file Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java?rev=990266&r1=990265&r2=990266&view=diff ============================================================================== --- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (original) +++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java Fri Aug 27 20:53:15 2010 @@ -482,10 +482,10 @@ public class TestWALReplay { * @throws IOException */ private HLog createWAL(final Configuration c) throws IOException { - HLog wal = new HLog(FileSystem.get(c), logDir, oldLogDir, c, null); + HLog wal = new HLog(FileSystem.get(c), logDir, oldLogDir, c); // Set down maximum recovery so we dfsclient doesn't linger retrying something // long gone. HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1); return wal; } -} +} \ No newline at end of file Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/util/TestMergeTool.java URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/util/TestMergeTool.java?rev=990266&r1=990265&r2=990266&view=diff ============================================================================== --- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/util/TestMergeTool.java (original) +++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/util/TestMergeTool.java Fri Aug 27 20:53:15 2010 @@ -248,7 +248,7 @@ public class TestMergeTool extends HBase System.currentTimeMillis()); LOG.info("Creating log " + logPath.toString()); Path oldLogDir = new Path("/tmp", HConstants.HREGION_OLDLOGDIR_NAME); - HLog log = new HLog(this.fs, logPath, oldLogDir, this.conf, null); + HLog log = new HLog(this.fs, logPath, oldLogDir, this.conf); try { // Merge Region 0 and Region 1 HRegion merged = mergeAndVerify("merging regions 0 and 1",
