Author: todd
Date: Mon Jul 30 23:35:22 2012
New Revision: 1367366
URL: http://svn.apache.org/viewvc?rev=1367366&view=rev
Log:
HDFS-3725. Fix QJM startup when individual JNs have gaps. Contributed by Todd
Lipcon.
Modified:
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLogManifest.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
Modified:
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt?rev=1367366&r1=1367365&r2=1367366&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
(original)
+++
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
Mon Jul 30 23:35:22 2012
@@ -8,3 +8,5 @@ HDFS-3694. Fix getEditLogManifest to fet
HDFS-3692. Support purgeEditLogs() call to remotely purge logs on JNs (todd)
HDFS-3693. JNStorage should read its storage info even before a writer becomes
active (todd)
+
+HDFS-3725. Fix QJM startup when individual JNs have gaps (todd)
Modified:
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLogManifest.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLogManifest.java?rev=1367366&r1=1367365&r2=1367366&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLogManifest.java
(original)
+++
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLogManifest.java
Mon Jul 30 23:35:22 2012
@@ -40,8 +40,8 @@ public class RemoteEditLogManifest {
/**
- * Check that the logs are contiguous and non-overlapping
- * sequences of transactions, in sorted order
+ * Check that the logs are non-overlapping sequences of transactions,
+ * in sorted order. They do not need to be contiguous.
* @throws IllegalStateException if incorrect
*/
private void checkState() {
@@ -50,8 +50,10 @@ public class RemoteEditLogManifest {
RemoteEditLog prev = null;
for (RemoteEditLog log : logs) {
if (prev != null) {
- if (log.getStartTxId() != prev.getEndTxId() + 1) {
- throw new IllegalStateException("Invalid log manifest:" + this);
+ if (log.getStartTxId() <= prev.getEndTxId()) {
+ throw new IllegalStateException(
+ "Invalid log manifest (log " + log + " overlaps " + prev + ")\n"
+ + this);
}
}
Modified:
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java?rev=1367366&r1=1367365&r2=1367366&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
(original)
+++
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
Mon Jul 30 23:35:22 2012
@@ -23,12 +23,15 @@ import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
+import java.util.Iterator;
import java.util.List;
import java.util.SortedSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
import org.apache.hadoop.hdfs.qjournal.client.AsyncLogger;
import org.apache.hadoop.hdfs.qjournal.client.IPCLoggerChannel;
@@ -43,7 +46,9 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -67,10 +72,17 @@ public class TestQuorumJournalManager {
private Configuration conf;
private QuorumJournalManager qjm;
private List<AsyncLogger> spies;
+
+ static {
+ ((Log4JLogger)ProtobufRpcEngine.LOG).getLogger().setLevel(Level.ALL);
+ }
@Before
public void setup() throws Exception {
conf = new Configuration();
+ // Don't retry connections - it just slows down the tests.
+
conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
0);
+
cluster = new MiniJournalCluster.Builder(conf)
.build();
@@ -117,11 +129,7 @@ public class TestQuorumJournalManager {
assertEquals(1, stream.getFirstTxId());
assertEquals(3, stream.getLastTxId());
- for (int i = 1; i <= 3; i++) {
- FSEditLogOp op = stream.readOp();
- assertEquals(FSEditLogOpCodes.OP_MKDIR, op.opCode);
- assertEquals(i, op.getTransactionId());
- }
+ verifyEdits(streams, 1, 3);
assertNull(stream.readOp());
} finally {
IOUtils.cleanup(LOG, streams.toArray(new Closeable[0]));
@@ -137,6 +145,7 @@ public class TestQuorumJournalManager {
EditLogInputStream stream = streams.get(0);
assertEquals(1, stream.getFirstTxId());
assertEquals(3, stream.getLastTxId());
+ verifyEdits(streams, 1, 3);
} finally {
IOUtils.cleanup(LOG, streams.toArray(new Closeable[0]));
streams.clear();
@@ -153,6 +162,8 @@ public class TestQuorumJournalManager {
assertEquals(2, streams.size());
assertEquals(4, streams.get(1).getFirstTxId());
assertEquals(6, streams.get(1).getLastTxId());
+
+ verifyEdits(streams, 1, 6);
} finally {
IOUtils.cleanup(LOG, streams.toArray(new Closeable[0]));
streams.clear();
@@ -160,6 +171,35 @@ public class TestQuorumJournalManager {
}
/**
+ * Regression test for HDFS-3725. One of the journal nodes is down
+ * during the writing of one segment, then comes back up later to
+ * take part in a later segment. Thus, its local edits are
+ * not a contiguous sequence. This should be handled correctly.
+ */
+ @Test
+ public void testOneJNMissingSegments() throws Exception {
+ writeSegment(qjm, 1, 3, true);
+ waitForAllPendingCalls(qjm.getLoggerSetForTests());
+ cluster.getJournalNode(0).stopAndJoin(0);
+ writeSegment(qjm, 4, 3, true);
+ waitForAllPendingCalls(qjm.getLoggerSetForTests());
+ cluster.restartJournalNode(0);
+ writeSegment(qjm, 7, 3, true);
+ waitForAllPendingCalls(qjm.getLoggerSetForTests());
+ cluster.getJournalNode(1).stopAndJoin(0);
+
+ QuorumJournalManager readerQjm = createSpyingQJM();
+ List<EditLogInputStream> streams = Lists.newArrayList();
+ try {
+ readerQjm.selectInputStreams(streams, 1, false);
+ verifyEdits(streams, 1, 9);
+ } finally {
+ IOUtils.cleanup(LOG, streams.toArray(new Closeable[0]));
+ readerQjm.close();
+ }
+ }
+
+ /**
* TODO: this test needs to be fleshed out to be an exhaustive failure test
* @throws Exception
*/
@@ -425,6 +465,41 @@ public class TestQuorumJournalManager {
stm.flush();
}
+ /**
+ * Verify that the given list of streams contains exactly the range of
+ * transactions specified, inclusive.
+ */
+ private void verifyEdits(List<EditLogInputStream> streams,
+ int firstTxnId, int lastTxnId) throws IOException {
+
+ Iterator<EditLogInputStream> iter = streams.iterator();
+ assertTrue(iter.hasNext());
+ EditLogInputStream stream = iter.next();
+
+ for (int expected = firstTxnId;
+ expected <= lastTxnId;
+ expected++) {
+
+ FSEditLogOp op = stream.readOp();
+ while (op == null) {
+ assertTrue("Expected to find txid " + expected + ", " +
+ "but no more streams available to read from",
+ iter.hasNext());
+ stream = iter.next();
+ op = stream.readOp();
+ }
+
+ assertEquals(FSEditLogOpCodes.OP_MKDIR, op.opCode);
+ assertEquals(expected, op.getTransactionId());
+ }
+
+ assertNull(stream.readOp());
+ assertFalse("Expected no more txns after " + lastTxnId +
+ " but more streams are available", iter.hasNext());
+ }
+
+
+
private static void waitForAllPendingCalls(AsyncLoggerSet als)
throws InterruptedException {
for (AsyncLogger l : als.getLoggersForTests()) {