Author: umamahesh
Date: Thu May 31 18:09:04 2012
New Revision: 1344840
URL: http://svn.apache.org/viewvc?rev=1344840&view=rev
Log:
HDFS-3423. BKJM: NN startup is failing, when tries to
recoverUnfinalizedSegments() a bad inProgress_ ZNodes. Contributed by Ivan and
Uma.
Added:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperEditLogStreams.java
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/EditLogLedgerMetadata.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/MaxTxId.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java?rev=1344840&r1=1344839&r2=1344840&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
Thu May 31 18:09:04 2012
@@ -73,6 +73,11 @@ class BookKeeperEditLogInputStream exten
this.logVersion = metadata.getVersion();
this.inProgress = metadata.isInProgress();
+ if (firstBookKeeperEntry < 0
+ || firstBookKeeperEntry > lh.getLastAddConfirmed()) {
+ throw new IOException("Invalid first bk entry to read: "
+ + firstBookKeeperEntry + ", LAC: " + lh.getLastAddConfirmed());
+ }
BufferedInputStream bin = new BufferedInputStream(
new LedgerInputStream(lh, firstBookKeeperEntry));
tracker = new FSEditLogLoader.PositionTrackingInputStream(bin);
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java?rev=1344840&r1=1344839&r2=1344840&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
Thu May 31 18:09:04 2012
@@ -461,17 +461,34 @@ public class BookKeeperJournalManager im
continue;
}
String znode = ledgerPath + "/" + child;
- EditLogLedgerMetadata l
- = EditLogLedgerMetadata.read(zkc, znode);
- long endTxId = recoverLastTxId(l, true);
- if (endTxId == HdfsConstants.INVALID_TXID) {
- LOG.error("Unrecoverable corruption has occurred in segment "
- + l.toString() + " at path " + znode
- + ". Unable to continue recovery.");
- throw new IOException("Unrecoverable corruption,"
- + " please check logs.");
+ EditLogLedgerMetadata l = EditLogLedgerMetadata.read(zkc, znode);
+ try {
+ long endTxId = recoverLastTxId(l, true);
+ if (endTxId == HdfsConstants.INVALID_TXID) {
+ LOG.error("Unrecoverable corruption has occurred in segment "
+ + l.toString() + " at path " + znode
+ + ". Unable to continue recovery.");
+ throw new IOException("Unrecoverable corruption,"
+ + " please check logs.");
+ }
+ finalizeLogSegment(l.getFirstTxId(), endTxId);
+ } catch (SegmentEmptyException see) {
+ LOG.warn("Inprogress znode " + child
+ + " refers to a ledger which is empty. This occurs when the NN"
+ + " crashes after opening a segment, but before writing the"
+ + " OP_START_LOG_SEGMENT op. It is safe to delete."
+ + " MetaData [" + l.toString() + "]");
+
+ // If the max seen transaction is the same as what would
+ // have been the first transaction of the failed ledger,
+ // decrement it, as that transaction never happened and as
+ // such, is _not_ the last seen
+ if (maxTxId.get() == l.getFirstTxId()) {
+ maxTxId.reset(maxTxId.get() - 1);
+ }
+
+ zkc.delete(znode, -1);
}
- finalizeLogSegment(l.getFirstTxId(), endTxId);
}
} catch (KeeperException.NoNodeException nne) {
// nothing to recover, ignore
@@ -532,9 +549,9 @@ public class BookKeeperJournalManager im
* ledger.
*/
private long recoverLastTxId(EditLogLedgerMetadata l, boolean fence)
- throws IOException {
+ throws IOException, SegmentEmptyException {
+ LedgerHandle lh = null;
try {
- LedgerHandle lh = null;
if (fence) {
lh = bkc.openLedger(l.getLedgerId(),
BookKeeper.DigestType.MAC,
@@ -544,9 +561,21 @@ public class BookKeeperJournalManager im
BookKeeper.DigestType.MAC,
digestpw.getBytes());
}
+ } catch (BKException bke) {
+ throw new IOException("Exception opening ledger for " + l, bke);
+ } catch (InterruptedException ie) {
+ throw new IOException("Interrupted opening ledger for " + l, ie);
+ }
+
+ BookKeeperEditLogInputStream in = null;
+
+ try {
long lastAddConfirmed = lh.getLastAddConfirmed();
- BookKeeperEditLogInputStream in
- = new BookKeeperEditLogInputStream(lh, l, lastAddConfirmed);
+ if (lastAddConfirmed == -1) {
+ throw new SegmentEmptyException();
+ }
+
+ in = new BookKeeperEditLogInputStream(lh, l, lastAddConfirmed);
long endTxId = HdfsConstants.INVALID_TXID;
FSEditLogOp op = in.readOp();
@@ -558,12 +587,10 @@ public class BookKeeperJournalManager im
op = in.readOp();
}
return endTxId;
- } catch (BKException e) {
- throw new IOException("Exception retreiving last tx id for ledger " + l,
- e);
- } catch (InterruptedException ie) {
- throw new IOException("Interrupted while retreiving last tx id "
- + "for ledger " + l, ie);
+ } finally {
+ if (in != null) {
+ in.close();
+ }
}
}
@@ -613,4 +640,7 @@ public class BookKeeperJournalManager im
}
}
}
+
+ private static class SegmentEmptyException extends IOException {
+ }
}
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/EditLogLedgerMetadata.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/EditLogLedgerMetadata.java?rev=1344840&r1=1344839&r2=1344840&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/EditLogLedgerMetadata.java
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/EditLogLedgerMetadata.java
Thu May 31 18:09:04 2012
@@ -149,8 +149,8 @@ public class EditLogLedgerMetadata {
version, ledgerId, firstTxId, lastTxId);
}
try {
- zkc.create(path, finalisedData.getBytes(),
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zkc.create(path, finalisedData.getBytes(), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
} catch (KeeperException.NodeExistsException nee) {
throw nee;
} catch (KeeperException e) {
@@ -167,7 +167,7 @@ public class EditLogLedgerMetadata {
LOG.trace("Verifying " + this.toString()
+ " against " + other);
}
- return other == this;
+ return other.equals(this);
} catch (KeeperException e) {
LOG.error("Couldn't verify data in " + path, e);
return false;
@@ -188,12 +188,12 @@ public class EditLogLedgerMetadata {
&& version == ol.version;
}
- public int hashCode() {
+ public int hashCode() {
int hash = 1;
- hash = hash * 31 + (int)ledgerId;
- hash = hash * 31 + (int)firstTxId;
- hash = hash * 31 + (int)lastTxId;
- hash = hash * 31 + (int)version;
+ hash = hash * 31 + (int) ledgerId;
+ hash = hash * 31 + (int) firstTxId;
+ hash = hash * 31 + (int) lastTxId;
+ hash = hash * 31 + (int) version;
return hash;
}
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/MaxTxId.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/MaxTxId.java?rev=1344840&r1=1344839&r2=1344840&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/MaxTxId.java
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/MaxTxId.java
Thu May 31 18:09:04 2012
@@ -18,15 +18,15 @@
package org.apache.hadoop.contrib.bkjournal;
import java.io.IOException;
-import org.apache.zookeeper.ZooKeeper;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
/**
* Utility class for storing and reading
* the max seen txid in zookeeper
@@ -50,20 +50,24 @@ class MaxTxId {
if (LOG.isTraceEnabled()) {
LOG.trace("Setting maxTxId to " + maxTxId);
}
- String txidStr = Long.toString(maxTxId);
- try {
- if (currentStat != null) {
- currentStat = zkc.setData(path, txidStr.getBytes("UTF-8"),
- currentStat.getVersion());
- } else {
- zkc.create(path, txidStr.getBytes("UTF-8"),
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- }
- } catch (KeeperException e) {
- throw new IOException("Error writing max tx id", e);
- } catch (InterruptedException e) {
- throw new IOException("Interrupted while writing max tx id", e);
+ reset(maxTxId);
+ }
+ }
+
+ synchronized void reset(long maxTxId) throws IOException {
+ String txidStr = Long.toString(maxTxId);
+ try {
+ if (currentStat != null) {
+ currentStat = zkc.setData(path, txidStr.getBytes("UTF-8"), currentStat
+ .getVersion());
+ } else {
+ zkc.create(path, txidStr.getBytes("UTF-8"), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
}
+ } catch (KeeperException e) {
+ throw new IOException("Error writing max tx id", e);
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted while writing max tx id", e);
}
}
Added:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperEditLogStreams.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperEditLogStreams.java?rev=1344840&view=auto
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperEditLogStreams.java
(added)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperEditLogStreams.java
Thu May 31 18:09:04 2012
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.contrib.bkjournal;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Unit test for the bkjm's streams
+ */
+public class TestBookKeeperEditLogStreams {
+ static final Log LOG = LogFactory.getLog(TestBookKeeperEditLogStreams.class);
+
+ private static BKJMUtil bkutil;
+ private final static int numBookies = 3;
+
+ @BeforeClass
+ public static void setupBookkeeper() throws Exception {
+ bkutil = new BKJMUtil(numBookies);
+ bkutil.start();
+ }
+
+ @AfterClass
+ public static void teardownBookkeeper() throws Exception {
+ bkutil.teardown();
+ }
+
+ /**
+ * Test that bkjm will refuse open a stream on an empty
+ * ledger.
+ */
+ @Test
+ public void testEmptyInputStream() throws Exception {
+ ZooKeeper zk = BKJMUtil.connectZooKeeper();
+
+ BookKeeper bkc = new BookKeeper(new ClientConfiguration(), zk);
+ try {
+ LedgerHandle lh = bkc.createLedger(BookKeeper.DigestType.CRC32, "foobar"
+ .getBytes());
+ lh.close();
+
+ EditLogLedgerMetadata metadata = new EditLogLedgerMetadata("/foobar",
+ HdfsConstants.LAYOUT_VERSION, lh.getId(), 0x1234);
+ try {
+ new BookKeeperEditLogInputStream(lh, metadata, -1);
+ fail("Shouldn't get this far, should have thrown");
+ } catch (IOException ioe) {
+        assertTrue(ioe.getMessage().contains("Invalid first bk entry to read"));
+ }
+
+ metadata = new EditLogLedgerMetadata("/foobar",
+ HdfsConstants.LAYOUT_VERSION, lh.getId(), 0x1234);
+ try {
+ new BookKeeperEditLogInputStream(lh, metadata, 0);
+ fail("Shouldn't get this far, should have thrown");
+ } catch (IOException ioe) {
+        assertTrue(ioe.getMessage().contains("Invalid first bk entry to read"));
+ }
+ } finally {
+ bkc.close();
+ zk.close();
+ }
+ }
+}
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java?rev=1344840&r1=1344839&r2=1344840&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java
Thu May 31 18:09:04 2012
@@ -25,6 +25,8 @@ import org.junit.BeforeClass;
import org.junit.AfterClass;
import java.io.IOException;
+import java.net.URI;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
@@ -34,7 +36,9 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.JournalManager;
import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -74,7 +78,6 @@ public class TestBookKeeperJournalManage
public void testSimpleWrite() throws Exception {
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
BKJMUtil.createJournalURI("/hdfsjournal-simplewrite"));
- long txid = 1;
EditLogOutputStream out = bkjm.startLogSegment(1);
for (long i = 1 ; i <= 100; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
@@ -94,7 +97,6 @@ public class TestBookKeeperJournalManage
public void testNumberOfTransactions() throws Exception {
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
BKJMUtil.createJournalURI("/hdfsjournal-txncount"));
- long txid = 1;
EditLogOutputStream out = bkjm.startLogSegment(1);
for (long i = 1 ; i <= 100; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
@@ -246,10 +248,12 @@ public class TestBookKeeperJournalManage
EditLogOutputStream out1 = bkjm1.startLogSegment(start);
try {
- EditLogOutputStream out2 = bkjm2.startLogSegment(start);
+ bkjm2.startLogSegment(start);
fail("Shouldn't have been able to open the second writer");
} catch (IOException ioe) {
LOG.info("Caught exception as expected", ioe);
+ }finally{
+ out1.close();
}
}
@@ -257,7 +261,6 @@ public class TestBookKeeperJournalManage
public void testSimpleRead() throws Exception {
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
BKJMUtil.createJournalURI("/hdfsjournal-simpleread"));
- long txid = 1;
final long numTransactions = 10000;
EditLogOutputStream out = bkjm.startLogSegment(1);
for (long i = 1 ; i <= numTransactions; i++) {
@@ -283,7 +286,6 @@ public class TestBookKeeperJournalManage
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
BKJMUtil.createJournalURI("/hdfsjournal-simplerecovery"));
EditLogOutputStream out = bkjm.startLogSegment(1);
- long txid = 1;
for (long i = 1 ; i <= 100; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(i);
@@ -449,5 +451,167 @@ public class TestBookKeeperJournalManage
}
}
}
+
+ /**
+ * If a journal manager has an empty inprogress node, ensure that we throw an
+ * error, as this should not be possible, and some third party has corrupted
+ * the zookeeper state
+ */
+ @Test
+ public void testEmptyInprogressNode() throws Exception {
+ URI uri = BKJMUtil.createJournalURI("/hdfsjournal-emptyInprogress");
+ BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri);
+
+ EditLogOutputStream out = bkjm.startLogSegment(1);
+ for (long i = 1; i <= 100; i++) {
+ FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+ op.setTransactionId(i);
+ out.write(op);
+ }
+ out.close();
+ bkjm.finalizeLogSegment(1, 100);
+
+ out = bkjm.startLogSegment(101);
+ out.close();
+ bkjm.close();
+ String inprogressZNode = bkjm.inprogressZNode(101);
+ zkc.setData(inprogressZNode, new byte[0], -1);
+
+ bkjm = new BookKeeperJournalManager(conf, uri);
+ try {
+ bkjm.recoverUnfinalizedSegments();
+ fail("Should have failed. There should be no way of creating"
+ + " an empty inprogess znode");
+ } catch (IOException e) {
+ // correct behaviour
+ assertTrue("Exception different than expected", e.getMessage().contains(
+ "Invalid ledger entry,"));
+ } finally {
+ bkjm.close();
+ }
+ }
+
+ /**
+ * If a journal manager has an corrupt inprogress node, ensure that we throw
+ * an error, as this should not be possible, and some third party has
+ * corrupted the zookeeper state
+ */
+ @Test
+ public void testCorruptInprogressNode() throws Exception {
+ URI uri = BKJMUtil.createJournalURI("/hdfsjournal-corruptInprogress");
+ BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri);
+
+ EditLogOutputStream out = bkjm.startLogSegment(1);
+ for (long i = 1; i <= 100; i++) {
+ FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+ op.setTransactionId(i);
+ out.write(op);
+ }
+ out.close();
+ bkjm.finalizeLogSegment(1, 100);
+
+ out = bkjm.startLogSegment(101);
+ out.close();
+ bkjm.close();
+
+ String inprogressZNode = bkjm.inprogressZNode(101);
+ zkc.setData(inprogressZNode, "WholeLottaJunk".getBytes(), -1);
+
+ bkjm = new BookKeeperJournalManager(conf, uri);
+ try {
+ bkjm.recoverUnfinalizedSegments();
+ fail("Should have failed. There should be no way of creating"
+ + " an empty inprogess znode");
+ } catch (IOException e) {
+ // correct behaviour
+ assertTrue("Exception different than expected", e.getMessage().contains(
+ "Invalid ledger entry,"));
+
+ } finally {
+ bkjm.close();
+ }
+ }
+
+ /**
+   * Cases can occur where we create a segment but crash before we even have the
+ * chance to write the START_SEGMENT op. If this occurs we should warn, but
+ * load as normal
+ */
+ @Test
+ public void testEmptyInprogressLedger() throws Exception {
+ URI uri = BKJMUtil.createJournalURI("/hdfsjournal-emptyInprogressLedger");
+ BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri);
+
+ EditLogOutputStream out = bkjm.startLogSegment(1);
+ for (long i = 1; i <= 100; i++) {
+ FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+ op.setTransactionId(i);
+ out.write(op);
+ }
+ out.close();
+ bkjm.finalizeLogSegment(1, 100);
+
+ out = bkjm.startLogSegment(101);
+ out.close();
+ bkjm.close();
+
+ bkjm = new BookKeeperJournalManager(conf, uri);
+ bkjm.recoverUnfinalizedSegments();
+ out = bkjm.startLogSegment(101);
+ for (long i = 1; i <= 100; i++) {
+ FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+ op.setTransactionId(i);
+ out.write(op);
+ }
+ out.close();
+ bkjm.finalizeLogSegment(101, 200);
+
+ bkjm.close();
+ }
+
+ /**
+ * Test that if we fail between finalizing an inprogress and deleting the
+ * corresponding inprogress znode.
+ */
+ @Test
+ public void testRefinalizeAlreadyFinalizedInprogress() throws Exception {
+ URI uri = BKJMUtil
+ .createJournalURI("/hdfsjournal-refinalizeInprogressLedger");
+ BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri);
+
+ EditLogOutputStream out = bkjm.startLogSegment(1);
+ for (long i = 1; i <= 100; i++) {
+ FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+ op.setTransactionId(i);
+ out.write(op);
+ }
+ out.close();
+ bkjm.close();
+
+ String inprogressZNode = bkjm.inprogressZNode(1);
+ String finalizedZNode = bkjm.finalizedLedgerZNode(1, 100);
+ assertNotNull("inprogress znode doesn't exist", zkc.exists(inprogressZNode,
+ null));
+ assertNull("finalized znode exists", zkc.exists(finalizedZNode, null));
+
+ byte[] inprogressData = zkc.getData(inprogressZNode, false, null);
+
+ // finalize
+ bkjm = new BookKeeperJournalManager(conf, uri);
+ bkjm.recoverUnfinalizedSegments();
+ bkjm.close();
+
+ assertNull("inprogress znode exists", zkc.exists(inprogressZNode, null));
+ assertNotNull("finalized znode doesn't exist", zkc.exists(finalizedZNode,
+ null));
+
+ zkc.create(inprogressZNode, inprogressData, Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+
+ // should work fine
+ bkjm = new BookKeeperJournalManager(conf, uri);
+ bkjm.recoverUnfinalizedSegments();
+ bkjm.close();
+ }
}