Author: umamahesh
Date: Thu May 31 19:29:58 2012
New Revision: 1344874
URL: http://svn.apache.org/viewvc?rev=1344874&view=rev
Log:
HDFS-3441. Race condition between rolling logs at active NN and purging at
standby. Contributed by Rakesh R.
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java?rev=1344874&r1=1344873&r2=1344874&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
Thu May 31 19:29:58 2012
@@ -49,7 +49,7 @@ import java.net.URI;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
+import com.google.common.annotations.VisibleForTesting;
/**
* BookKeeper Journal Manager
*
@@ -122,7 +122,9 @@ public class BookKeeperJournalManager im
= "dfs.namenode.bookkeeperjournal.zk.session.timeout";
public static final int BKJM_ZK_SESSION_TIMEOUT_DEFAULT = 3000;
- private final ZooKeeper zkc;
+ private static final String BKJM_EDIT_INPROGRESS = "inprogress_";
+
+ private ZooKeeper zkc;
private final Configuration conf;
private final BookKeeper bkc;
private final CurrentInprogress ci;
@@ -351,13 +353,9 @@ public class BookKeeperJournalManager im
EditLogInputStream getInputStream(long fromTxId, boolean inProgressOk)
throws IOException {
- for (EditLogLedgerMetadata l : getLedgerList()) {
+ for (EditLogLedgerMetadata l : getLedgerList(inProgressOk)) {
long lastTxId = l.getLastTxId();
if (l.isInProgress()) {
- if (!inProgressOk) {
- continue;
- }
-
lastTxId = recoverLastTxId(l, false);
}
@@ -413,13 +411,9 @@ public class BookKeeperJournalManager im
throws IOException {
long count = 0;
long expectedStart = 0;
- for (EditLogLedgerMetadata l : getLedgerList()) {
+ for (EditLogLedgerMetadata l : getLedgerList(inProgressOk)) {
long lastTxId = l.getLastTxId();
if (l.isInProgress()) {
- if (!inProgressOk) {
- continue;
- }
-
lastTxId = recoverLastTxId(l, false);
if (lastTxId == HdfsConstants.INVALID_TXID) {
break;
@@ -457,7 +451,7 @@ public class BookKeeperJournalManager im
try {
List<String> children = zkc.getChildren(ledgerPath, false);
for (String child : children) {
- if (!child.startsWith("inprogress_")) {
+ if (!child.startsWith(BKJM_EDIT_INPROGRESS)) {
continue;
}
String znode = ledgerPath + "/" + child;
@@ -504,9 +498,8 @@ public class BookKeeperJournalManager im
@Override
public void purgeLogsOlderThan(long minTxIdToKeep)
throws IOException {
- for (EditLogLedgerMetadata l : getLedgerList()) {
- if (!l.isInProgress()
- && l.getLastTxId() < minTxIdToKeep) {
+ for (EditLogLedgerMetadata l : getLedgerList(false)) {
+ if (l.getLastTxId() < minTxIdToKeep) {
try {
Stat stat = zkc.exists(l.getZkPath(), false);
zkc.delete(l.getZkPath(), stat.getVersion());
@@ -597,13 +590,26 @@ public class BookKeeperJournalManager im
/**
* Get a list of all segments in the journal.
*/
- private List<EditLogLedgerMetadata> getLedgerList() throws IOException {
+ List<EditLogLedgerMetadata> getLedgerList(boolean inProgressOk)
+ throws IOException {
List<EditLogLedgerMetadata> ledgers
= new ArrayList<EditLogLedgerMetadata>();
try {
List<String> ledgerNames = zkc.getChildren(ledgerPath, false);
- for (String n : ledgerNames) {
- ledgers.add(EditLogLedgerMetadata.read(zkc, ledgerPath + "/" + n));
+ for (String ledgerName : ledgerNames) {
+ if (!inProgressOk && ledgerName.contains(BKJM_EDIT_INPROGRESS)) {
+ continue;
+ }
+ String legderMetadataPath = ledgerPath + "/" + ledgerName;
+ try {
+ EditLogLedgerMetadata editLogLedgerMetadata = EditLogLedgerMetadata
+ .read(zkc, legderMetadataPath);
+ ledgers.add(editLogLedgerMetadata);
+ } catch (KeeperException.NoNodeException e) {
+ LOG.warn("ZNode: " + legderMetadataPath
+ + " might have finalized and deleted."
+ + " So ignoring NoNodeException.");
+ }
}
} catch (KeeperException e) {
throw new IOException("Exception reading ledger list from zk", e);
@@ -630,6 +636,11 @@ public class BookKeeperJournalManager im
return ledgerPath + "/inprogress_" + Long.toString(startTxid, 16);
}
+ @VisibleForTesting
+ void setZooKeeper(ZooKeeper zk) {
+ this.zkc = zk;
+ }
+
/**
* Simple watcher to notify when zookeeper has connected
*/
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java?rev=1344874&r1=1344873&r2=1344874&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java
Thu May 31 19:29:58 2012
@@ -18,14 +18,17 @@
package org.apache.hadoop.contrib.bkjournal;
import static org.junit.Assert.*;
+import static org.mockito.Mockito.spy;
import org.junit.Test;
import org.junit.Before;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.AfterClass;
+import org.mockito.Mockito;
import java.io.IOException;
import java.net.URI;
+import java.util.List;
import org.apache.hadoop.conf.Configuration;
@@ -37,6 +40,7 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
@@ -614,4 +618,57 @@ public class TestBookKeeperJournalManage
bkjm.close();
}
+ /**
+ * Tests that reading the edit log ledger metadata from ZooKeeper tolerates a
+ * NoNodeException. bkjm.getLedgerList(inProgressOk) should skip the znode
+ * that is missing and still return the remaining ledgers. HDFS-3441.
+ */
+ @Test
+ public void testEditLogFileNotExistsWhenReadingMetadata() throws Exception {
+ URI uri = BKJMUtil.createJournalURI("/hdfsjournal-editlogfile");
+ BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri);
+ try {
+ // start a new in-progress log segment at txid=1, write transactions
+ // up to txid=50 and finalize the segment
+ String zkpath1 = startAndFinalizeLogSegment(bkjm, 1, 50);
+
+ // start a new in-progress log segment at txid=51, write transactions
+ // up to txid=100 and finalize the segment
+ String zkpath2 = startAndFinalizeLogSegment(bkjm, 51, 100);
+
+ // Simulate the purger thread removing the second segment's znode while
+ // its metadata is being read: getData on zkpath2 throws NoNodeException.
+ ZooKeeper zkspy = spy(BKJMUtil.connectZooKeeper());
+ bkjm.setZooKeeper(zkspy);
+ Mockito.doThrow(
+ new KeeperException.NoNodeException(zkpath2 + " doesn't exists"))
+ .when(zkspy).getData(zkpath2, false, null);
+
+ List<EditLogLedgerMetadata> ledgerList = bkjm.getLedgerList(false);
+ assertEquals("List contains the metadata of non exists path.", 1,
+ ledgerList.size());
+ assertEquals("LogLedgerMetadata contains wrong zk paths.", zkpath1,
+ ledgerList.get(0).getZkPath());
+ } finally {
+ bkjm.close();
+ }
+ }
+
+ private String startAndFinalizeLogSegment(BookKeeperJournalManager bkjm,
+ int startTxid, int endTxid) throws IOException, KeeperException,
+ InterruptedException {
+ EditLogOutputStream out = bkjm.startLogSegment(startTxid);
+ for (long i = startTxid; i <= endTxid; i++) {
+ FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+ op.setTransactionId(i);
+ out.write(op);
+ }
+ out.close();
+ // finalize the in-progress log segment that starts at startTxid.
+ bkjm.finalizeLogSegment(startTxid, endTxid);
+ String zkpath1 = bkjm.finalizedLedgerZNode(startTxid, endTxid);
+ assertNotNull(zkc.exists(zkpath1, false));
+ assertNull(zkc.exists(bkjm.inprogressZNode(startTxid), false));
+ return zkpath1;
+ }
}