Author: todd
Date: Sat Apr 7 22:17:17 2012
New Revision: 1310888
URL: http://svn.apache.org/viewvc?rev=1310888&view=rev
Log:
HADOOP-8260. Replace ClientBaseWithFixes with our own modified copy of the
class. Contributed by Todd Lipcon.
Modified:
hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt
hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java
Modified:
hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt?rev=1310888&r1=1310887&r2=1310888&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt Sat Apr 7 22:17:17 2012
@@ -13,3 +13,5 @@ HADOOP-8215. Security support for ZK Fai
HADOOP-8245. Fix flakiness in TestZKFailoverController (todd)
HADOOP-8257. TestZKFailoverControllerStress occasionally fails with Mockito error (todd)
+
+HADOOP-8260. Replace ClientBaseWithFixes with our own modified copy of the class (todd)
Modified:
hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java?rev=1310888&r1=1310887&r2=1310888&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java Sat Apr 7 22:17:17 2012
@@ -15,50 +15,438 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.hadoop.ha;
+import java.io.BufferedReader;
import java.io.File;
-import java.util.Set;
-
-import javax.management.ObjectName;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
-import org.apache.zookeeper.test.ClientBase;
-import org.apache.zookeeper.test.JMXEnv;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.TestableZooKeeper;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ServerCnxnFactoryAccessor;
+import org.apache.zookeeper.server.ZKDatabase;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnLog;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * A subclass of ZK's ClientBase testing utility, with some fixes
- * necessary for running in the Hadoop context.
+ * Copy of ZooKeeper's ClientBase test utility with all of the JMXEnv
+ * verification removed. What appears to be a ZooKeeper bug
+ * (ZOOKEEPER-1438) causes spurious failures in that verification when
+ * these tests run on top of the upstream ClientBase.
*/
-public class ClientBaseWithFixes extends ClientBase {
+public abstract class ClientBaseWithFixes extends ZKTestCase {
+  /** Logger shared by this class and its static helpers. */
+  protected static final Logger LOG =
+      LoggerFactory.getLogger(ClientBaseWithFixes.class);
+
+  /**
+   * Timeout in milliseconds for client connects and for waiting on the
+   * server to come up or go down. Deliberately a non-final static.
+   */
+  public static int CONNECTION_TIMEOUT = 30000;
+
+  /**
+   * Parent directory for per-test temporary directories: the
+   * "build.test.dir" system property, or "build" when it is unset.
+   */
+  static final File BASETEST =
+      new File(System.getProperty("build.test.dir", "build"));
+
+  /** host:port of the test server; the port comes from PortAssignment. */
+  protected String hostPort = "127.0.0.1:" + PortAssignment.unique();
+  /** Handed to ServerCnxnFactory.createFactory when a factory is built. */
+  protected int maxCnxns = 0;
+  /** Factory of the running test server; null while no server runs. */
+  protected ServerCnxnFactory serverFactory = null;
+  /** Data directory of the test server, created in setUp(). */
+  protected File tmpDir = null;
+
+  // NOTE(review): never read in this class; presumably kept for parity
+  // with upstream ClientBase.
+  long initialFdCount;
+
+  /** Creates the fixture; all real setup happens in setUp(). */
+  public ClientBaseWithFixes() {
+    // nothing to do beyond the field initializers
+  }
+
+ /**
+ * In general don't use this. Only use in the special case that you
+ * want to ignore results (for whatever reason) in your test. Don't
+ * use empty watchers in real code!
+ *
+ */
+ protected class NullWatcher implements Watcher {
+ public void process(WatchedEvent event) { /* nada */ }
+ }
+
+ protected static class CountdownWatcher implements Watcher {
+ // XXX this doesn't need to be volatile! (Should probably be final)
+ volatile CountDownLatch clientConnected;
+ volatile boolean connected;
+
+ public CountdownWatcher() {
+ reset();
+ }
+ synchronized public void reset() {
+ clientConnected = new CountDownLatch(1);
+ connected = false;
+ }
+ synchronized public void process(WatchedEvent event) {
+ if (event.getState() == KeeperState.SyncConnected ||
+ event.getState() == KeeperState.ConnectedReadOnly) {
+ connected = true;
+ notifyAll();
+ clientConnected.countDown();
+ } else {
+ connected = false;
+ notifyAll();
+ }
+ }
+ synchronized boolean isConnected() {
+ return connected;
+ }
+ synchronized void waitForConnected(long timeout) throws
InterruptedException, TimeoutException {
+ long expire = System.currentTimeMillis() + timeout;
+ long left = timeout;
+ while(!connected && left > 0) {
+ wait(left);
+ left = expire - System.currentTimeMillis();
+ }
+ if (!connected) {
+ throw new TimeoutException("Did not connect");
+
+ }
+ }
+ synchronized void waitForDisconnected(long timeout) throws
InterruptedException, TimeoutException {
+ long expire = System.currentTimeMillis() + timeout;
+ long left = timeout;
+ while(connected && left > 0) {
+ wait(left);
+ left = expire - System.currentTimeMillis();
+ }
+ if (connected) {
+ throw new TimeoutException("Did not disconnect");
+
+ }
+ }
+ }
+
+  /** Creates a client for this test's own server ({@link #hostPort}). */
+  protected TestableZooKeeper createClient()
+      throws IOException, InterruptedException {
+    return createClient(hostPort);
+  }
+
+  /** Creates a client for {@code hp} with a freshly built CountdownWatcher. */
+  protected TestableZooKeeper createClient(String hp)
+      throws IOException, InterruptedException {
+    return createClient(new CountdownWatcher(), hp);
+  }
+
+  // Clients opened through createClient; closed and nulled out by
+  // tearDownAll(). Accessed under synchronized(this) in createClient and
+  // tearDownAll; initialized in setUpAll().
+  private LinkedList<ZooKeeper> allClients;
+  // Set by setUpAll(); createClient fails the test while it is false.
+  private boolean allClientsSetup = false;
+
+  /** Creates a client for {@code hp} using {@link #CONNECTION_TIMEOUT}. */
+  protected TestableZooKeeper createClient(CountdownWatcher watcher,
+      String hp) throws IOException, InterruptedException {
+    return createClient(watcher, hp, CONNECTION_TIMEOUT);
+  }
+
+  /**
+   * Creates a client for {@code hp}, waits for it to connect, and records
+   * it so tearDownAll() can close it.
+   *
+   * <p>If the client does not connect in time, or setUpAll() has not run,
+   * the client is closed before the test is failed, so its connection does
+   * not outlive the test. If teardown has already run (allClients is null)
+   * the client is closed immediately but still returned.
+   *
+   * @param watcher watcher to reset and hand to the new client
+   * @param hp connect string for the new client
+   * @param timeout passed to the TestableZooKeeper constructor and used as
+   *     the connect wait, in milliseconds
+   * @return the new client
+   */
+  protected TestableZooKeeper createClient(CountdownWatcher watcher,
+      String hp, int timeout)
+      throws IOException, InterruptedException {
+    watcher.reset();
+    TestableZooKeeper zk = new TestableZooKeeper(hp, timeout, watcher);
+    if (!watcher.clientConnected.await(timeout, TimeUnit.MILLISECONDS)) {
+      // Release the half-open client before failing; otherwise it leaks.
+      zk.close();
+      Assert.fail("Unable to connect to server");
+    }
+    synchronized (this) {
+      if (!allClientsSetup) {
+        LOG.error("allClients never setup");
+        // Nobody will ever close this client, so close it now.
+        zk.close();
+        Assert.fail("allClients never setup");
+      }
+      if (allClients != null) {
+        allClients.add(zk);
+      } else {
+        // test done - close the zk, not needed
+        zk.close();
+      }
+    }
+    return zk;
+  }
+
+  /** A host name paired with a port, as produced by parseHostPortList. */
+  public static class HostPort {
+    String host;
+    int port;
+
+    public HostPort(String host, int port) {
+      this.port = port;
+      this.host = host;
+    }
+  }
+  /**
+   * Parses a comma-separated list of {@code host:port} entries. For each
+   * entry, everything before the last ':' is the host and everything after
+   * it is the port.
+   *
+   * @param hplist comma-separated list, e.g. "a:1,b:2"
+   * @return the parsed entries, in input order
+   * @throws RuntimeException if an entry has no ':' or its port is not a
+   *     number; the message names the entry and the cause is preserved
+   */
+  public static List<HostPort> parseHostPortList(String hplist) {
+    ArrayList<HostPort> alist = new ArrayList<HostPort>();
+    for (String hp : hplist.split(",")) {
+      int idx = hp.lastIndexOf(':');
+      if (idx < 0) {
+        throw new RuntimeException("Problem parsing " + hp +
+            ": no ':' separating host and port");
+      }
+      String host = hp.substring(0, idx);
+      int port;
+      try {
+        port = Integer.parseInt(hp.substring(idx + 1));
+      } catch (NumberFormatException e) {
+        throw new RuntimeException("Problem parsing " + hp + ": " + e, e);
+      }
+      alist.add(new HostPort(host, port));
+    }
+    return alist;
+  }
+
+  /**
+   * Sends a ZooKeeper four-letter-word command and returns the reply.
+   * Behaves like {@code nc}: the command is written, the output half of
+   * the socket is shut down, and the reply is read to end of stream.
+   *
+   * @param host the destination host
+   * @param port the destination port
+   * @param cmd the four-letter word, e.g. "stat"
+   * @return the reply, with each line terminated by a newline
+   * @throws IOException if connecting, writing or reading fails
+   */
+  public static String send4LetterWord(String host, int port, String cmd)
+      throws IOException {
+    LOG.info("connecting to " + host + " " + port);
+    Socket sock = new Socket(host, port);
+    BufferedReader reader = null;
+    try {
+      OutputStream outstream = sock.getOutputStream();
+      // Explicit charset: the platform default is not guaranteed to be
+      // ASCII-compatible.
+      outstream.write(cmd.getBytes("UTF-8"));
+      outstream.flush();
+      // this replicates NC - close the output stream before reading
+      sock.shutdownOutput();
+
+      reader = new BufferedReader(
+          new InputStreamReader(sock.getInputStream(), "UTF-8"));
+      StringBuilder sb = new StringBuilder();
+      String line;
+      while ((line = reader.readLine()) != null) {
+        sb.append(line).append('\n');
+      }
+      return sb.toString();
+    } finally {
+      sock.close();
+      if (reader != null) {
+        reader.close();
+      }
+    }
+  }
+
+  /**
+   * Polls the first server in {@code hp} with "stat" every 250 ms until it
+   * answers with a reply starting "Zookeeper version:" that does not
+   * mention READ-ONLY, or until {@code timeout} ms have elapsed.
+   *
+   * @param hp host:port list; only the first entry is polled
+   * @param timeout maximum time to wait, in milliseconds
+   * @return true if the server answered as up; false on timeout, or when
+   *     the calling thread is interrupted (its interrupt status is restored)
+   */
+  public static boolean waitForServerUp(String hp, long timeout) {
+    // if there are multiple hostports, just take the first one
+    HostPort hpobj = parseHostPortList(hp).get(0);
+    long start = System.currentTimeMillis();
+    while (true) {
+      try {
+        String result = send4LetterWord(hpobj.host, hpobj.port, "stat");
+        if (result.startsWith("Zookeeper version:") &&
+            !result.contains("READ-ONLY")) {
+          return true;
+        }
+      } catch (IOException e) {
+        // ignore as this is expected
+        LOG.info("server " + hp + " not up " + e);
+      }
+
+      if (System.currentTimeMillis() > start + timeout) {
+        break;
+      }
+      try {
+        Thread.sleep(250);
+      } catch (InterruptedException e) {
+        // Keep the interrupt visible to the caller and stop waiting,
+        // rather than silently polling on until the timeout.
+        Thread.currentThread().interrupt();
+        return false;
+      }
+    }
+    return false;
+  }
+  /**
+   * Polls the first server in {@code hp} with "stat" every 250 ms until a
+   * request fails with an IOException (the server is considered down) or
+   * {@code timeout} ms have elapsed.
+   *
+   * @param hp host:port list; only the first entry is polled
+   * @param timeout maximum time to wait, in milliseconds
+   * @return true once a request fails; false on timeout, or when the
+   *     calling thread is interrupted (its interrupt status is restored)
+   */
+  public static boolean waitForServerDown(String hp, long timeout) {
+    HostPort hpobj = parseHostPortList(hp).get(0);
+    long start = System.currentTimeMillis();
+    while (true) {
+      try {
+        send4LetterWord(hpobj.host, hpobj.port, "stat");
+      } catch (IOException e) {
+        return true;
+      }
+
+      if (System.currentTimeMillis() > start + timeout) {
+        break;
+      }
+      try {
+        Thread.sleep(250);
+      } catch (InterruptedException e) {
+        // Keep the interrupt visible to the caller and stop waiting,
+        // rather than silently polling on until the timeout.
+        Thread.currentThread().interrupt();
+        return false;
+      }
+    }
+    return false;
+  }
+
+  /** Creates a new, uniquely named temporary directory under BASETEST. */
+  public static File createTmpDir() throws IOException {
+    return createTmpDir(BASETEST);
+  }
+
+  /**
+   * Creates a new directory under {@code parentDir}, named after a fresh
+   * temp file plus ".dir", and returns it. Fails the test if that
+   * directory already exists or cannot be created.
+   */
+  static File createTmpDir(File parentDir) throws IOException {
+    // The marker file is deliberately left on disk: while it exists its
+    // name cannot be handed out again, so "<marker>.dir" stays unique.
+    File marker = File.createTempFile("test", ".junit", parentDir);
+    File dir = new File(marker + ".dir");
+    // Never true if createTempFile does its job.
+    Assert.assertFalse(dir.exists());
+    Assert.assertTrue(dir.mkdirs());
+    return dir;
+  }
+  /**
+   * Extracts the port from a host:port string: takes the last
+   * ':'-separated component and, when a '/' splits it into several parts,
+   * keeps only the part before the first '/'.
+   */
+  private static int getPort(String hostPort) {
+    String[] colonParts = hostPort.split(":");
+    String tail = colonParts[colonParts.length - 1];
+    String[] slashParts = tail.split("/");
+    String portstr = (slashParts.length > 1) ? slashParts[0] : tail;
+    return Integer.parseInt(portstr);
+  }
+
+  /**
+   * Starts a ZooKeeperServer that uses {@code dataDir} for both of its
+   * directory arguments. The server runs on {@code factory}, or on a new
+   * factory bound to the port of {@code hostPort} when {@code factory} is
+   * null. Asserts that the server answers within CONNECTION_TIMEOUT.
+   *
+   * @return the factory the server was started on
+   */
+  static ServerCnxnFactory createNewServerInstance(File dataDir,
+      ServerCnxnFactory factory, String hostPort, int maxCnxns)
+      throws IOException, InterruptedException {
+    // NOTE(review): 3000 is presumably the tick time in ms; confirm against
+    // the ZooKeeperServer constructor.
+    ZooKeeperServer zks = new ZooKeeperServer(dataDir, dataDir, 3000);
+    int port = getPort(hostPort);
+    ServerCnxnFactory cnxnFactory = (factory != null)
+        ? factory
+        : ServerCnxnFactory.createFactory(port, maxCnxns);
+    cnxnFactory.startup(zks);
+    boolean up = ClientBaseWithFixes.waitForServerUp("127.0.0.1:" + port,
+        CONNECTION_TIMEOUT);
+    Assert.assertTrue("waiting for server up", up);
+    return cnxnFactory;
+  }
+
+  /**
+   * Shuts down the server behind {@code factory}, closes its ZKDatabase
+   * (logging, not throwing, on IOException), and asserts that the port of
+   * {@code hostPort} stops answering within CONNECTION_TIMEOUT. Does
+   * nothing when {@code factory} is null.
+   */
+  static void shutdownServerInstance(ServerCnxnFactory factory,
+      String hostPort) {
+    if (factory == null) {
+      return;
+    }
+    // Grab the database before shutdown so it can be closed afterwards.
+    ZKDatabase zkDb = getServer(factory).getZKDatabase();
+    factory.shutdown();
+    try {
+      zkDb.close();
+    } catch (IOException ie) {
+      LOG.warn("Error closing logs ", ie);
+    }
+    int port = getPort(hostPort);
+    boolean down = ClientBaseWithFixes.waitForServerDown("127.0.0.1:" + port,
+        CONNECTION_TIMEOUT);
+    Assert.assertTrue("waiting for server down", down);
+  }
+
+  /**
+   * Test-specific environment setup: shrinks transaction-log
+   * preallocation for the test server.
+   */
+  public static void setupTestEnv() {
+    // Tests run with 100K of log preallocation. On Windows, preallocating
+    // 64M was seen to take ~15 seconds, long enough for the first client
+    // session to time out and the test to fail. Both the system property
+    // and FileTxnLog are set directly to handle static init/gc issues.
+    System.setProperty("zookeeper.preAllocSize", "100");
+    FileTxnLog.setPreallocSize(100 * 1024);
+  }
+
+  /** Prepares client bookkeeping so createClient can register clients. */
+  protected void setUpAll() throws Exception {
+    allClients = new LinkedList<ZooKeeper>();
+    allClientsSetup = true;
+  }
+
+  /**
+   * Per-test setup: ensures BASETEST exists, applies the test environment,
+   * initializes client bookkeeping, creates a fresh data directory and
+   * starts the server.
+   */
+  @Before
+  public void setUp() throws Exception {
+    BASETEST.mkdirs();
+    setupTestEnv();
+    setUpAll();
+    tmpDir = createTmpDir(BASETEST);
+    startServer();
+    LOG.info("Client test setup finished");
+  }
+
+  /** Starts the test server in tmpDir and remembers its factory. */
+  protected void startServer() throws Exception {
+    LOG.info("STARTING server");
+    serverFactory =
+        createNewServerInstance(tmpDir, serverFactory, hostPort, maxCnxns);
+  }
+
+  /** Shuts down the test server, if any, and drops its factory. */
+  protected void stopServer() throws Exception {
+    LOG.info("STOPPING server");
+    shutdownServerInstance(serverFactory, hostPort);
+    serverFactory = null;
+  }
+
+  /** Returns the ZooKeeperServer served by {@code fac}. */
+  protected static ZooKeeperServer getServer(ServerCnxnFactory fac) {
+    return ServerCnxnFactoryAccessor.getZkServer(fac);
+  }
+
+ protected void tearDownAll() throws Exception {
+ synchronized (this) {
+ if (allClients != null) for (ZooKeeper zk : allClients) {
+ try {
+ if (zk != null)
+ zk.close();
+ } catch (InterruptedException e) {
+ LOG.warn("ignoring interrupt", e);
+ }
+ }
+ allClients = null;
+ }
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ LOG.info("tearDown starting");
+
+ tearDownAll();
+
+ stopServer();
+
+ if (tmpDir != null) {
+ Assert.assertTrue("delete " + tmpDir.toString(),
recursiveDelete(tmpDir));
+ }
+
+ // This has to be set to null when the same instance of this class is
reused between test cases
+ serverFactory = null;
+ }
- /**
- * When running on the Jenkins setup, we need to ensure that this
- * build directory exists before running the tests.
- */
- @Override
- public void setUp() throws Exception {
- // build.test.dir is used by zookeeper
- new File(System.getProperty("build.test.dir", "build")).mkdirs();
- super.setUp();
- }
-
- /**
- * ZK seems to have a bug when we muck with its sessions
- * behind its back, causing disconnects, etc. This bug
- * ends up leaving JMX beans around at the end of the test,
- * and ClientBase's teardown method will throw an exception
- * if it finds JMX beans leaked. So, clear them out there
- * to workaround the ZK bug. See ZOOKEEPER-1438.
- */
- @Override
- public void tearDown() throws Exception {
- Set<ObjectName> names = JMXEnv.ensureAll();
- for (ObjectName n : names) {
- try {
- JMXEnv.conn().unregisterMBean(n);
- } catch (Throwable t) {
- // ignore
- }
+ public static boolean recursiveDelete(File d) {
+ if (d.isDirectory()) {
+ File children[] = d.listFiles();
+ for (File f : children) {
+ Assert.assertTrue("delete " + f.toString(),
recursiveDelete(f));
+ }
+ }
+ return d.delete();
}
- }
}