Author: todd
Date: Tue Jun 12 04:15:18 2012
New Revision: 1349114
URL: http://svn.apache.org/viewvc?rev=1349114&view=rev
Log:
HDFS-3049. During the normal NN startup process, fall back on a different edit
log if we see one that is corrupt. Contributed by Colin Patrick McCabe.
Added:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileJournalManager.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1349114&r1=1349113&r2=1349114&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue Jun 12 04:15:18 2012
@@ -90,6 +90,9 @@ Trunk (unreleased changes)
HDFS-3040. TestMulitipleNNDataBlockScanner is misspelled. (Madhukara Phatak
via atm)
+ HDFS-3049. During the normal NN startup process, fall back on a different
+ edit log if we see one that is corrupt (Colin Patrick McCabe via todd)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1349114&r1=1349113&r2=1349114&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Tue Jun 12 04:15:18 2012
@@ -1173,18 +1173,6 @@ public class FSEditLog {
throw e;
}
}
- // This code will go away as soon as RedundantEditLogInputStream is
- // introduced. (HDFS-3049)
- try {
- if (!streams.isEmpty()) {
- streams.get(0).skipUntil(fromTxId);
- }
- } catch (IOException e) {
- // We don't want to throw an exception from here, because that would make
- // recovery impossible even if the user requested it. An exception will
- // be thrown later, when we don't read the starting txid we expect.
- LOG.error("error skipping until transaction " + fromTxId, e);
- }
return streams;
}
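Note on this hunk: the deleted block eagerly fast-forwarded only the first stream and swallowed any IOException from skipUntil(). After this commit the fast-forward happens lazily inside RedundantEditLogInputStream (the SKIP_UNTIL state in the new file below), so a failure while skipping can trigger failover to another replica. A minimal sketch of that idea, using a hypothetical TxStream interface rather than the real EditLogInputStream API:

import java.io.IOException;
import java.util.List;

class LazySkipSketch {
  /** Hypothetical stand-in for EditLogInputStream. */
  interface TxStream {
    void skipUntil(long txId) throws IOException; // may fail on corruption
    Long readOp() throws IOException;             // next txid, or null at EOF
  }

  /** Skip each replica only when we try to read it, failing over on error. */
  static Long readFirstOp(List<TxStream> replicas, long fromTxId)
      throws IOException {
    IOException last = null;
    for (TxStream s : replicas) {
      try {
        s.skipUntil(fromTxId); // deferred to read time, per replica
        return s.readOp();
      } catch (IOException e) {
        last = e;              // try the next replica instead
      }
    }
    throw last != null ? last : new IOException("no streams available");
  }
}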
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1349114&r1=1349113&r2=1349114&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Tue Jun 12 04:15:18 2012
@@ -668,7 +668,9 @@ public class FSEditLogLoader {
FSImage.LOG.warn("Caught exception after reading " + numValid +
" ops from " + in + " while determining its valid length." +
"Position was " + lastPos, t);
- break;
+ in.resync();
+ FSImage.LOG.warn("After resync, position is " + in.getPosition());
+ continue;
}
if (lastTxId == HdfsConstants.INVALID_TXID
|| op.getTransactionId() > lastTxId) {
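The resync()/continue change above lets validation step over a corrupt region and keep counting valid ops instead of stopping at the first failure. A rough sketch of the resulting loop shape, with a hypothetical EditStream standing in for the real stream type:

import java.io.IOException;

class ResyncValidationSketch {
  /** Hypothetical stand-in for the edit log input stream. */
  interface EditStream {
    Long readOp() throws IOException; // next txid, or null at end of file
    void resync();                    // realign past the corrupt region
  }

  static long countValidOps(EditStream in) {
    long numValid = 0;
    while (true) {
      try {
        if (in.readOp() == null) {
          break;       // clean end of file
        }
        numValid++;
      } catch (Throwable t) {
        in.resync();   // was "break": now realign and keep scanning
      }
    }
    return numValid;
  }
}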
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java?rev=1349114&r1=1349113&r2=1349114&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java Tue Jun 12 04:15:18 2012
@@ -24,7 +24,9 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
+import java.util.PriorityQueue;
import java.util.SortedSet;
+import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -40,7 +42,6 @@ import com.google.common.collect.Immutab
import com.google.common.collect.Lists;
import com.google.common.collect.Multimaps;
import com.google.common.collect.Sets;
-import com.google.common.collect.TreeMultiset;
/**
* Manages a collection of Journals. None of the methods are synchronized, it is
@@ -222,8 +223,9 @@ public class JournalSet implements Journ
@Override
public void selectInputStreams(Collection<EditLogInputStream> streams,
long fromTxId, boolean inProgressOk) {
- final TreeMultiset<EditLogInputStream> allStreams =
- TreeMultiset.create(EDIT_LOG_INPUT_STREAM_COMPARATOR);
+ final PriorityQueue<EditLogInputStream> allStreams =
+ new PriorityQueue<EditLogInputStream>(64,
+ EDIT_LOG_INPUT_STREAM_COMPARATOR);
for (JournalAndStream jas : journals) {
if (jas.isDisabled()) {
LOG.info("Skipping jas " + jas + " since it's disabled");
@@ -239,7 +241,8 @@ public class JournalSet implements Journ
// transaction ID.
LinkedList<EditLogInputStream> acc =
new LinkedList<EditLogInputStream>();
- for (EditLogInputStream elis : allStreams) {
+ EditLogInputStream elis;
+ while ((elis = allStreams.poll()) != null) {
if (acc.isEmpty()) {
acc.add(elis);
} else {
@@ -247,7 +250,7 @@ public class JournalSet implements Journ
if (accFirstTxId == elis.getFirstTxId()) {
acc.add(elis);
} else if (accFirstTxId < elis.getFirstTxId()) {
- streams.add(acc.get(0));
+ streams.add(new RedundantEditLogInputStream(acc, fromTxId));
acc.clear();
acc.add(elis);
} else if (accFirstTxId > elis.getFirstTxId()) {
@@ -258,7 +261,7 @@ public class JournalSet implements Journ
}
}
if (!acc.isEmpty()) {
- streams.add(acc.get(0));
+ streams.add(new RedundantEditLogInputStream(acc, fromTxId));
acc.clear();
}
}
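Two things happen in this hunk. First, TreeMultiset (whose iterator is comparator-ordered) is replaced by a PriorityQueue, which is only ordered through poll(); that is why the for-each loop becomes a poll() loop. Second, each run of streams sharing a firstTxId is wrapped in a RedundantEditLogInputStream instead of keeping only the first stream of the run. A sketch of the grouping, using a simplified Stream class rather than the HDFS types:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class GroupingSketch {
  static class Stream {
    final long firstTxId;
    final long lastTxId;
    Stream(long first, long last) { firstTxId = first; lastTxId = last; }
  }

  /** Drain the queue in firstTxId order, bundling equal-firstTxId runs. */
  static List<List<Stream>> groupByFirstTxId(PriorityQueue<Stream> q) {
    List<List<Stream>> groups = new ArrayList<List<Stream>>();
    List<Stream> acc = new ArrayList<Stream>();
    Stream s;
    while ((s = q.poll()) != null) { // poll() honors the comparator
      if (!acc.isEmpty() && acc.get(0).firstTxId != s.firstTxId) {
        groups.add(acc);             // one redundant stream per group
        acc = new ArrayList<Stream>();
      }
      acc.add(s);
    }
    if (!acc.isEmpty()) {
      groups.add(acc);
    }
    return groups;
  }

  public static void main(String[] args) {
    PriorityQueue<Stream> q = new PriorityQueue<Stream>(64,
        new Comparator<Stream>() { // roughly mirrors ordering by firstTxId
          public int compare(Stream a, Stream b) {
            return Long.compare(a.firstTxId, b.firstTxId);
          }
        });
    q.add(new Stream(1, 10));
    q.add(new Stream(1, 20));
    q.add(new Stream(11, 20));
    System.out.println(groupByFirstTxId(q).size()); // prints 2
  }
}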
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java?rev=1349114&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java Tue Jun 12 04:15:18 2012
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.io.IOUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Longs;
+
+/**
+ * A merged input stream that handles failover between different edit logs.
+ *
+ * We will currently try each edit log stream exactly once. In other words, we
+ * don't handle the "ping pong" scenario where different edit logs contain a
+ * different subset of the available edits.
+ */
+class RedundantEditLogInputStream extends EditLogInputStream {
+ public static final Log LOG = LogFactory.getLog(EditLogInputStream.class.getName());
+ private int curIdx;
+ private long prevTxId;
+ private final EditLogInputStream[] streams;
+
+ /**
+ * States that the RedundantEditLogInputStream can be in.
+ *
+ * <pre>
+ * start (if no streams)
+ * |
+ * V
+ * PrematureEOFException +----------------+
+ * +-------------->| EOF |<--------------+
+ * | +----------------+ |
+ * | |
+ * | start (if there are streams) |
+ * | | |
+ * | V | EOF
+ * | resync +----------------+ skipUntil +---------+
+ * | +---------->| SKIP_UNTIL |----------->| OK |
+ * | | +----------------+ +---------+
+ * | | | IOE ^ fail over to | IOE
+ * | | V | next stream |
+ * +----------------------+ +----------------+ |
+ * | STREAM_FAILED_RESYNC | | STREAM_FAILED |<----------+
+ * +----------------------+ +----------------+
+ * ^ Recovery mode |
+ * +--------------------+
+ * </pre>
+ */
+ static private enum State {
+ /** We need to skip until prevTxId + 1 */
+ SKIP_UNTIL,
+ /** We're ready to read opcodes out of the current stream */
+ OK,
+ /** The current stream has failed. */
+ STREAM_FAILED,
+ /** The current stream has failed, and resync() was called. */
+ STREAM_FAILED_RESYNC,
+ /** There are no more opcodes to read from this
+ * RedundantEditLogInputStream */
+ EOF;
+ }
+
+ private State state;
+ private IOException prevException;
+
+ RedundantEditLogInputStream(Collection<EditLogInputStream> streams,
+ long startTxId) {
+ this.curIdx = 0;
+ this.prevTxId = (startTxId == HdfsConstants.INVALID_TXID) ?
+ HdfsConstants.INVALID_TXID : (startTxId - 1);
+ this.state = (streams.isEmpty()) ? State.EOF : State.SKIP_UNTIL;
+ this.prevException = null;
+ // EditLogInputStreams in a RedundantEditLogInputStream must be finalized,
+ // and can't be pre-transactional.
+ EditLogInputStream first = null;
+ for (EditLogInputStream s : streams) {
+ Preconditions.checkArgument(s.getFirstTxId() !=
+ HdfsConstants.INVALID_TXID, "invalid first txid in stream: %s", s);
+ Preconditions.checkArgument(s.getLastTxId() !=
+ HdfsConstants.INVALID_TXID, "invalid last txid in stream: %s", s);
+ if (first == null) {
+ first = s;
+ } else {
+ Preconditions.checkArgument(s.getFirstTxId() == first.getFirstTxId(),
+ "All streams in the RedundantEditLogInputStream must have the same "
+
+ "start transaction ID! " + first + " had start txId " +
+ first.getFirstTxId() + ", but " + s + " had start txId " +
+ s.getFirstTxId());
+ }
+ }
+
+ this.streams = streams.toArray(new EditLogInputStream[0]);
+
+ // We sort the streams here so that the streams that end later come first.
+ Arrays.sort(this.streams, new Comparator<EditLogInputStream>() {
+ @Override
+ public int compare(EditLogInputStream a, EditLogInputStream b) {
+ return Longs.compare(b.getLastTxId(), a.getLastTxId());
+ }
+ });
+ }
+
+ @Override
+ public String getName() {
+ StringBuilder bld = new StringBuilder();
+ String prefix = "";
+ for (EditLogInputStream elis : streams) {
+ bld.append(prefix);
+ bld.append(elis.getName());
+ prefix = ", ";
+ }
+ return bld.toString();
+ }
+
+ @Override
+ public long getFirstTxId() {
+ return streams[curIdx].getFirstTxId();
+ }
+
+ @Override
+ public long getLastTxId() {
+ return streams[curIdx].getLastTxId();
+ }
+
+ @Override
+ public void close() throws IOException {
+ IOUtils.cleanup(LOG, streams);
+ }
+
+ @Override
+ protected FSEditLogOp nextValidOp() {
+ try {
+ if (state == State.STREAM_FAILED) {
+ state = State.STREAM_FAILED_RESYNC;
+ }
+ return nextOp();
+ } catch (IOException e) {
+ return null;
+ }
+ }
+
+ @Override
+ protected FSEditLogOp nextOp() throws IOException {
+ while (true) {
+ switch (state) {
+ case SKIP_UNTIL:
+ try {
+ if (prevTxId != HdfsConstants.INVALID_TXID) {
+ LOG.info("Fast-forwarding stream '" + streams[curIdx].getName() +
+ "' to transaction ID " + (prevTxId + 1));
+ streams[curIdx].skipUntil(prevTxId + 1);
+ }
+ } catch (IOException e) {
+ prevException = e;
+ state = State.STREAM_FAILED;
+ }
+ state = State.OK;
+ break;
+ case OK:
+ try {
+ FSEditLogOp op = streams[curIdx].readOp();
+ if (op == null) {
+ state = State.EOF;
+ if (streams[curIdx].getLastTxId() == prevTxId) {
+ return null;
+ } else {
+ throw new PrematureEOFException("got premature end-of-file " +
+ "at txid " + prevTxId + "; expected file to go up to " +
+ streams[curIdx].getLastTxId());
+ }
+ }
+ prevTxId = op.getTransactionId();
+ return op;
+ } catch (IOException e) {
+ prevException = e;
+ state = State.STREAM_FAILED;
+ }
+ break;
+ case STREAM_FAILED:
+ if (curIdx + 1 == streams.length) {
+ throw prevException;
+ }
+ long oldLast = streams[curIdx].getLastTxId();
+ long newLast = streams[curIdx + 1].getLastTxId();
+ if (newLast < oldLast) {
+ throw new IOException("We encountered an error reading " +
+ streams[curIdx].getName() + ". During automatic edit log " +
+ "failover, we noticed that all of the remaining edit log " +
+ "streams are shorter than the current one! The best " +
+ "remaining edit log ends at transaction " +
+ newLast + ", but we thought we could read up to transaction " +
+ oldLast + ". If you continue, metadata will be lost forever!");
+ }
+ LOG.error("Got error reading edit log input stream " +
+ streams[curIdx].getName() + "; failing over to edit log " +
+ streams[curIdx + 1].getName(), prevException);
+ curIdx++;
+ state = State.SKIP_UNTIL;
+ break;
+ case STREAM_FAILED_RESYNC:
+ if (curIdx + 1 == streams.length) {
+ if (prevException instanceof PrematureEOFException) {
+ // bypass early EOF check
+ state = State.EOF;
+ } else {
+ streams[curIdx].resync();
+ state = State.SKIP_UNTIL;
+ }
+ } else {
+ LOG.error("failing over to edit log " +
+ streams[curIdx + 1].getName());
+ curIdx++;
+ state = State.SKIP_UNTIL;
+ }
+ break;
+ case EOF:
+ return null;
+ }
+ }
+ }
+
+ @Override
+ public int getVersion() throws IOException {
+ return streams[curIdx].getVersion();
+ }
+
+ @Override
+ public long getPosition() {
+ return streams[curIdx].getPosition();
+ }
+
+ @Override
+ public long length() throws IOException {
+ return streams[curIdx].length();
+ }
+
+ @Override
+ public boolean isInProgress() {
+ return streams[curIdx].isInProgress();
+ }
+
+ static private final class PrematureEOFException extends IOException {
+ private static final long serialVersionUID = 1L;
+ PrematureEOFException(String msg) {
+ super(msg);
+ }
+ }
+}
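For readers tracing the state diagram above, here is a compressed, hypothetical sketch of the same failover loop (Replica stands in for EditLogInputStream; the recovery-mode STREAM_FAILED_RESYNC path is omitted):

import java.io.IOException;

class FailoverReadSketch {
  interface Replica {
    Long readOp() throws IOException;             // next txid, null at EOF
    void skipUntil(long txId) throws IOException; // fast-forward
  }

  private final Replica[] replicas; // assumed sorted, longest stream first
  private int curIdx = 0;
  private long prevTxId;
  private boolean needSkip = true;  // start in SKIP_UNTIL

  FailoverReadSketch(Replica[] replicas, long startTxId) {
    this.replicas = replicas;
    this.prevTxId = startTxId - 1;
  }

  Long nextOp() throws IOException {
    while (true) {
      try {
        if (needSkip) {
          replicas[curIdx].skipUntil(prevTxId + 1); // SKIP_UNTIL state
          needSkip = false;
        }
        Long txId = replicas[curIdx].readOp();      // OK state
        if (txId != null) {
          prevTxId = txId;
        }
        return txId;
      } catch (IOException e) {
        if (curIdx + 1 == replicas.length) {
          throw e;       // STREAM_FAILED with no replicas left
        }
        curIdx++;        // fail over to the next replica
        needSkip = true; // fast-forward it past what we already returned
      }
    }
  }
}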
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1349114&r1=1349113&r2=1349114&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java Tue Jun 12 04:15:18 2012
@@ -134,6 +134,7 @@ public class MiniDFSCluster {
private boolean format = true;
private boolean manageNameDfsDirs = true;
private boolean manageNameDfsSharedDirs = true;
+ private boolean enableManagedDfsDirsRedundancy = true;
private boolean manageDataDfsDirs = true;
private StartupOption option = null;
private String[] racks = null;
@@ -187,7 +188,7 @@ public class MiniDFSCluster {
this.manageNameDfsDirs = val;
return this;
}
-
+
/**
* Default: true
*/
@@ -199,6 +200,14 @@ public class MiniDFSCluster {
/**
* Default: true
*/
+ public Builder enableManagedDfsDirsRedundancy(boolean val) {
+ this.enableManagedDfsDirsRedundancy = val;
+ return this;
+ }
+
+ /**
+ * Default: true
+ */
public Builder manageDataDfsDirs(boolean val) {
this.manageDataDfsDirs = val;
return this;
@@ -298,6 +307,7 @@ public class MiniDFSCluster {
builder.format,
builder.manageNameDfsDirs,
builder.manageNameDfsSharedDirs,
+ builder.enableManagedDfsDirsRedundancy,
builder.manageDataDfsDirs,
builder.option,
builder.racks,
@@ -385,7 +395,7 @@ public class MiniDFSCluster {
public MiniDFSCluster(Configuration conf,
int numDataNodes,
StartupOption nameNodeOperation) throws IOException {
- this(0, conf, numDataNodes, false, false, false, nameNodeOperation,
+ this(0, conf, numDataNodes, false, false, false, false, nameNodeOperation,
null, null, null);
}
@@ -407,7 +417,8 @@ public class MiniDFSCluster {
int numDataNodes,
boolean format,
String[] racks) throws IOException {
- this(0, conf, numDataNodes, format, true, true, null, racks, null, null);
+ this(0, conf, numDataNodes, format, true, true, true, null,
+ racks, null, null);
}
/**
@@ -429,7 +440,8 @@ public class MiniDFSCluster {
int numDataNodes,
boolean format,
String[] racks, String[] hosts) throws IOException {
- this(0, conf, numDataNodes, format, true, true, null, racks, hosts, null);
+ this(0, conf, numDataNodes, format, true, true, true, null,
+ racks, hosts, null);
}
/**
@@ -462,8 +474,8 @@ public class MiniDFSCluster {
boolean manageDfsDirs,
StartupOption operation,
String[] racks) throws IOException {
- this(nameNodePort, conf, numDataNodes, format, manageDfsDirs, manageDfsDirs,
- operation, racks, null, null);
+ this(nameNodePort, conf, numDataNodes, format, manageDfsDirs,
+ manageDfsDirs, manageDfsDirs, operation, racks, null, null);
}
/**
@@ -497,7 +509,7 @@ public class MiniDFSCluster {
String[] racks,
long[] simulatedCapacities) throws IOException {
this(nameNodePort, conf, numDataNodes, format, manageDfsDirs, manageDfsDirs,
- operation, racks, null, simulatedCapacities);
+ manageDfsDirs, operation, racks, null, simulatedCapacities);
}
/**
@@ -531,13 +543,15 @@ public class MiniDFSCluster {
int numDataNodes,
boolean format,
boolean manageNameDfsDirs,
+ boolean enableManagedDfsDirsRedundancy,
boolean manageDataDfsDirs,
StartupOption operation,
String[] racks, String hosts[],
long[] simulatedCapacities) throws IOException {
this.nameNodes = new NameNodeInfo[1]; // Single namenode in the cluster
initMiniDFSCluster(conf, numDataNodes, format,
- manageNameDfsDirs, true, manageDataDfsDirs, operation, racks, hosts,
+ manageNameDfsDirs, true, enableManagedDfsDirsRedundancy, manageDataDfsDirs,
+ operation, racks, hosts,
simulatedCapacities, null, true, false,
MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0));
}
@@ -545,8 +559,8 @@ public class MiniDFSCluster {
private void initMiniDFSCluster(
Configuration conf,
int numDataNodes, boolean format, boolean manageNameDfsDirs,
- boolean manageNameDfsSharedDirs, boolean manageDataDfsDirs,
- StartupOption operation, String[] racks,
+ boolean manageNameDfsSharedDirs, boolean enableManagedDfsDirsRedundancy,
+ boolean manageDataDfsDirs, StartupOption operation, String[] racks,
String[] hosts, long[] simulatedCapacities, String clusterId,
boolean waitSafeMode, boolean setupHostsFile,
MiniDFSNNTopology nnTopology)
@@ -586,6 +600,7 @@ public class MiniDFSCluster {
federation = nnTopology.isFederated();
createNameNodesAndSetConf(
nnTopology, manageNameDfsDirs, manageNameDfsSharedDirs,
+ enableManagedDfsDirsRedundancy,
format, operation, clusterId, conf);
if (format) {
@@ -608,7 +623,8 @@ public class MiniDFSCluster {
private void createNameNodesAndSetConf(MiniDFSNNTopology nnTopology,
boolean manageNameDfsDirs, boolean manageNameDfsSharedDirs,
- boolean format, StartupOption operation, String clusterId,
+ boolean enableManagedDfsDirsRedundancy, boolean format,
+ StartupOption operation, String clusterId,
Configuration conf) throws IOException {
Preconditions.checkArgument(nnTopology.countNameNodes() > 0,
"empty NN topology: no namenodes specified!");
@@ -664,7 +680,7 @@ public class MiniDFSCluster {
Collection<URI> prevNNDirs = null;
int nnCounterForFormat = nnCounter;
for (NNConf nn : nameservice.getNNs()) {
- initNameNodeConf(conf, nsId, nn.getNnId(), manageNameDfsDirs,
+ initNameNodeConf(conf, nsId, nn.getNnId(), manageNameDfsDirs, manageNameDfsDirs,
nnCounterForFormat);
Collection<URI> namespaceDirs = FSNamesystem.getNamespaceDirs(conf);
if (format) {
@@ -696,7 +712,8 @@ public class MiniDFSCluster {
// Start all Namenodes
for (NNConf nn : nameservice.getNNs()) {
- initNameNodeConf(conf, nsId, nn.getNnId(), manageNameDfsDirs, nnCounter);
+ initNameNodeConf(conf, nsId, nn.getNnId(), manageNameDfsDirs,
+ enableManagedDfsDirsRedundancy, nnCounter);
createNameNode(nnCounter++, conf, numDataNodes, false, operation,
clusterId, nsId, nn.getNnId());
}
@@ -721,8 +738,8 @@ public class MiniDFSCluster {
private void initNameNodeConf(Configuration conf,
String nameserviceId, String nnId,
- boolean manageNameDfsDirs, int nnIndex)
- throws IOException {
+ boolean manageNameDfsDirs, boolean enableManagedDfsDirsRedundancy,
+ int nnIndex) throws IOException {
if (nameserviceId != null) {
conf.set(DFS_NAMESERVICE_ID, nameserviceId);
}
@@ -731,12 +748,21 @@ public class MiniDFSCluster {
}
if (manageNameDfsDirs) {
- conf.set(DFS_NAMENODE_NAME_DIR_KEY,
- fileAsURI(new File(base_dir, "name" + (2*nnIndex + 1)))+","+
- fileAsURI(new File(base_dir, "name" + (2*nnIndex + 2))));
- conf.set(DFS_NAMENODE_CHECKPOINT_DIR_KEY,
- fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 1)))+","+
- fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 2))));
+ if (enableManagedDfsDirsRedundancy) {
+ conf.set(DFS_NAMENODE_NAME_DIR_KEY,
+ fileAsURI(new File(base_dir, "name" + (2*nnIndex + 1)))+","+
+ fileAsURI(new File(base_dir, "name" + (2*nnIndex + 2))));
+ conf.set(DFS_NAMENODE_CHECKPOINT_DIR_KEY,
+ fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex +
1)))+","+
+ fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 2))));
+ } else {
+ conf.set(DFS_NAMENODE_NAME_DIR_KEY,
+ fileAsURI(new File(base_dir, "name" + (2*nnIndex + 1))).
+ toString());
+ conf.set(DFS_NAMENODE_CHECKPOINT_DIR_KEY,
+ fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 1))).
+ toString());
+ }
}
}
@@ -2134,7 +2160,7 @@ public class MiniDFSCluster {
String nnId = null;
initNameNodeAddress(conf, nameserviceId,
new NNConf(nnId).setIpcPort(namenodePort));
- initNameNodeConf(conf, nameserviceId, nnId, true, nnIndex);
+ initNameNodeConf(conf, nameserviceId, nnId, true, true, nnIndex);
createNameNode(nnIndex, conf, numDataNodes, true, null, null,
nameserviceId, nnId);
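The new Builder knob is exercised by the test changes below: disabling redundancy configures a single name/checkpoint directory instead of two, so a corrupted edit log cannot be silently masked by its twin copy. Typical usage, as in the tests:

Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
    .numDataNodes(0)
    .enableManagedDfsDirsRedundancy(false) // one name dir, no mirror
    .build();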
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java?rev=1349114&r1=1349113&r2=1349114&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java Tue Jun 12 04:15:18 2012
@@ -506,21 +506,29 @@ public class TestEditLog extends TestCas
FSImage fsimage = namesystem.getFSImage();
final FSEditLog editLog = fsimage.getEditLog();
fileSys.mkdirs(new Path("/tmp"));
- StorageDirectory sd = fsimage.getStorage().dirIterator(NameNodeDirType.EDITS).next();
+
+ Iterator<StorageDirectory> iter = fsimage.getStorage().
+ dirIterator(NameNodeDirType.EDITS);
+ LinkedList<StorageDirectory> sds = new LinkedList<StorageDirectory>();
+ while (iter.hasNext()) {
+ sds.add(iter.next());
+ }
editLog.close();
cluster.shutdown();
- File editFile = NNStorage.getFinalizedEditsFile(sd, 1, 3);
- assertTrue(editFile.exists());
-
- long fileLen = editFile.length();
- System.out.println("File name: " + editFile + " len: " + fileLen);
- RandomAccessFile rwf = new RandomAccessFile(editFile, "rw");
- rwf.seek(fileLen-4); // seek to checksum bytes
- int b = rwf.readInt();
- rwf.seek(fileLen-4);
- rwf.writeInt(b+1);
- rwf.close();
+ for (StorageDirectory sd : sds) {
+ File editFile = NNStorage.getFinalizedEditsFile(sd, 1, 3);
+ assertTrue(editFile.exists());
+
+ long fileLen = editFile.length();
+ LOG.debug("Corrupting Log File: " + editFile + " len: " + fileLen);
+ RandomAccessFile rwf = new RandomAccessFile(editFile, "rw");
+ rwf.seek(fileLen-4); // seek to checksum bytes
+ int b = rwf.readInt();
+ rwf.seek(fileLen-4);
+ rwf.writeInt(b+1);
+ rwf.close();
+ }
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).format(false).build();
@@ -1232,6 +1240,113 @@ public class TestEditLog extends TestCas
}
}
+ private static long readAllEdits(Collection<EditLogInputStream> streams,
+ long startTxId) throws IOException {
+ FSEditLogOp op;
+ long nextTxId = startTxId;
+ long numTx = 0;
+ for (EditLogInputStream s : streams) {
+ while (true) {
+ op = s.readOp();
+ if (op == null)
+ break;
+ if (op.getTransactionId() != nextTxId) {
+ throw new IOException("out of order transaction ID! expected " +
+ nextTxId + " but got " + op.getTransactionId() + " when " +
+ "reading " + s.getName());
+ }
+ numTx++;
+ nextTxId = op.getTransactionId() + 1;
+ }
+ }
+ return numTx;
+ }
+
+ /**
+ * Test edit log failover. If a single edit log is missing, other
+ * edit logs should be used instead.
+ */
+ @Test
+ public void testEditLogFailOverFromMissing() throws IOException {
+ File f1 = new File(TEST_DIR + "/failover0");
+ File f2 = new File(TEST_DIR + "/failover1");
+ List<URI> editUris = ImmutableList.of(f1.toURI(), f2.toURI());
+
+ NNStorage storage = setupEdits(editUris, 3);
+
+ final long startErrorTxId = 1*TXNS_PER_ROLL + 1;
+ final long endErrorTxId = 2*TXNS_PER_ROLL;
+
+ File[] files = new File(f1, "current").listFiles(new FilenameFilter() {
+ public boolean accept(File dir, String name) {
+ if (name.startsWith(NNStorage.getFinalizedEditsFileName(startErrorTxId,
+ endErrorTxId))) {
+ return true;
+ }
+ return false;
+ }
+ });
+ assertEquals(1, files.length);
+ assertTrue(files[0].delete());
+
+ FSEditLog editlog = getFSEditLog(storage);
+ editlog.initJournalsForWrite();
+ long startTxId = 1;
+ try {
+ readAllEdits(editlog.selectInputStreams(startTxId, 4*TXNS_PER_ROLL),
+ startTxId);
+ } catch (IOException e) {
+ LOG.error("edit log failover didn't work", e);
+ fail("Edit log failover didn't work");
+ }
+ }
+
+ /**
+ * Test edit log failover from a corrupt edit log
+ */
+ @Test
+ public void testEditLogFailOverFromCorrupt() throws IOException {
+ File f1 = new File(TEST_DIR + "/failover0");
+ File f2 = new File(TEST_DIR + "/failover1");
+ List<URI> editUris = ImmutableList.of(f1.toURI(), f2.toURI());
+
+ NNStorage storage = setupEdits(editUris, 3);
+
+ final long startErrorTxId = 1*TXNS_PER_ROLL + 1;
+ final long endErrorTxId = 2*TXNS_PER_ROLL;
+
+ File[] files = new File(f1, "current").listFiles(new FilenameFilter() {
+ public boolean accept(File dir, String name) {
+ if (name.startsWith(NNStorage.getFinalizedEditsFileName(startErrorTxId,
+ endErrorTxId))) {
+ return true;
+ }
+ return false;
+ }
+ });
+ assertEquals(1, files.length);
+
+ long fileLen = files[0].length();
+ LOG.debug("Corrupting Log File: " + files[0] + " len: " + fileLen);
+ RandomAccessFile rwf = new RandomAccessFile(files[0], "rw");
+ rwf.seek(fileLen-4); // seek to checksum bytes
+ int b = rwf.readInt();
+ rwf.seek(fileLen-4);
+ rwf.writeInt(b+1);
+ rwf.close();
+
+ FSEditLog editlog = getFSEditLog(storage);
+ editlog.initJournalsForWrite();
+ long startTxId = 1;
+ try {
+ readAllEdits(editlog.selectInputStreams(startTxId, 4*TXNS_PER_ROLL),
+ startTxId);
+ } catch (IOException e) {
+ LOG.error("edit log failover didn't work", e);
+ fail("Edit log failover didn't work");
+ }
+ }
+
/**
* Test creating a directory with lots and lots of edit log segments
*/
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java?rev=1349114&r1=1349113&r2=1349114&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java Tue Jun 12 04:15:18 2012
@@ -51,6 +51,16 @@ public class TestEditLogFileOutputStream
}
@Test
+ public void testConstants() {
+ // Each call to FSEditLogOp#Reader#readOp can read at most MAX_OP_SIZE bytes
+ // before getting an exception. So we don't want to preallocate a longer
+ // region than MAX_OP_SIZE, because then we'd get an IOException when reading
+ // through the padding at the end of the file.
+ assertTrue(EditLogFileOutputStream.PREALLOCATION_LENGTH <
+ FSEditLogOp.MAX_OP_SIZE);
+ }
+
+ @Test
public void testPreallocation() throws IOException {
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java?rev=1349114&r1=1349113&r2=1349114&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java Tue Jun 12 04:15:18 2012
@@ -77,7 +77,7 @@ public class TestFSEditLogLoader {
MiniDFSCluster cluster = null;
FileSystem fileSys = null;
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES)
- .build();
+ .enableManagedDfsDirsRedundancy(false).build();
cluster.waitActive();
fileSys = cluster.getFileSystem();
final FSNamesystem namesystem = cluster.getNamesystem();
@@ -107,7 +107,7 @@ public class TestFSEditLogLoader {
bld.append("Recent opcode offsets: (\\d+\\s*){4}$");
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES)
- .format(false).build();
+ .enableManagedDfsDirsRedundancy(false).format(false).build();
fail("should not be able to start");
} catch (IOException e) {
assertTrue("error message contains opcodes message",
@@ -327,6 +327,56 @@ public class TestFSEditLogLoader {
}
@Test
+ public void testValidateEditLogWithCorruptBody() throws IOException {
+ File testDir = new File(TEST_DIR, "testValidateEditLogWithCorruptBody");
+ SortedMap<Long, Long> offsetToTxId = Maps.newTreeMap();
+ final int NUM_TXNS = 20;
+ File logFile = prepareUnfinalizedTestEditLog(testDir, NUM_TXNS,
+ offsetToTxId);
+ // Back up the uncorrupted log
+ File logFileBak = new File(testDir, logFile.getName() + ".bak");
+ Files.copy(logFile, logFileBak);
+ EditLogValidation validation =
+ EditLogFileInputStream.validateEditLog(logFile);
+ assertTrue(!validation.hasCorruptHeader());
+ // We expect that there will be an OP_START_LOG_SEGMENT, followed by
+ // NUM_TXNS opcodes, followed by an OP_END_LOG_SEGMENT.
+ assertEquals(NUM_TXNS + 1, validation.getEndTxId());
+ // Corrupt each edit and verify that validation continues to work
+ for (Map.Entry<Long, Long> entry : offsetToTxId.entrySet()) {
+ long txOffset = entry.getKey();
+ long txId = entry.getValue();
+
+ // Restore backup, corrupt the txn opcode
+ Files.copy(logFileBak, logFile);
+ corruptByteInFile(logFile, txOffset);
+ validation = EditLogFileInputStream.validateEditLog(logFile);
+ long expectedEndTxId = (txId == (NUM_TXNS + 1)) ?
+ NUM_TXNS : (NUM_TXNS + 1);
+ assertEquals("Failed when corrupting txn opcode at " + txOffset,
+ expectedEndTxId, validation.getEndTxId());
+ assertTrue(!validation.hasCorruptHeader());
+ }
+
+ // Truncate right before each edit and verify that validation continues
+ // to work
+ for (Map.Entry<Long, Long> entry : offsetToTxId.entrySet()) {
+ long txOffset = entry.getKey();
+ long txId = entry.getValue();
+
+ // Restore backup, corrupt the txn opcode
+ Files.copy(logFileBak, logFile);
+ truncateFile(logFile, txOffset);
+ validation = EditLogFileInputStream.validateEditLog(logFile);
+ long expectedEndTxId = (txId == 0) ?
+ HdfsConstants.INVALID_TXID : (txId - 1);
+ assertEquals("Failed when corrupting txid " + txId + " txn opcode " +
+ "at " + txOffset, expectedEndTxId, validation.getEndTxId());
+ assertTrue(!validation.hasCorruptHeader());
+ }
+ }
+
+ @Test
public void testValidateEmptyEditLog() throws IOException {
File testDir = new File(TEST_DIR, "testValidateEmptyEditLog");
SortedMap<Long, Long> offsetToTxId = Maps.newTreeMap();
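The corruption tests above lean on helpers like corruptByteInFile and truncateFile defined elsewhere in this test class. A hypothetical sketch of what a byte-flipping helper of this kind might do (illustrative only, not the committed implementation):

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

class CorruptByteSketch {
  /** Hypothetical: overwrite the byte at 'offset' with a different value. */
  static void corruptByteInFile(File f, long offset) throws IOException {
    RandomAccessFile raf = new RandomAccessFile(f, "rw");
    try {
      raf.seek(offset);
      int b = raf.read();  // original byte value (0..255)
      raf.seek(offset);
      raf.write(b ^ 0xff); // write back its bitwise complement
    } finally {
      raf.close();
    }
  }
}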
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileJournalManager.java?rev=1349114&r1=1349113&r2=1349114&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileJournalManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileJournalManager.java Tue Jun 12 04:15:18 2012
@@ -20,10 +20,10 @@ package org.apache.hadoop.hdfs.server.na
import static org.junit.Assert.*;
import java.net.URI;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Iterator;
+import java.util.PriorityQueue;
import java.io.RandomAccessFile;
import java.io.File;
@@ -33,7 +33,6 @@ import org.junit.Test;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.JournalManager.CorruptionException;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
@@ -45,7 +44,6 @@ import static org.apache.hadoop.hdfs.ser
import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.TXNS_PER_FAIL;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.TreeMultiset;
import com.google.common.base.Joiner;
public class TestFileJournalManager {
@@ -64,12 +62,13 @@ public class TestFileJournalManager {
static long getNumberOfTransactions(FileJournalManager jm, long fromTxId,
boolean inProgressOk, boolean abortOnGap) throws IOException {
long numTransactions = 0, txId = fromTxId;
- final TreeMultiset<EditLogInputStream> allStreams =
- TreeMultiset.create(JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
+ final PriorityQueue<EditLogInputStream> allStreams =
+ new PriorityQueue<EditLogInputStream>(64,
+ JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
jm.selectInputStreams(allStreams, fromTxId, inProgressOk);
-
+ EditLogInputStream elis = null;
try {
- for (EditLogInputStream elis : allStreams) {
+ while ((elis = allStreams.poll()) != null) {
elis.skipUntil(txId);
while (true) {
FSEditLogOp op = elis.readOp();
@@ -87,6 +86,7 @@ public class TestFileJournalManager {
}
} finally {
IOUtils.cleanup(LOG, allStreams.toArray(new EditLogInputStream[0]));
+ IOUtils.cleanup(LOG, elis);
}
return numTransactions;
}
@@ -379,27 +379,28 @@ public class TestFileJournalManager {
private static EditLogInputStream getJournalInputStream(JournalManager jm,
long txId, boolean inProgressOk) throws IOException {
- final TreeMultiset<EditLogInputStream> allStreams =
- TreeMultiset.create(JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
+ final PriorityQueue<EditLogInputStream> allStreams =
+ new PriorityQueue<EditLogInputStream>(64,
+ JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
jm.selectInputStreams(allStreams, txId, inProgressOk);
+ EditLogInputStream elis = null, ret;
try {
- for (Iterator<EditLogInputStream> iter = allStreams.iterator();
- iter.hasNext();) {
- EditLogInputStream elis = iter.next();
+ while ((elis = allStreams.poll()) != null) {
if (elis.getFirstTxId() > txId) {
break;
}
if (elis.getLastTxId() < txId) {
- iter.remove();
elis.close();
continue;
}
elis.skipUntil(txId);
- iter.remove();
- return elis;
+ ret = elis;
+ elis = null;
+ return ret;
}
} finally {
IOUtils.cleanup(LOG, allStreams.toArray(new EditLogInputStream[0]));
+ IOUtils.cleanup(LOG, elis);
}
return null;
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java?rev=1349114&r1=1349113&r2=1349114&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java Tue Jun 12 04:15:18 2012
@@ -343,7 +343,7 @@ public class TestNameNodeRecovery {
StorageDirectory sd = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
- .build();
+ .enableManagedDfsDirsRedundancy(false).build();
cluster.waitActive();
if (!finalize) {
// Normally, the in-progress edit log would be finalized by
@@ -379,7 +379,7 @@ public class TestNameNodeRecovery {
try {
LOG.debug("trying to start normally (this should fail)...");
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
- .format(false).build();
+ .enableManagedDfsDirsRedundancy(false).format(false).build();
cluster.waitActive();
cluster.shutdown();
if (needRecovery) {
@@ -404,7 +404,8 @@ public class TestNameNodeRecovery {
try {
LOG.debug("running recovery...");
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
- .format(false).startupOption(recoverStartOpt).build();
+ .enableManagedDfsDirsRedundancy(false).format(false)
+ .startupOption(recoverStartOpt).build();
} catch (IOException e) {
fail("caught IOException while trying to recover. " +
"message was " + e.getMessage() +
@@ -420,7 +421,7 @@ public class TestNameNodeRecovery {
try {
LOG.debug("starting cluster normally after recovery...");
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
- .format(false).build();
+ .enableManagedDfsDirsRedundancy(false).format(false).build();
LOG.debug("successfully recovered the " + corruptor.getName() +
" corrupted edit log");
cluster.waitActive();