Author: stack
Date: Fri Sep 17 04:18:33 2010
New Revision: 997973
URL: http://svn.apache.org/viewvc?rev=997973&view=rev
Log:
HBASE-3006 Reading compressed HFile blocks causes way too many DFS RPC calls
severely impacting performance
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java?rev=997973&r1=997972&r2=997973&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java Fri
Sep 17 04:18:33 2010
@@ -1059,7 +1059,7 @@ public class HFile {
new BufferedInputStream(
new BoundedRangeFileInputStream(this.istream, offset,
compressedSize,
pread),
- Math.min(DEFAUT_BLOCKSIZE, compressedSize)),
+ Math.min(DEFAULT_BLOCKSIZE, compressedSize)),
decompressor, 0);
buf = ByteBuffer.allocate(decompressedSize);
IOUtils.readFully(is, buf.array(), 0, buf.capacity());
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java?rev=997973&r1=997972&r2=997973&view=diff
==============================================================================
---
hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java
(original)
+++
hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java
Fri Sep 17 04:18:33 2010
@@ -95,7 +95,7 @@ class ActiveMasterManager extends ZooKee
clusterHasActiveMaster.set(true);
} else {
// Node is no longer there, cluster does not have an active master
- LOG.debug("No master available. notifying waiting threads");
+ LOG.debug("No master available. Notifying waiting threads");
clusterHasActiveMaster.set(false);
// Notify any thread waiting to become the active master
clusterHasActiveMaster.notifyAll();
@@ -114,46 +114,56 @@ class ActiveMasterManager extends ZooKee
*
* This also makes sure that we are watching the master znode so will be
* notified if another master dies.
- * @return False if we did not start up this cluster, another
- * master did, or if a problem (zookeeper, stop flag has been set on this
- * Master)
+ * @return True if no issue becoming active master else false if another
+ * master was running or if some other problem (zookeeper, stop flag has been
+ * set on this Master)
*/
boolean blockUntilBecomingActiveMaster() {
- boolean thisMasterStartedCluster = true;
+ boolean cleanSetOfActiveMaster = true;
// Try to become the active master, watch if there is another master
try {
- if(ZKUtil.setAddressAndWatch(watcher, watcher.masterAddressZNode,
- address)) {
+ if (ZKUtil.setAddressAndWatch(this.watcher,
+ this.watcher.masterAddressZNode, this.address)) {
// We are the master, return
- clusterHasActiveMaster.set(true);
- return thisMasterStartedCluster;
+ this.clusterHasActiveMaster.set(true);
+ return cleanSetOfActiveMaster;
+ }
+
+ // There is another active master running elsewhere or this is a restart
+ // and the master ephemeral node has not expired yet.
+ this.clusterHasActiveMaster.set(true);
+ cleanSetOfActiveMaster = false;
+ HServerAddress currentMaster =
+ ZKUtil.getDataAsAddress(this.watcher, this.watcher.masterAddressZNode);
+ if (currentMaster != null && currentMaster.equals(this.address)) {
+ LOG.info("Current master has this master's address, " + currentMaster +
+ "; master was restarted? Waiting on znode to expire...");
+ // Hurry along the expiration of the znode.
+ ZKUtil.deleteNode(this.watcher, this.watcher.masterAddressZNode);
+ } else {
+ LOG.info("Another master is the active master, " + currentMaster +
+ "; waiting to become the next active master");
}
} catch (KeeperException ke) {
master.abort("Received an unexpected KeeperException, aborting", ke);
return false;
}
- // There is another active master, this is not a cluster startup
- // and we must wait until the active master dies
- LOG.info("Another master is already the active master, waiting to become " +
- "the next active master");
- clusterHasActiveMaster.set(true);
- thisMasterStartedCluster = false;
- synchronized(clusterHasActiveMaster) {
- while(clusterHasActiveMaster.get() && !master.isStopped()) {
+ synchronized (this.clusterHasActiveMaster) {
+ while (this.clusterHasActiveMaster.get() && !this.master.isStopped()) {
try {
- clusterHasActiveMaster.wait();
+ this.clusterHasActiveMaster.wait();
} catch (InterruptedException e) {
// We expect to be interrupted when a master dies, will fall out if so
LOG.debug("Interrupted waiting for master to die", e);
}
}
- if(master.isStopped()) {
- return thisMasterStartedCluster;
+ if (this.master.isStopped()) {
+ return cleanSetOfActiveMaster;
}
// Try to become active master again now that there is no active master
blockUntilBecomingActiveMaster();
}
- return thisMasterStartedCluster;
+ return cleanSetOfActiveMaster;
}
/**
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=997973&r1=997972&r2=997973&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Fri
Sep 17 04:18:33 2010
@@ -145,12 +145,14 @@ implements HMasterInterface, HMasterRegi
// Cluster status zk tracker and local setter
private ClusterStatusTracker clusterStatusTracker;
- // True if this is the master that started the cluster.
- boolean clusterStarter;
-
- // This flag is for stopping this Master instance.
- private boolean stopped = false;
- // Set on abort -- usually failure of our zk session
+ // True if this is a cluster startup as opposed to a master joining an already
+ // running cluster
+ boolean freshClusterStart;
+
+ // This flag is for stopping this Master instance. It's set when we are
+ // stopping or aborting
+ private volatile boolean stopped = false;
+ // Set on abort -- usually failure of our zk session.
private volatile boolean abort = false;
// Instance of the hbase executor service.
@@ -183,12 +185,12 @@ implements HMasterInterface, HMasterRegi
HServerAddress a = new HServerAddress(getMyAddress(this.conf));
int numHandlers = conf.getInt("hbase.regionserver.handler.count", 10);
this.rpcServer = HBaseRPC.getServer(this,
- new Class<?>[]{HMasterInterface.class, HMasterRegionInterface.class},
- a.getBindAddress(), a.getPort(),
- numHandlers,
- 0, // we dont use high priority handlers in master
- false, conf,
- 0); // this is a DNC w/o high priority handlers
+ new Class<?>[]{HMasterInterface.class, HMasterRegionInterface.class},
+ a.getBindAddress(), a.getPort(),
+ numHandlers,
+ 0, // we dont use high priority handlers in master
+ false, conf,
+ 0); // this is a DNC w/o high priority handlers
this.address = new HServerAddress(rpcServer.getListenerAddress());
// set the thread name now we have an address
@@ -214,8 +216,9 @@ implements HMasterInterface, HMasterRegi
this.zooKeeper =
new ZooKeeperWatcher(conf, MASTER + "-" + getMasterAddress(), this);
- this.clusterStarter = 0 ==
- ZKUtil.getNumberOfChildren(zooKeeper, zooKeeper.rsZNode);
+ // Are there regionservers running already?
+ boolean regionservers =
+ 0 == ZKUtil.getNumberOfChildren(zooKeeper, zooKeeper.rsZNode);
/*
* 3. Block on becoming the active master.
@@ -229,26 +232,10 @@ implements HMasterInterface, HMasterRegi
this.activeMasterManager = new ActiveMasterManager(zooKeeper, address,
this);
this.zooKeeper.registerListener(activeMasterManager);
-
- // If we're a backup master, stall until a primary to writes his address
- if (conf.getBoolean(HConstants.MASTER_TYPE_BACKUP,
- HConstants.DEFAULT_MASTER_TYPE_BACKUP)) {
- // This will only be a minute or so while the cluster starts up,
- // so don't worry about setting watches on the parent znode
- while (!this.activeMasterManager.isActiveMaster()) {
- try {
- LOG.debug("Waiting for master address ZNode to be written " +
- "(Also watching cluster state node)");
- Thread.sleep(conf.getInt("zookeeper.session.timeout", 60 * 1000));
- } catch (InterruptedException e) {
- // interrupted = user wants to kill us. Don't continue
- throw new IOException("Interrupted waiting for master address");
- }
- }
- }
+ stallIfBackupMaster(this.conf, this.activeMasterManager);
// Wait here until we are the active master
- clusterStarter = activeMasterManager.blockUntilBecomingActiveMaster();
+ activeMasterManager.blockUntilBecomingActiveMaster();
/**
* 4. We are active master now... go initialize components we need to run.
@@ -272,17 +259,42 @@ implements HMasterInterface, HMasterRegi
this.serverManager);
regionServerTracker.start();
- // Set the cluster as up.
+ // Set the cluster as up. If new RSs, they'll be waiting on this before
+ // going ahead with their startup.
this.clusterStatusTracker = new ClusterStatusTracker(getZooKeeper(), this);
+
+ this.freshClusterStart =
+ !this.clusterStatusTracker.isClusterUp() && !regionservers;
this.clusterStatusTracker.setClusterUp();
this.clusterStatusTracker.start();
LOG.info("Server active/primary master; " + this.address +
- "; clusterStarter=" + this.clusterStarter + ", sessionid=0x" +
+ "; freshClusterStart=" + this.freshClusterStart + ", sessionid=0x" +
Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId()));
}
/**
+ * Stall startup if we are designated a backup master.
+ * @param c
+ * @param amm
+ * @throws InterruptedException
+ */
+ private static void stallIfBackupMaster(final Configuration c,
+ final ActiveMasterManager amm)
+ throws InterruptedException {
+ // If we're a backup master, stall until a primary writes its address
+ if (!c.getBoolean(HConstants.MASTER_TYPE_BACKUP,
+ HConstants.DEFAULT_MASTER_TYPE_BACKUP)) return;
+ // This will only be a minute or so while the cluster starts up,
+ // so don't worry about setting watches on the parent znode
+ while (!amm.isActiveMaster()) {
+ LOG.debug("Waiting for master address ZNode to be written " +
+ "(Also watching cluster state node)");
+ Thread.sleep(c.getInt("zookeeper.session.timeout", 60 * 1000));
+ }
+ }
+
+ /**
* Main processing loop for the HMaster.
* 1. Handle both fresh cluster start as well as failed over initialization of
* the HMaster.
@@ -297,20 +309,22 @@ implements HMasterInterface, HMasterRegi
startServiceThreads();
// wait for minimum number of region servers to be up
this.serverManager.waitForMinServers();
- // start assignment of user regions, startup or failure
- if (this.clusterStarter) {
- clusterStarterInitializations(this.fileSystemManager,
+
+ // Start assignment of user regions, startup or failure
+ if (!this.stopped) {
+ if (this.freshClusterStart) {
+ clusterStarterInitializations(this.fileSystemManager,
this.serverManager, this.catalogTracker, this.assignmentManager);
- } else {
- // Process existing unassigned nodes in ZK, read all regions from META,
- // rebuild in-memory state.
- this.assignmentManager.processFailover();
+ } else {
+ // Process existing unassigned nodes in ZK, read all regions from META,
+ // rebuild in-memory state.
+ this.assignmentManager.processFailover();
+ }
}
+
// Check if we should stop every second.
Sleeper sleeper = new Sleeper(1000, this);
- while (!this.stopped && !this.abort) {
- sleeper.sleep();
- }
+ while (!this.stopped) sleeper.sleep();
} catch (Throwable t) {
abort("Unhandled exception. Starting shutdown.", t);
}
@@ -795,6 +809,7 @@ implements HMasterInterface, HMasterRegi
if (t != null) LOG.fatal(msg, t);
else LOG.fatal(msg);
this.abort = true;
+ stop("Aborting");
}
@Override
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=997973&r1=997972&r2=997973&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
Fri Sep 17 04:18:33 2010
@@ -127,6 +127,7 @@ public class ServerManager {
this.services = services;
Configuration c = master.getConfiguration();
int monitorInterval = c.getInt("hbase.master.monitor.interval", 60 * 1000);
+ // TODO: Fix.
this.minimumServerCount = c.getInt("hbase.regions.server.count.min", 1);
this.metrics = new MasterMetrics(master.getServerName());
this.serverMonitorThread = new ServerMonitor(monitorInterval, master);
@@ -220,8 +221,8 @@ public class ServerManager {
info.setLoad(load);
// TODO: Why did we update the RS location ourself? Shouldn't RS do this?
// masterStatus.getZooKeeper().updateRSLocationGetWatch(info, watcher);
- onlineServers.put(serverName, info);
- if(hri == null) {
+ this.onlineServers.put(serverName, info);
+ if (hri == null) {
serverConnections.remove(serverName);
} else {
serverConnections.put(serverName, hri);
@@ -549,10 +550,9 @@ public class ServerManager {
* Waits for the minimum number of servers to be running.
*/
public void waitForMinServers() {
- while(numServers() < minimumServerCount) {
-// !masterStatus.getShutdownRequested().get()) {
+ while (numServers() < minimumServerCount && !this.master.isStopped()) {
LOG.info("Waiting for enough servers to check in. Currently have " +
- numServers() + " but need at least " + minimumServerCount);
+ numServers() + " but need at least " + minimumServerCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java?rev=997973&r1=997972&r2=997973&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java Fri
Sep 17 04:18:33 2010
@@ -554,7 +554,7 @@ public class ZKUtil {
String znode)
throws KeeperException {
byte [] data = getDataAndWatch(zkw, znode);
- if(data == null) {
+ if (data == null) {
return null;
}
String addrString = Bytes.toString(data);
Modified:
hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java?rev=997973&r1=997972&r2=997973&view=diff
==============================================================================
---
hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java
(original)
+++
hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java
Fri Sep 17 04:18:33 2010
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertFal
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import java.io.IOException;
import java.util.concurrent.Semaphore;
import org.apache.commons.logging.Log;
@@ -57,6 +58,39 @@ public class TestActiveMasterManager {
TEST_UTIL.shutdownMiniZKCluster();
}
+ @Test public void testRestartMaster() throws IOException, KeeperException {
+ ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
+ "testActiveMasterManagerFromZK", null);
+ ZKUtil.createAndFailSilent(zk, zk.baseZNode);
+ try {
+ ZKUtil.deleteNode(zk, zk.masterAddressZNode);
+ } catch(KeeperException.NoNodeException nne) {}
+
+ // Create the master node with a dummy address
+ HServerAddress master = new HServerAddress("localhost", 1);
+ // Should not have a master yet
+ DummyMaster dummyMaster = new DummyMaster();
+ ActiveMasterManager activeMasterManager = new ActiveMasterManager(zk,
+ master, dummyMaster);
+ zk.registerListener(activeMasterManager);
+ assertFalse(activeMasterManager.clusterHasActiveMaster.get());
+
+ // First test becoming the active master uninterrupted
+ activeMasterManager.blockUntilBecomingActiveMaster();
+ assertTrue(activeMasterManager.clusterHasActiveMaster.get());
+ assertMaster(zk, master);
+
+ // Now pretend master restart
+ DummyMaster secondDummyMaster = new DummyMaster();
+ ActiveMasterManager secondActiveMasterManager = new ActiveMasterManager(zk,
+ master, secondDummyMaster);
+ zk.registerListener(secondActiveMasterManager);
+ assertFalse(secondActiveMasterManager.clusterHasActiveMaster.get());
+ activeMasterManager.blockUntilBecomingActiveMaster();
+ assertTrue(activeMasterManager.clusterHasActiveMaster.get());
+ assertMaster(zk, master);
+ }
+
/**
* Unit tests that uses ZooKeeper but does not use the master-side methods
* but rather acts directly on ZK.
@@ -64,22 +98,21 @@ public class TestActiveMasterManager {
*/
@Test
public void testActiveMasterManagerFromZK() throws Exception {
-
ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
- "testActiveMasterManagerFromZK", null);
+ "testActiveMasterManagerFromZK", null);
ZKUtil.createAndFailSilent(zk, zk.baseZNode);
try {
ZKUtil.deleteNode(zk, zk.masterAddressZNode);
} catch(KeeperException.NoNodeException nne) {}
// Create the master node with a dummy address
- HServerAddress firstMasterAddress = new HServerAddress("firstMaster",
1234);
- HServerAddress secondMasterAddress = new HServerAddress("secondMaster",
1234);
+ HServerAddress firstMasterAddress = new HServerAddress("localhost", 1);
+ HServerAddress secondMasterAddress = new HServerAddress("localhost", 2);
// Should not have a master yet
DummyMaster ms1 = new DummyMaster();
ActiveMasterManager activeMasterManager = new ActiveMasterManager(zk,
- firstMasterAddress, ms1);
+ firstMasterAddress, ms1);
zk.registerListener(activeMasterManager);
assertFalse(activeMasterManager.clusterHasActiveMaster.get());
@@ -132,6 +165,9 @@ public class TestActiveMasterManager {
assertTrue(t.manager.clusterHasActiveMaster.get());
assertTrue(t.isActiveMaster);
+
+ LOG.info("Deleting master node");
+ ZKUtil.deleteNode(zk, zk.masterAddressZNode);
}
/**