Author: stack
Date: Thu Sep 30 23:12:56 2010
New Revision: 1003330
URL: http://svn.apache.org/viewvc?rev=1003330&view=rev
Log:
HBASE-3019 Make bulk assignment on cluster startup run faster
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestLogsCleaner.java
Modified: hbase/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1003330&r1=1003329&r2=1003330&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Thu Sep 30 23:12:56 2010
@@ -958,6 +958,7 @@ Release 0.21.0 - Unreleased
(dhruba borthakur via Stack)
HBASE-2646 Compaction requests should be prioritized to prevent blocking
(Jeff Whiting via Stack)
+ HBASE-3019 Make bulk assignment on cluster startup run faster
NEW FEATURES
HBASE-1961 HBase EC2 scripts
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=1003330&r1=1003329&r2=1003330&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
Thu Sep 30 23:12:56 2010
@@ -303,6 +303,12 @@ public interface HRegionInterface extend
public void openRegion(final HRegionInfo region);
/**
+ * Opens the specified regions.
+ * @param regions regions to open
+ */
+ public void openRegions(final List<HRegionInfo> regions);
+
+ /**
* Closes the specified region.
* @param region region to close
* @return true if closing region, false if not
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=1003330&r1=1003329&r2=1003330&view=diff
==============================================================================
---
hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
(original)
+++
hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
Thu Sep 30 23:12:56 2010
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.master;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -33,6 +34,7 @@ import java.util.NavigableMap;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -65,6 +67,8 @@ import org.apache.hadoop.hbase.zookeeper
import org.apache.hadoop.io.Writable;
import org.apache.zookeeper.KeeperException;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
/**
* Manages and performs region assignment.
* <p>
@@ -477,7 +481,8 @@ public class AssignmentManager extends Z
this.regionsInTransition.remove(regionInfo.getEncodedName());
if (rs != null) {
this.regionsInTransition.notifyAll();
- LOG.warn("Asked online a region that was already in " +
+ } else {
+ LOG.warn("Asked online a region that was not in " +
"regionsInTransition: " + rs);
}
}
@@ -550,33 +555,113 @@ public class AssignmentManager extends Z
* @param regionName server to be assigned
*/
public void assign(HRegionInfo region) {
- // Grab the state of this region and synchronize on it
- String encodedName = region.getEncodedName();
- RegionState state;
- synchronized (regionsInTransition) {
- state = regionsInTransition.get(encodedName);
- if (state == null) {
- state = new RegionState(region, RegionState.State.OFFLINE);
- regionsInTransition.put(encodedName, state);
- }
- }
- // This here gap between synchronizations looks like a hole but it should
- // be ok because the assign below would protect against being called with
- // a state instance that is not in the right 'state' -- St.Ack 20100920.
+ RegionState state = addToRegionsInTransition(region);
synchronized (state) {
assign(state);
}
}
/**
+ * Bulk assign regions to <code>destination</code>. If we fail in any way,
+ * we'll abort the server.
+ * @param destination
+ * @param regions Regions to assign.
+ */
+ public void assign(final HServerInfo destination,
+ final List<HRegionInfo> regions) {
+ LOG.debug("Bulk assigning " + regions.size() + " region(s) to " +
+ destination.getServerName());
+ List<RegionState> states = new ArrayList<RegionState>(regions.size());
+ synchronized (this.regionsInTransition) {
+ for (HRegionInfo region: regions) {
+ states.add(forceRegionStateToOffline(region));
+ }
+ }
+ // Presumption is that only this thread will be updating the state at this
+ // time; i.e. handlers on backend won't be trying to set it to OPEN, etc.
+ for (RegionState state: states) {
+ if (!setOfflineInZooKeeper(state)) {
+ return;
+ }
+ }
+ for (RegionState state: states) {
+ // Transition RegionState to PENDING_OPEN here in master; means we've
+ // sent the open. We're a little ahead of ourselves here since we've not
+ // yet sent out the actual open but putting this state change after the
+ // call to open risks our writing PENDING_OPEN after state has been moved
+ // to OPENING by the regionserver.
+ state.update(RegionState.State.PENDING_OPEN);
+ }
+ try {
+ // Send OPEN RPC. This can fail if the server on the other end is not up.
+ this.serverManager.sendRegionOpen(destination, regions);
+ } catch (Throwable t) {
+ this.master.abort("Failed assignment of regions to " + destination, t);
+ return;
+ }
+ LOG.debug("Bulk assigning done for " + destination.getServerName());
+ }
+
+ private RegionState addToRegionsInTransition(final HRegionInfo region) {
+ synchronized (regionsInTransition) {
+ return forceRegionStateToOffline(region);
+ }
+ }
+
+ /**
+ * Sets a region's {@link RegionState} to {@link RegionState.State#OFFLINE}.
+ * Caller must hold lock on this.regionsInTransition.
+ * @param region
+ * @return Amended RegionState.
+ */
+ private RegionState forceRegionStateToOffline(final HRegionInfo region) {
+ String encodedName = region.getEncodedName();
+ RegionState state = this.regionsInTransition.get(encodedName);
+ if (state == null) {
+ state = new RegionState(region, RegionState.State.OFFLINE);
+ this.regionsInTransition.put(encodedName, state);
+ }
+ return state;
+ }
+
+ /**
* Caller must hold lock on the passed <code>state</code> object.
* @param state
*/
private void assign(final RegionState state) {
+ if (!setOfflineInZooKeeper(state)) return;
+ RegionPlan plan = getRegionPlan(state);
+ try {
+ LOG.debug("Assigning region " +
state.getRegion().getRegionNameAsString() +
+ " to " + plan.getDestination().getServerName());
+ // Send OPEN RPC. This can fail if the server on the other end is not up.
+ serverManager.sendRegionOpen(plan.getDestination(), state.getRegion());
+ // Transition RegionState to PENDING_OPEN
+ state.update(RegionState.State.PENDING_OPEN);
+ } catch (Throwable t) {
+ LOG.warn("Failed assignment of " +
+ state.getRegion().getRegionNameAsString() + " to " +
+ plan.getDestination(), t);
+ // Clean out plan we failed execute and one that doesn't look like it'll
+ // succeed anyways; we need a new plan!
+ synchronized(regionPlans) {
+ this.regionPlans.remove(state.getRegion().getEncodedName());
+ }
+ }
+ }
+
+ /**
+ * Set region as OFFLINED up in zookeeper
+ * @param state
+ * @return True if we succeeded, false otherwise (state was incorrect or we
+ * failed updating zk).
+ */
+ boolean setOfflineInZooKeeper(final RegionState state) {
if (!state.isClosed() && !state.isOffline()) {
- LOG.info("Attempting to assign region but it is in transition and in " +
- "an unexpected state:" + state);
- return;
+ new RuntimeException("Unexpected state trying to OFFLINE; " + state);
+ this.master.abort("Unexpected state trying to OFFLINE; " + state,
+ new IllegalStateException());
+ return false;
} else {
state.update(RegionState.State.OFFLINE);
}
@@ -584,23 +669,27 @@ public class AssignmentManager extends Z
if(!ZKAssign.createOrForceNodeOffline(master.getZooKeeper(),
state.getRegion(), master.getServerName())) {
LOG.warn("Attempted to create/force node into OFFLINE state before " +
- "completing assignment but failed to do so");
- return;
+ "completing assignment but failed to do so for " + state);
+ return false;
}
} catch (KeeperException e) {
master.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
- return;
+ return false;
}
+ return true;
+ }
+
+ RegionPlan getRegionPlan(final RegionState state) {
// Pickup existing plan or make a new one
String encodedName = state.getRegion().getEncodedName();
RegionPlan plan;
- synchronized(regionPlans) {
+ synchronized (regionPlans) {
plan = regionPlans.get(encodedName);
if (plan == null) {
LOG.debug("No previous transition plan for " +
- state.getRegion().getRegionNameAsString() +
- " so generating a random one; " +
serverManager.countOfRegionServers() +
- " (online=" + serverManager.getOnlineServers().size() + ")
available servers");
+ state.getRegion().getRegionNameAsString() +
+ " so generating a random one; " +
serverManager.countOfRegionServers() +
+ " (online=" + serverManager.getOnlineServers().size() + ") available
servers");
plan = new RegionPlan(state.getRegion(), null,
LoadBalancer.randomAssignment(serverManager.getOnlineServersList()));
regionPlans.put(encodedName, plan);
@@ -608,24 +697,7 @@ public class AssignmentManager extends Z
LOG.debug("Using preexisting plan=" + plan);
}
}
- try {
- LOG.debug("Assigning region " +
- state.getRegion().getRegionNameAsString() + " to " +
- plan.getDestination().getServerName());
- // Send OPEN RPC. This can fail if the server on other end is is not up.
- serverManager.sendRegionOpen(plan.getDestination(), state.getRegion());
- // Transition RegionState to PENDING_OPEN
- state.update(RegionState.State.PENDING_OPEN);
- } catch (Throwable t) {
- LOG.warn("Failed assignment of " +
- state.getRegion().getRegionNameAsString() + " to " +
- plan.getDestination(), t);
- // Clean out plan we failed execute and one that doesn't look like it'll
- // succeed anyways; we need a new plan!
- synchronized(regionPlans) {
- this.regionPlans.remove(encodedName);
- }
- }
+ return plan;
}
/**
@@ -736,66 +808,71 @@ public class AssignmentManager extends Z
// Get all available servers
List<HServerInfo> servers = serverManager.getOnlineServersList();
- LOG.info("Assigning " + allRegions.size() + " region(s) across " +
+ LOG.info("Bulk assigning " + allRegions.size() + " region(s) across " +
servers.size() + " server(s)");
// Generate a cluster startup region placement plan
Map<HServerInfo, List<HRegionInfo>> bulkPlan =
LoadBalancer.bulkAssignment(allRegions, servers);
- // Now start a thread per server to run assignment.
- for (Map.Entry<HServerInfo,List<HRegionInfo>> entry: bulkPlan.entrySet()) {
- Thread t = new BulkAssignServer(entry.getKey(), entry.getValue(),
this.master);
- t.start();
- }
-
- // Wait for no regions to be in transition
+ // Make a fixed thread count pool to run bulk assignments. The thought is
+ // that on a 1k-node cluster, running 1k concurrent bulk assignment threads
+ // could overwhelm the master, HDFS or ZK.
+ ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+ builder.setDaemon(true);
+ builder.setNameFormat(this.master.getServerName() + "-BulkAssigner-%1$d");
+ builder.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ // Abort if exception of any kind.
+ master.abort("Uncaught exception bulk assigning in " + t.getName(), e);
+ }
+ });
+ int threadCount =
+
this.master.getConfiguration().getInt("hbase.bulk.assignment.threadpool.size",
20);
+ java.util.concurrent.ExecutorService pool =
+ Executors.newFixedThreadPool(threadCount, builder.build());
+ // Disable timing out regions in transition up in zk while bulk assigning.
+ this.timeoutMonitor.bulkAssign(true);
try {
- waitUntilNoRegionsInTransition();
- } catch (InterruptedException e) {
- LOG.error("Interrupted waiting for regions to be assigned", e);
- throw new IOException(e);
+ for (Map.Entry<HServerInfo, List<HRegionInfo>> e: bulkPlan.entrySet()) {
+ pool.execute(new SingleServerBulkAssigner(e.getKey(), e.getValue()));
+ }
+ // Wait for no regions to be in transition
+ try {
+ // How long to wait on empty regions-in-transition. When we timeout,
+ // we'll put back in place the monitor of R-I-T. It should do fixup
+ // if server crashed during bulk assign, etc.
+ long timeout =
+
this.master.getConfiguration().getInt("hbase.bulk.assignment.waiton.empty.rit",
10 * 60 * 1000);
+ waitUntilNoRegionsInTransition(timeout);
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted waiting for regions to be assigned", e);
+ throw new IOException(e);
+ }
+ } finally {
+ // We're done with the pool. It'll exit when its done all in queue.
+ pool.shutdown();
+ // Reenable timing out regions in transition up in zk.
+ this.timeoutMonitor.bulkAssign(false);
}
-
- LOG.info("All user regions have been assigned");
+ LOG.info("Bulk assigning done");
}
/**
- * Class to run bulk assign to a single server.
+ * Manage bulk assigning to a server.
*/
- class BulkAssignServer extends Thread {
+ class SingleServerBulkAssigner implements Runnable {
+ private final HServerInfo regionserver;
private final List<HRegionInfo> regions;
- private final HServerInfo server;
- private final Stoppable stopper;
-
- BulkAssignServer(final HServerInfo server,
- final List<HRegionInfo> regions, final Stoppable stopper) {
- super("serverassign-" + server.getServerName());
- setDaemon(true);
- this.server = server;
+ SingleServerBulkAssigner(final HServerInfo regionserver,
+ final List<HRegionInfo> regions) {
+ this.regionserver = regionserver;
this.regions = regions;
- this.stopper = stopper;
}
-
@Override
public void run() {
- // Insert a plan for each region with 'server' as the target
regionserver.
- // Below, we run through regions one at a time. The call to assign will
- // move the region into the regionsInTransition which starts up a timer.
- // if the region is not out of the regionsInTransition by a certain time,
- // it will be reassigned. We don't want that to happen. So, do it this
- // way a region at a time for now. Presumably the regionserver will put
- // up a back pressure if opening a region takes time which is good since
- // this will block our adding new regions to regionsInTransition. Later
- // make it so we can send over a lump of regions in one rpc with the
- // regionserver on remote side tickling zk on a period to prevent our
- // regionsInTransition timing out. Currently its not possible given the
- // Executor architecture on the regionserver side. St.Ack 20100920.
- for (HRegionInfo region : regions) {
- regionPlans.put(region.getEncodedName(), new RegionPlan(region, null,
server));
- assign(region);
- if (this.stopper.isStopped()) break;
- }
+ assign(this.regionserver, this.regions);
}
}
@@ -835,12 +912,18 @@ public class AssignmentManager extends Z
* that if it returns without an exception that there was a period of time
* with no regions in transition from the point-of-view of the in-memory
* state of the Master.
+ * @param timeout How long to wait on empty regions-in-transition.
* @throws InterruptedException
*/
- public void waitUntilNoRegionsInTransition() throws InterruptedException {
- synchronized(regionsInTransition) {
- while(regionsInTransition.size() > 0) {
- regionsInTransition.wait();
+ public void waitUntilNoRegionsInTransition(final long timeout)
+ throws InterruptedException {
+ long startTime = System.currentTimeMillis();
+ long remaining = timeout;
+ synchronized (this.regionsInTransition) {
+ while(this.regionsInTransition.size() > 0 &&
+ !this.master.isStopped() && remaining > 0) {
+ this.regionsInTransition.wait(remaining);
+ remaining = timeout - (System.currentTimeMillis() - startTime);
}
}
}
@@ -849,14 +932,18 @@ public class AssignmentManager extends Z
* @return A copy of the Map of regions currently in transition.
*/
public NavigableMap<String, RegionState> getRegionsInTransition() {
- return new TreeMap<String, RegionState>(this.regionsInTransition);
+ synchronized (this.regionsInTransition) {
+ return new TreeMap<String, RegionState>(this.regionsInTransition);
+ }
}
/**
* @return True if regions in transition.
*/
public boolean isRegionsInTransition() {
- return !this.regionsInTransition.isEmpty();
+ synchronized (this.regionsInTransition) {
+ return !this.regionsInTransition.isEmpty();
+ }
}
/**
@@ -956,11 +1043,11 @@ public class AssignmentManager extends Z
}
/**
- * Unsets the specified table as disabled (enables it).
+ * Monitor to check for time outs on region transition operations
*/
public class TimeoutMonitor extends Chore {
-
private final int timeout;
+ private boolean bulkAssign = false;
/**
* Creates a periodic monitor to check for time outs on region transition
@@ -977,8 +1064,21 @@ public class AssignmentManager extends Z
this.timeout = timeout;
}
+ /**
+ * @param bulkAssign If true, we'll suspend checking regions in transition
+ * up in zookeeper. If false, will reenable check.
+ * @return Old setting for bulkAssign.
+ */
+ public boolean bulkAssign(final boolean bulkAssign) {
+ boolean result = this.bulkAssign;
+ this.bulkAssign = bulkAssign;
+ return result;
+ }
+
@Override
protected void chore() {
+ // If bulkAssign in progress, suspend checks
+ if (this.bulkAssign) return;
synchronized (regionsInTransition) {
// Iterate all regions in transition checking for time outs
long now = System.currentTimeMillis();
@@ -1140,6 +1240,9 @@ public class AssignmentManager extends Z
unassign(plan.getRegionInfo());
}
+ /**
+ * State of a Region while undergoing transitions.
+ */
public static class RegionState implements Writable {
private HRegionInfo region;
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=1003330&r1=1003329&r2=1003330&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
Thu Sep 30 23:12:56 2010
@@ -494,7 +494,7 @@ public class ServerManager {
* Open should not fail but can if server just crashed.
* <p>
* @param server server to open a region
- * @param regionName region to open
+ * @param region region to open
*/
public void sendRegionOpen(HServerInfo server, HRegionInfo region) {
HRegionInterface hri = getServerConnection(server);
@@ -507,6 +507,24 @@ public class ServerManager {
}
/**
+ * Sends an OPEN RPC to the specified server to open the specified regions.
+ * <p>
+ * Open should not fail but can if server just crashed.
+ * <p>
+ * @param server server to open the regions on
+ * @param regions regions to open
+ */
+ public void sendRegionOpen(HServerInfo server, List<HRegionInfo> regions) {
+ HRegionInterface hri = getServerConnection(server);
+ if (hri == null) {
+ LOG.warn("Attempting to send OPEN RPC to server " +
server.getServerName()
+ + " failed because no RPC connection found to this server");
+ return;
+ }
+ hri.openRegions(regions);
+ }
+
+ /**
* Sends an CLOSE RPC to the specified server to close the specified region.
* <p>
* A region server could reject the close request because it either does not
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java?rev=1003330&r1=1003329&r2=1003330&view=diff
==============================================================================
---
hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java
(original)
+++
hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java
Thu Sep 30 23:12:56 2010
@@ -85,7 +85,8 @@ public class OpenedRegionHandler extends
@Override
public void process() {
- LOG.debug("Handling OPENED event; deleting unassigned node");
+ LOG.debug("Handling OPENED event for " + this.regionInfo.getEncodedName() +
+ "; deleting unassigned node");
// TODO: should we check if this table was disabled and get it closed?
// Remove region from in-memory transition and unassigned node from ZK
try {
@@ -94,7 +95,7 @@ public class OpenedRegionHandler extends
} catch (KeeperException e) {
server.abort("Error deleting OPENED node in ZK", e);
}
- assignmentManager.regionOnline(regionInfo, serverInfo);
+ this.assignmentManager.regionOnline(regionInfo, serverInfo);
LOG.debug("Opened region " + regionInfo.getRegionNameAsString());
}
}
\ No newline at end of file
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1003330&r1=1003329&r2=1003330&view=diff
==============================================================================
---
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
(original)
+++
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
Thu Sep 30 23:12:56 2010
@@ -1927,6 +1927,12 @@ public class HRegionServer implements HR
}
@Override
+ public void openRegions(List<HRegionInfo> regions) {
+ LOG.info("Received request to open " + regions.size() + " region(s)");
+ for (HRegionInfo region: regions) openRegion(region);
+ }
+
+ @Override
public boolean closeRegion(HRegionInfo region)
throws NotServingRegionException {
LOG.info("Received close region: " + region.getRegionNameAsString());
@@ -2453,5 +2459,4 @@ public class HRegionServer implements HR
new HRegionServerCommandLine(regionServerClass).doMain(args);
}
-
-}
+}
\ No newline at end of file
Modified:
hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestLogsCleaner.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestLogsCleaner.java?rev=1003330&r1=1003329&r2=1003330&view=diff
==============================================================================
---
hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestLogsCleaner.java
(original)
+++
hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestLogsCleaner.java
Thu Sep 30 23:12:56 2010
@@ -35,6 +35,7 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
public class TestLogsCleaner {
@@ -79,7 +80,7 @@ public class TestLogsCleaner {
public void tearDown() throws Exception {
}
- /* REENALBE -- DISABLED UNTIL REPLICATION BROUGHT UP TO NEW MASTER @Test*/
+ @Ignore @Test /* REENABLE -- DISABLED UNTIL REPLICATION BROUGHT UP TO NEW
MASTER */
public void testLogCleaning() throws Exception{
Configuration c = TEST_UTIL.getConfiguration();
Path oldLogDir = new Path(HBaseTestingUtility.getTestDir(),