Author: umamahesh
Date: Tue May 29 18:50:07 2012
New Revision: 1343913
URL: http://svn.apache.org/viewvc?rev=1343913&view=rev
Log:
HDFS-3452. BKJM:Switch from standby to active fails and NN gets shut down due
to delay in clearing of lock. Contributed by Uma Maheswara Rao G.
Added:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestCurrentInprogress.java
Removed:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/WriteLock.java
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java?rev=1343913&r1=1343912&r2=1343913&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java
Tue May 29 18:50:07 2012
@@ -56,15 +56,13 @@ class BookKeeperEditLogOutputStream
private CountDownLatch syncLatch;
private final AtomicInteger transmitResult
= new AtomicInteger(BKException.Code.OK);
- private final WriteLock wl;
private final Writer writer;
/**
* Construct an edit log output stream which writes to a ledger.
*/
- protected BookKeeperEditLogOutputStream(Configuration conf,
- LedgerHandle lh, WriteLock wl)
+ protected BookKeeperEditLogOutputStream(Configuration conf, LedgerHandle lh)
throws IOException {
super();
@@ -72,8 +70,6 @@ class BookKeeperEditLogOutputStream
outstandingRequests = new AtomicInteger(0);
syncLatch = null;
this.lh = lh;
- this.wl = wl;
- this.wl.acquire();
this.writer = new Writer(bufCurrent);
this.transmissionThreshold
= conf.getInt(BookKeeperJournalManager.BKJM_OUTPUT_BUFFER_SIZE,
@@ -108,7 +104,6 @@ class BookKeeperEditLogOutputStream
throw new IOException("BookKeeper error during abort", bke);
}
- wl.release();
}
@Override
@@ -118,8 +113,6 @@ class BookKeeperEditLogOutputStream
@Override
public void write(FSEditLogOp op) throws IOException {
- wl.checkWriteLock();
-
writer.writeOp(op);
if (bufCurrent.getLength() > transmissionThreshold) {
@@ -129,19 +122,15 @@ class BookKeeperEditLogOutputStream
@Override
public void setReadyToFlush() throws IOException {
- wl.checkWriteLock();
-
transmit();
- synchronized(this) {
+ synchronized (this) {
syncLatch = new CountDownLatch(outstandingRequests.get());
}
}
@Override
public void flushAndSync() throws IOException {
- wl.checkWriteLock();
-
assert(syncLatch != null);
try {
syncLatch.await();
@@ -164,8 +153,6 @@ class BookKeeperEditLogOutputStream
* are never called at the same time.
*/
private void transmit() throws IOException {
- wl.checkWriteLock();
-
if (!transmitResult.compareAndSet(BKException.Code.OK,
BKException.Code.OK)) {
throw new IOException("Trying to write to an errored stream;"
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java?rev=1343913&r1=1343912&r2=1343913&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
Tue May 29 18:50:07 2012
@@ -117,7 +117,7 @@ public class BookKeeperJournalManager im
private final ZooKeeper zkc;
private final Configuration conf;
private final BookKeeper bkc;
- private final WriteLock wl;
+ private final CurrentInprogress ci;
private final String ledgerPath;
private final MaxTxId maxTxId;
private final int ensembleSize;
@@ -155,7 +155,7 @@ public class BookKeeperJournalManager im
ledgerPath = zkPath + "/ledgers";
String maxTxIdPath = zkPath + "/maxtxid";
- String lockPath = zkPath + "/lock";
+ String currentInprogressNodePath = zkPath + "/CurrentInprogress";
String versionPath = zkPath + "/version";
digestpw = conf.get(BKJM_BOOKKEEPER_DIGEST_PW,
BKJM_BOOKKEEPER_DIGEST_PW_DEFAULT);
@@ -192,7 +192,7 @@ public class BookKeeperJournalManager im
throw new IOException("Error initializing zk", e);
}
- wl = new WriteLock(zkc, lockPath);
+ ci = new CurrentInprogress(zkc, currentInprogressNodePath);
maxTxId = new MaxTxId(zkc, maxTxIdPath);
}
@@ -207,13 +207,16 @@ public class BookKeeperJournalManager im
*/
@Override
public EditLogOutputStream startLogSegment(long txId) throws IOException {
- wl.acquire();
-
if (txId <= maxTxId.get()) {
throw new IOException("We've already seen " + txId
+ ". A new stream cannot be created with it");
}
try {
+ String existingInprogressNode = ci.read();
+ if (null != existingInprogressNode
+ && zkc.exists(existingInprogressNode, false) != null) {
+ throw new IOException("Inprogress node already exists");
+ }
if (currentLedger != null) {
// bookkeeper errored on last stream, clean up ledger
currentLedger.close();
@@ -234,7 +237,8 @@ public class BookKeeperJournalManager im
l.write(zkc, znodePath);
maxTxId.store(txId);
- return new BookKeeperEditLogOutputStream(conf, currentLedger, wl);
+ ci.update(znodePath);
+ return new BookKeeperEditLogOutputStream(conf, currentLedger);
} catch (Exception e) {
if (currentLedger != null) {
try {
@@ -270,7 +274,6 @@ public class BookKeeperJournalManager im
+ " doesn't exist");
}
- wl.checkWriteLock();
EditLogLedgerMetadata l
= EditLogLedgerMetadata.read(zkc, inprogressPath);
@@ -307,13 +310,15 @@ public class BookKeeperJournalManager im
}
maxTxId.store(lastTxId);
zkc.delete(inprogressPath, inprogressStat.getVersion());
+ String inprogressPathFromCI = ci.read();
+ if (inprogressPath.equals(inprogressPathFromCI)) {
+ ci.clear();
+ }
} catch (KeeperException e) {
throw new IOException("Error finalising ledger", e);
} catch (InterruptedException ie) {
throw new IOException("Error finalising ledger", ie);
- } finally {
- wl.release();
- }
+ }
}
EditLogInputStream getInputStream(long fromTxId, boolean inProgressOk)
@@ -417,7 +422,6 @@ public class BookKeeperJournalManager im
@Override
public void recoverUnfinalizedSegments() throws IOException {
- wl.acquire();
synchronized (this) {
try {
List<String> children = zkc.getChildren(ledgerPath, false);
@@ -445,10 +449,6 @@ public class BookKeeperJournalManager im
} catch (InterruptedException ie) {
throw new IOException("Interrupted getting list of inprogress
segments",
ie);
- } finally {
- if (wl.haveLock()) {
- wl.release();
- }
}
}
}
Added:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java?rev=1343913&view=auto
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java
(added)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java
Tue May 29 18:50:07 2012
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.contrib.bkjournal;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.charset.Charset;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Records, in a single ZooKeeper znode, the path of the inprogress edit log
+ * segment that is currently being written, so that competing writers can
+ * detect each other.
+ * <p>
+ * The protocol is optimistic. {@link #read()} returns the recorded inprogress
+ * path (or null) and remembers the znode version it saw. {@link #update} and
+ * {@link #clear()} then write conditionally on that remembered version. If
+ * some other client modified the znode in between, the version has changed,
+ * the write fails and an IOException is thrown, so only one of the competing
+ * callers can continue after checking with CurrentInprogress.
+ * <p>
+ * The znode content is <code>layout version,hostname,inprogress path</code>.
+ */
+class CurrentInprogress {
+  private static final String CONTENT_DELIMITER = ",";
+  /** Number of delimiter separated fields stored in the znode. */
+  private static final int CONTENT_FIELD_COUNT = 3;
+  /** Fixed charset, so the stored bytes do not depend on platform defaults. */
+  private static final Charset CONTENT_CHARSET = Charset.forName("UTF-8");
+
+  static final Log LOG = LogFactory.getLog(CurrentInprogress.class);
+
+  private final ZooKeeper zkc;
+  private final String currentInprogressNode;
+  /**
+   * Znode version seen by the last {@link #read()}. Starts at -1, which
+   * ZooKeeper treats as "match any version".
+   */
+  private volatile int versionNumberForPermission = -1;
+  private static final int CURRENT_INPROGRESS_LAYOUT_VERSION = -1;
+  private final String hostName = InetAddress.getLocalHost().toString();
+
+  /**
+   * Create the CurrentInprogress znode if it does not exist yet.
+   *
+   * @param zkc
+   *          - zookeeper client used for all accesses
+   * @param lockpath
+   *          - path of the CurrentInprogress znode
+   * @throws IOException
+   *           if zookeeper cannot be accessed or the local host name cannot be
+   *           resolved
+   */
+  CurrentInprogress(ZooKeeper zkc, String lockpath) throws IOException {
+    this.currentInprogressNode = lockpath;
+    this.zkc = zkc;
+    try {
+      Stat isCurrentInprogressNodeExists = zkc.exists(lockpath, false);
+      if (isCurrentInprogressNodeExists == null) {
+        try {
+          zkc.create(lockpath, null, Ids.OPEN_ACL_UNSAFE,
+              CreateMode.PERSISTENT);
+        } catch (NodeExistsException e) {
+          // Node might created by other process at the same time. Ignore it.
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(lockpath + " already created by other process.", e);
+          }
+        }
+      }
+    } catch (InterruptedException e) {
+      // Keep the interrupt visible to the caller's thread.
+      Thread.currentThread().interrupt();
+      throw new IOException("Exception accessing Zookeeper", e);
+    } catch (Exception e) {
+      throw new IOException("Exception accessing Zookeeper", e);
+    }
+  }
+
+  /**
+   * Update the path with prepending version number and hostname. The write is
+   * conditional on the znode version remembered by the last {@link #read()};
+   * the remembered version is not refreshed by this call.
+   *
+   * @param path
+   *          - to be updated in zookeeper
+   * @throws IOException
+   *           if the znode could not be written, e.g. because its version
+   *           changed since the last read
+   */
+  void update(String path) throws IOException {
+    String content = CURRENT_INPROGRESS_LAYOUT_VERSION
+        + CONTENT_DELIMITER + hostName + CONTENT_DELIMITER + path;
+    try {
+      zkc.setData(this.currentInprogressNode,
+          content.getBytes(CONTENT_CHARSET), this.versionNumberForPermission);
+    } catch (KeeperException e) {
+      throw new IOException("Exception when setting the data "
+          + "[layout version number,hostname,inprogressNode path]= [" + content
+          + "] to CurrentInprogress. ", e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IOException("Interrupted while setting the data "
+          + "[layout version number,hostname,inprogressNode path]= [" + content
+          + "] to CurrentInprogress", e);
+    }
+    LOG.info("Updated data[layout version number,hostname,inprogressNode path]"
+        + "= [" + content + "] to CurrentInprogress");
+  }
+
+  /**
+   * Read the CurrentInprogress node data from Zookeeper and also get the znode
+   * version number. Return the 3rd field from the data. i.e saved path with
+   * #update api
+   *
+   * @return available inprogress node path. returns null if not available.
+   * @throws IOException
+   *           if the znode cannot be read, its content is malformed, or its
+   *           layout version is greater than the supported one
+   */
+  String read() throws IOException {
+    Stat stat = new Stat();
+    byte[] data = null;
+    try {
+      data = zkc.getData(this.currentInprogressNode, false, stat);
+    } catch (KeeperException e) {
+      throw new IOException("Exception while reading the data from "
+          + currentInprogressNode, e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IOException("Interrupted while reading data from "
+          + currentInprogressNode, e);
+    }
+    this.versionNumberForPermission = stat.getVersion();
+    // Treat an empty payload like a cleared node instead of failing to parse.
+    if (data == null || data.length == 0) {
+      LOG.info("No data available in CurrentInprogress");
+      return null;
+    }
+    String stringData = new String(data, CONTENT_CHARSET);
+    LOG.info("Read data[layout version number,hostname,inprogressNode path]"
+        + "= [" + stringData + "] from CurrentInprogress");
+    // Limit the split so that a delimiter inside the path stays in the path.
+    String[] contents = stringData.split(CONTENT_DELIMITER,
+        CONTENT_FIELD_COUNT);
+    // Checked explicitly: an assert is skipped when assertions are disabled.
+    if (contents.length != CONTENT_FIELD_COUNT) {
+      throw new IOException("As per the current data format, "
+          + "CurrentInprogress node data should contain 3 fields. "
+          + "i.e layout version number,hostname,inprogressNode path");
+    }
+    String layoutVersion = contents[0];
+    long layoutVersionInZK;
+    try {
+      layoutVersionInZK = Long.parseLong(layoutVersion);
+    } catch (NumberFormatException e) {
+      throw new IOException("Invalid layout version of CurrentInprogress node"
+          + " in ZK : " + layoutVersion, e);
+    }
+    if (layoutVersionInZK > CURRENT_INPROGRESS_LAYOUT_VERSION) {
+      throw new IOException(
+          "Supported layout version of CurrentInprogress node is : "
+              + CURRENT_INPROGRESS_LAYOUT_VERSION
+              + " . Layout version of CurrentInprogress node in ZK is : "
+              + layoutVersion);
+    }
+    return contents[2];
+  }
+
+  /**
+   * Clear the CurrentInprogress node data. The write is conditional on the
+   * znode version remembered by the last {@link #read()}.
+   *
+   * @throws IOException
+   *           if the znode could not be written
+   */
+  void clear() throws IOException {
+    try {
+      zkc.setData(this.currentInprogressNode, null,
+          versionNumberForPermission);
+    } catch (KeeperException e) {
+      throw new IOException(
+          "Exception when setting the data to CurrentInprogress node", e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IOException(
+          "Interrupted when setting the data to CurrentInprogress node", e);
+    }
+    LOG.info("Cleared the data from CurrentInprogress");
+  }
+
+}
\ No newline at end of file
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java?rev=1343913&r1=1343912&r2=1343913&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java
Tue May 29 18:50:07 2012
@@ -215,8 +215,7 @@ public class TestBookKeeperAsHASharedDir
}
/**
- * Test that two namenodes can't become primary at the same
- * time.
+ * Test that two namenodes can't continue as primary
*/
@Test
public void testMultiplePrimariesStarted() throws Exception {
@@ -247,21 +246,17 @@ public class TestBookKeeperAsHASharedDir
FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
fs.mkdirs(p1);
nn1.getRpcServer().rollEditLog();
- try {
- cluster.transitionToActive(1);
- fail("Shouldn't have been able to start two primaries"
- + " with single shared storage");
- } catch (ServiceFailedException sfe) {
- assertTrue("Wrong exception",
- sfe.getMessage().contains("Failed to start active services"));
- }
+ cluster.transitionToActive(1);
+ fs = cluster.getFileSystem(0); // get the older active server.
+ // This edit log updation on older active should make older active
+ // shutdown.
+ fs.delete(p1, true);
+ verify(mockRuntime1, atLeastOnce()).exit(anyInt());
+ verify(mockRuntime2, times(0)).exit(anyInt());
} finally {
- verify(mockRuntime1, times(0)).exit(anyInt());
- verify(mockRuntime2, atLeastOnce()).exit(anyInt());
-
if (cluster != null) {
cluster.shutdown();
}
}
}
-}
\ No newline at end of file
+}
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java?rev=1343913&r1=1343912&r2=1343913&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java
Tue May 29 18:50:07 2012
@@ -361,6 +361,7 @@ public class TestBookKeeperJournalManage
assertEquals("New bookie didn't start",
numBookies+1, bkutil.checkBookiesUp(numBookies+1, 10));
+ bkjm.recoverUnfinalizedSegments();
out = bkjm.startLogSegment(txid);
for (long i = 1 ; i <= 3; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
Added:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestCurrentInprogress.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestCurrentInprogress.java?rev=1343913&view=auto
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestCurrentInprogress.java
(added)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestCurrentInprogress.java
Tue May 29 18:50:07 2012
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.contrib.bkjournal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.util.LocalBookKeeper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests the read, update and clear APIs of CurrentInprogress.
+ */
+public class TestCurrentInprogress {
+  private static final Log LOG =
+      LogFactory.getLog(TestCurrentInprogress.class);
+  private static final String CURRENT_NODE_PATH = "/test";
+  private static final String HOSTPORT = "127.0.0.1:2181";
+  private static final int CONNECTION_TIMEOUT = 30000;
+  private static NIOServerCnxnFactory serverFactory;
+  private static ZooKeeperServer zks;
+  private static ZooKeeper zkc;
+  private static int ZooKeeperDefaultPort = 2181;
+  private static File zkTmpDir;
+
+  /**
+   * Connect to the given ensemble and wait until the session is connected.
+   *
+   * @param ensemble
+   *          - host:port of the ZooKeeper server to connect to
+   * @return a connected ZooKeeper client
+   * @throws IOException
+   *           if the connection is not established within 10 seconds
+   */
+  private static ZooKeeper connectZooKeeper(String ensemble)
+      throws IOException, KeeperException, InterruptedException {
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    // Connect to the requested ensemble rather than always to HOSTPORT.
+    ZooKeeper zkc = new ZooKeeper(ensemble, 3600, new Watcher() {
+      public void process(WatchedEvent event) {
+        if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
+          latch.countDown();
+        }
+      }
+    });
+    if (!latch.await(10, TimeUnit.SECONDS)) {
+      // Do not leak the client that never connected.
+      zkc.close();
+      throw new IOException("Zookeeper took too long to connect");
+    }
+    return zkc;
+  }
+
+  /**
+   * Start a ZooKeeper server on ZooKeeperDefaultPort and wait until it answers
+   * at HOSTPORT. Startup failures are propagated so that the tests do not run
+   * against a server that never came up.
+   */
+  @BeforeClass
+  public static void setupZooKeeper() throws Exception {
+    LOG.info("Starting ZK server");
+    zkTmpDir = File.createTempFile("zookeeper", "test");
+    zkTmpDir.delete();
+    zkTmpDir.mkdir();
+    // NOTE(review): the third constructor argument looks like it is the tick
+    // time rather than a port - confirm against the ZooKeeperServer API.
+    zks = new ZooKeeperServer(zkTmpDir, zkTmpDir, ZooKeeperDefaultPort);
+    serverFactory = new NIOServerCnxnFactory();
+    serverFactory.configure(new InetSocketAddress(ZooKeeperDefaultPort), 10);
+    serverFactory.startup(zks);
+    boolean b = LocalBookKeeper.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT);
+    LOG.debug("ZooKeeper server up: " + b);
+    if (!b) {
+      throw new IOException("ZooKeeper server did not come up at " + HOSTPORT);
+    }
+  }
+
+  /** Stop the ZooKeeper server and its connection factory. */
+  @AfterClass
+  public static void shutDownServer() {
+    if (null != zks) {
+      zks.shutdown();
+    }
+    // Stop the connection factory too, so its listening socket is closed.
+    if (null != serverFactory) {
+      serverFactory.shutdown();
+    }
+    if (null != zkTmpDir) {
+      zkTmpDir.delete();
+    }
+  }
+
+  @Before
+  public void setup() throws Exception {
+    zkc = connectZooKeeper(HOSTPORT);
+  }
+
+  @After
+  public void teardown() throws Exception {
+    if (null != zkc) {
+      zkc.close();
+    }
+  }
+
+  /**
+   * Tests that read should be able to read the data which updated with update
+   * api
+   */
+  @Test
+  public void testReadShouldReturnTheZnodePathAfterUpdate() throws Exception {
+    String data = "inprogressNode";
+    CurrentInprogress ci = new CurrentInprogress(zkc, CURRENT_NODE_PATH);
+    ci.update(data);
+    String inprogressNodePath = ci.read();
+    assertEquals("Not returning inprogressZnode", "inprogressNode",
+        inprogressNodePath);
+  }
+
+  /**
+   * Tests that read should return null if we clear the updated data in
+   * CurrentInprogress node
+   */
+  @Test
+  public void testReadShouldReturnNullAfterClear() throws Exception {
+    CurrentInprogress ci = new CurrentInprogress(zkc, CURRENT_NODE_PATH);
+    ci.update("myInprogressZnode");
+    ci.read();
+    ci.clear();
+    String inprogressNodePath = ci.read();
+    assertEquals("Expecting null to be return", null, inprogressNodePath);
+  }
+
+  /**
+   * Tests that update should throw IOE, if version number modifies between read
+   * and update
+   */
+  @Test
+  public void testUpdateShouldFailWithIOEIfVersionNumberChangedAfterRead()
+      throws Exception {
+    CurrentInprogress ci = new CurrentInprogress(zkc, CURRENT_NODE_PATH);
+    ci.update("myInprogressZnode");
+    assertEquals("Not returning myInprogressZnode", "myInprogressZnode", ci
+        .read());
+    // Updating data in-between to change the data to change the version number
+    ci.update("YourInprogressZnode");
+    try {
+      ci.update("myInprogressZnode");
+      fail("Update should fail as the version number changed after read");
+    } catch (IOException expected) {
+      // Only this last update may fail; an IOException from any earlier call
+      // propagates and fails the test.
+    }
+  }
+
+}
\ No newline at end of file