Author: shv
Date: Thu Apr 2 21:53:49 2009
New Revision: 761439
URL: http://svn.apache.org/viewvc?rev=761439&view=rev
Log:
HADOOP-4045. Fix processing of IO errors in EditsLog. Contributed by Boris
Shkolnik.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java
hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java
Modified: hadoop/core/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=761439&r1=761438&r2=761439&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Apr 2 21:53:49 2009
@@ -356,6 +356,9 @@
HADOOP-2413. Remove the static variable FSNamesystem.fsNamesystemObject.
(Konstantin Shvachko via szetszwo)
+ HADOOP-4045. Fix processing of IO errors in EditsLog.
+ (Boris Shkolnik via shv)
+
Release 0.20.0 - Unreleased
INCOMPATIBLE CHANGES
Modified:
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=761439&r1=761438&r2=761439&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Thu Apr 2 21:53:49 2009
@@ -26,12 +26,14 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Iterator;
-import java.lang.Math;
-import java.nio.channels.FileChannel;
-import java.nio.ByteBuffer;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.FSConstants;
@@ -44,9 +46,14 @@
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.permission.*;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+import org.mortbay.log.Log;
/**
* FSEditLog maintains a log of the namespace modifications.
@@ -360,20 +367,30 @@
numTransactions = totalTimeTransactions = numTransactionsBatchedInSync = 0;
if (editStreams == null)
editStreams = new ArrayList<EditLogOutputStream>();
+
+ ArrayList<StorageDirectory> al = null;
for (Iterator<StorageDirectory> it =
fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
StorageDirectory sd = it.next();
File eFile = getEditFile(sd);
try {
- EditLogOutputStream eStream = new EditLogFileOutputStream(eFile);
- editStreams.add(eStream);
+ addNewEditLogStream(eFile);
} catch (IOException e) {
FSNamesystem.LOG.warn("Unable to open edit log file " + eFile);
// Remove the directory from list of storage directories
- fsimage.removedStorageDirs.add(sd);
- it.remove();
+ if(al == null) al = new ArrayList<StorageDirectory>(1);
+ al.add(sd);
+
}
}
+
+ if(al != null) fsimage.processIOError(al, false);
+ }
+
+
+ public synchronized void addNewEditLogStream(File eFile) throws IOException {
+ EditLogOutputStream eStream = new EditLogFileOutputStream(eFile);
+ editStreams.add(eStream);
}
public synchronized void createEditLogFile(File name) throws IOException {
@@ -424,7 +441,7 @@
errorStreams.add(eStream);
}
}
- processIOError(errorStreams);
+ processIOError(errorStreams, true);
editStreams.clear();
}
@@ -441,80 +458,101 @@
}
/**
- * If there is an IO Error on any log operations, remove that
- * directory from the list of directories.
- * If no more directories remain, then exit.
- */
- synchronized void processIOError(int index) {
+ * The specified streams have IO errors. Close and remove them.
+ * If propagate is true - close related StorageDirectories.
+ * (is called with propagate value true from everywhere
+ * except fsimage.processIOError)
+ */
+ synchronized void processIOError(
+ ArrayList<EditLogOutputStream> errorStreams,
+ boolean propagate) {
+
+ String lsd = fsimage.listStorageDirectories();
+ FSNamesystem.LOG.info("current list of storage dirs:" + lsd);
+
+ if (errorStreams == null || errorStreams.size() == 0) {
+ return; // nothing to do
+ }
+
+ //EditLogOutputStream
if (editStreams == null || editStreams.size() <= 1) {
FSNamesystem.LOG.fatal(
"Fatal Error : All storage directories are inaccessible.");
Runtime.getRuntime().exit(-1);
}
- assert(index < getNumEditStreams());
- EditLogOutputStream eStream = editStreams.get(index);
- removeStream(index);
+ ArrayList<StorageDirectory> al = null;
+ for (EditLogOutputStream eStream : errorStreams) {
+ FSNamesystem.LOG.error("Unable to log edits to " + eStream.getName()
+ + "; removing it");
+
+ StorageDirectory storageDir;
+ if(propagate && eStream.getType() == JournalType.FILE && //find SD
+ (storageDir = getStorage(eStream)) != null) {
+ FSNamesystem.LOG.info("about to remove corresponding storage:"
+ + storageDir.getRoot().getAbsolutePath());
+ // remove corresponding storage dir
+ if(al == null) al = new ArrayList<StorageDirectory>(1);
+ al.add(storageDir);
+ }
+ Iterator<EditLogOutputStream> ies = editStreams.iterator();
+ while (ies.hasNext()) {
+ EditLogOutputStream es = ies.next();
+ if (es == eStream) {
+ try { eStream.close(); } catch (IOException e) {
+ // nothing to do.
+ FSNamesystem.LOG.warn("Failed to close eStream " + eStream.getName()
+ + " before removing it (might be ok)");
+ }
+ ies.remove();
+ break;
+ }
+ }
+ }
+
+ // removed failed SDs
+ if(propagate && al != null) fsimage.processIOError(al, false);
+
+ //for the rest of the streams
+ if(propagate) incrementCheckpointTime();
+
+ lsd = fsimage.listStorageDirectories();
+ FSNamesystem.LOG.info("at the end current list of storage dirs:" + lsd);
+ }
- if(!(eStream instanceof EditLogFileOutputStream))
- return; // non file streams don't have associated storage directories
- EditLogFileOutputStream eFStream = (EditLogFileOutputStream)eStream;
- File parentStorageDir = eFStream.getFile().getParentFile().getParentFile();
- //
- // Invoke the ioerror routine of the fsimage
- //
- fsimage.processIOError(parentStorageDir);
- }
-
/**
- * If there is an IO Error on any log operations on storage directory,
- * remove any stream associated with that directory
+ * get an editStream corresponding to a sd
+ * @param es - stream to remove
+ * @return the matching stream
*/
- synchronized void processIOError(StorageDirectory sd) {
- // Try to remove stream only if one should exist
- if (!sd.getStorageDirType().isOfType(NameNodeDirType.EDITS))
- return;
- if (editStreams == null || editStreams.size() <= 1) {
- FSNamesystem.LOG.fatal(
- "Fatal Error : All storage directories are inaccessible.");
- Runtime.getRuntime().exit(-1);
- }
- for (int idx = 0; idx < editStreams.size(); idx++) {
- File parentStorageDir = ((EditLogFileOutputStream)editStreams
- .get(idx)).getFile()
- .getParentFile().getParentFile();
- if (parentStorageDir.getName().equals(sd.getRoot().getName()))
- removeStream(idx);
+ public StorageDirectory getStorage(EditLogOutputStream es) {
+ String parentStorageDir = ((EditLogFileOutputStream)es).getFile()
+ .getParentFile().getParentFile().getAbsolutePath();
+
+ Iterator<StorageDirectory> it = fsimage.dirIterator();
+ while (it.hasNext()) {
+ StorageDirectory sd = it.next();
+ FSNamesystem.LOG.info("comparing: " + parentStorageDir + " and " + sd.getRoot().getAbsolutePath());
+ if (parentStorageDir.equals(sd.getRoot().getAbsolutePath()))
+ return sd;
}
+ return null;
}
/**
- * The specified streams have IO errors. Remove them from logging
- * new transactions.
- */
- synchronized void processIOError(ArrayList<EditLogOutputStream> errorStreams) {
- if (errorStreams == null) {
- return; // nothing to do
- }
- for (int idx = 0; idx < errorStreams.size(); idx++) {
- EditLogOutputStream eStream = errorStreams.get(idx);
- int j = 0;
- int numEditStreams = editStreams.size();
- for (j = 0; j < numEditStreams; j++) {
- if (editStreams.get(j) == eStream) {
- FSNamesystem.LOG.error("Unable to log edits to " + eStream.getName());
- break;
- }
- }
- if (j == numEditStreams) {
- FSNamesystem.LOG.error("Unable to find sync log on which " +
- " IO error occured.");
- continue;
- }
- processIOError(j);
- }
- incrementCheckpointTime();
+ * get an editStream corresponding to a sd
+ * @param sd
+ * @return the matching stream
+ */
+ public EditLogOutputStream getEditsStream(StorageDirectory sd) {
+ for (EditLogOutputStream es : editStreams) {
+ File parentStorageDir = ((EditLogFileOutputStream)es).getFile()
+ .getParentFile().getParentFile();
+ if (parentStorageDir.getName().equals(sd.getRoot().getName()))
+ return es;
+ }
+ return null;
}
/**
@@ -905,6 +943,7 @@
ArrayList<EditLogOutputStream> errorStreams = null;
long start = FSNamesystem.now();
for(EditLogOutputStream eStream : editStreams) {
+ Log.debug("loggin edits into " + eStream.getName() + " stream");
if(!eStream.isOperationSupported(op))
continue;
try {
@@ -916,7 +955,7 @@
errorStreams.add(eStream);
}
}
- processIOError(errorStreams);
+ processIOError(errorStreams, true);
recordTransaction(start);
}
@@ -1001,7 +1040,7 @@
long elapsed = FSNamesystem.now() - start;
synchronized (this) {
- processIOError(errorStreams);
+ processIOError(errorStreams, true);
synctxid = syncStart;
isSyncRunning = false;
this.notifyAll();
@@ -1178,6 +1217,7 @@
assert getNumEditsDirs() <= getNumEditStreams() :
"Number of edits directories should not exceed the number of streams.";
long size = 0;
+ ArrayList<EditLogOutputStream> al = null;
for (int idx = 0; idx < getNumEditStreams(); idx++) {
EditLogOutputStream es = editStreams.get(idx);
try {
@@ -1188,11 +1228,21 @@
} catch (IOException e) {
FSImage.LOG.warn("getEditLogSize: editstream.length failed. removing editlog (" +
idx + ") " + es.getName());
- processIOError(idx);
+ if(al==null) al = new ArrayList<EditLogOutputStream>(1);
+ al.add(es);
}
}
+ if(al!=null) processIOError(al, true);
return size;
}
+
+ public String listEditsStreams() {
+ StringBuffer buf = new StringBuffer();
+ for (EditLogOutputStream os : editStreams) {
+ buf.append(os.getName() + ";");
+ }
+ return buf.toString();
+ }
/**
* Closes the current edit log and opens edits.new.
@@ -1256,7 +1306,7 @@
errorStreams.add(eStream);
}
}
- processIOError(errorStreams);
+ processIOError(errorStreams, true);
}
/**
@@ -1316,18 +1366,21 @@
errorStreams.add(eStream);
}
}
- processIOError(errorStreams);
+ processIOError(errorStreams, true);
}
/**
* Return the name of the edit file
*/
synchronized File getFsEditName() {
- StorageDirectory sd = null;
+ StorageDirectory sd = null;
for (Iterator<StorageDirectory> it =
- fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();)
- sd = it.next();
- return getEditFile(sd);
+ fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+ sd = it.next();
+ if(sd.getRoot().canRead())
+ return getEditFile(sd);
+ }
+ return null;
}
/**
@@ -1463,7 +1516,7 @@
errorStreams.add(eStream);
}
}
- processIOError(errorStreams);
+ processIOError(errorStreams, true);
recordTransaction(start);
}
@@ -1552,7 +1605,7 @@
}
assert backupNode == null || backupNode.isRole(NamenodeRole.BACKUP) :
"Not a backup node corresponds to a backup stream";
- processIOError(errorStreams);
+ processIOError(errorStreams, true);
}
synchronized boolean checkBackupRegistration(
@@ -1579,7 +1632,7 @@
}
assert backupNode == null || backupNode.isRole(NamenodeRole.BACKUP) :
"Not a backup node corresponds to a backup stream";
- processIOError(errorStreams);
+ processIOError(errorStreams, true);
return regAllowed;
}
}
Modified:
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=761439&r1=761438&r2=761439&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java Thu Apr 2 21:53:49 2009
@@ -718,34 +718,48 @@
} catch(IOException e) {
// Close any edits stream associated with this dir and remove directory
LOG.warn("incrementCheckpointTime failed on " + sd.getRoot().getPath()
+ ";type="+sd.getStorageDirType());
- if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS))
- editLog.processIOError(sd);
-
- //add storage to the removed list
- removedStorageDirs.add(sd);
- it.remove();
}
}
}
-
+
/**
- * Remove storage directory given directory
- */
-
- void processIOError(File dirName) {
- for (Iterator<StorageDirectory> it =
- dirIterator(); it.hasNext();) {
- StorageDirectory sd = it.next();
- if (sd.getRoot().getPath().equals(dirName.getPath())) {
- //add storage to the removed list
- LOG.warn("FSImage:processIOError: removing storage: " + dirName.getPath());
- try {
- sd.unlock(); //try to unlock before removing (in case it is restored)
- } catch (Exception e) {}
- removedStorageDirs.add(sd);
- it.remove();
+ * @param sds - array of SDs to process
+ * @param propagate - flag, if set - then call corresponding EditLog stream's
+ * processIOError function.
+ */
+ void processIOError(ArrayList<StorageDirectory> sds, boolean propagate) {
+ ArrayList<EditLogOutputStream> al = null;
+ for(StorageDirectory sd:sds) {
+ // if has a stream assosiated with it - remove it too..
+ if (propagate && sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
+ EditLogOutputStream eStream = editLog.getEditsStream(sd);
+ if(al == null) al = new ArrayList<EditLogOutputStream>(1);
+ al.add(eStream);
+ }
+
+ for (Iterator<StorageDirectory> it = dirIterator(); it.hasNext();) {
+ StorageDirectory sd1 = it.next();
+ if (sd.equals(sd1)) {
+ //add storage to the removed list
+ LOG.warn("FSImage:processIOError: removing storage: "
+ + sd.getRoot().getPath());
+ try {
+ sd1.unlock(); //unlock before removing (in case it will be restored)
+ } catch (Exception e) {
+ // nothing
+ }
+ removedStorageDirs.add(sd1);
+ it.remove();
+ break;
+ }
}
}
+ // if there are some edit log streams to remove
+ if(propagate && al != null)
+ editLog.processIOError(al, false);
+
+ //if called from edits log, the it will call increment from there
+ if(propagate) incrementCheckpointTime();
}
public FSEditLog getEditLog() {
@@ -1421,8 +1435,9 @@
* Renames new image
*/
void renameCheckpoint() {
+ ArrayList<StorageDirectory> al = null;
for (Iterator<StorageDirectory> it =
- dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
+ dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
StorageDirectory sd = it.next();
File ckpt = getImageFile(sd, NameNodeFile.IMAGE_NEW);
File curFile = getImageFile(sd, NameNodeFile.IMAGE);
@@ -1433,17 +1448,13 @@
if (!curFile.delete() || !ckpt.renameTo(curFile)) {
LOG.warn("renaming " + ckpt.getAbsolutePath() + " to " +
curFile.getAbsolutePath() + " FAILED");
-
- // Close edit stream, if this directory is also used for edits
- if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS))
- editLog.processIOError(sd);
-
- // add storage to the removed list
- removedStorageDirs.add(sd);
- it.remove();
+
+ if(al == null) al = new ArrayList<StorageDirectory> (1);
+ al.add(sd);
}
}
}
+ if(al != null) processIOError(al, true);
}
/**
@@ -1453,6 +1464,8 @@
this.layoutVersion = FSConstants.LAYOUT_VERSION;
if(renewCheckpointTime)
this.checkpointTime = FSNamesystem.now();
+
+ ArrayList<StorageDirectory> al = null;
for (Iterator<StorageDirectory> it =
dirIterator(); it.hasNext();) {
StorageDirectory sd = it.next();
@@ -1474,14 +1487,12 @@
sd.write();
} catch (IOException e) {
LOG.error("Cannot write file " + sd.getRoot(), e);
- // Close edit stream, if this directory is also used for edits
- if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS))
- editLog.processIOError(sd);
- //add storage to the removed list
- removedStorageDirs.add(sd);
- it.remove();
+
+ if(al == null) al = new ArrayList<StorageDirectory> (1);
+ al.add(sd);
}
}
+ if(al != null) processIOError(al, true);
ckptState = FSImage.CheckpointStates.START;
}
@@ -1625,18 +1636,21 @@
* Return the name of the image file.
*/
File getFsImageName() {
- StorageDirectory sd = null;
- for (Iterator<StorageDirectory> it =
- dirIterator(NameNodeDirType.IMAGE); it.hasNext();)
- sd = it.next();
- return getImageFile(sd, NameNodeFile.IMAGE);
+ StorageDirectory sd = null;
+ for (Iterator<StorageDirectory> it =
+ dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
+ sd = it.next();
+ if(sd.getRoot().canRead())
+ return getImageFile(sd, NameNodeFile.IMAGE);
+ }
+ return null;
}
/**
* See if any of removed storages iw "writable" again, and can be returned
* into service
*/
- void attemptRestoreRemovedStorage() {
+ synchronized void attemptRestoreRemovedStorage() {
// if directory is "alive" - copy the images there...
if(!restoreFailedStorage || removedStorageDirs.size() == 0)
return; //nothing to restore
@@ -1653,6 +1667,10 @@
if(root.exists() && root.canWrite()) {
format(sd);
LOG.info("restoring dir " + sd.getRoot().getAbsolutePath());
+ if(sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
+ File eFile = getEditFile(sd);
+ editLog.addNewEditLogStream(eFile);
+ }
this.addStorageDir(sd); // restore
it.remove();
}
Modified:
hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java?rev=761439&r1=761438&r2=761439&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java Thu Apr 2 21:53:49 2009
@@ -18,8 +18,14 @@
package org.apache.hadoop.hdfs.server.namenode;
+import java.io.DataInputStream;
+import java.io.EOFException;
import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
+import java.math.BigInteger;
+import java.security.MessageDigest;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.Random;
@@ -38,7 +44,6 @@
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeFile;
-import org.apache.hadoop.util.StringUtils;
/**
@@ -80,14 +85,14 @@
throw new IOException("Could not delete hdfs directory '" + hdfsDir +
"'");
}
- hdfsDir.mkdir();
+ hdfsDir.mkdirs();
path1 = new File(hdfsDir, "name1");
path2 = new File(hdfsDir, "name2");
path3 = new File(hdfsDir, "name3");
path1.mkdir(); path2.mkdir(); path3.mkdir();
if(!path2.exists() || !path3.exists() || !path1.exists()) {
- throw new IOException("Couldn't create dfs.name dirs");
+ throw new IOException("Couldn't create dfs.name dirs in " + hdfsDir.getAbsolutePath());
}
String dfs_name_dir = new String(path1.getPath() + "," + path2.getPath());
@@ -117,11 +122,20 @@
}
/**
- * invalidate storage by removing current directories
+ * invalidate storage by removing storage directories
*/
public void invalidateStorage(FSImage fi) throws IOException {
- fi.getEditLog().processIOError(2); //name3
- fi.getEditLog().processIOError(1); // name2
+ ArrayList<StorageDirectory> al = new ArrayList<StorageDirectory>(2);
+ Iterator<StorageDirectory> it = fi.dirIterator();
+ while(it.hasNext()) {
+ StorageDirectory sd = it.next();
+ if(sd.getRoot().getAbsolutePath().equals(path2.getAbsolutePath()) ||
+ sd.getRoot().getAbsolutePath().equals(path3.getAbsolutePath())) {
+ al.add(sd);
+ }
+ }
+ // simulate an error
+ fi.processIOError(al, true);
}
/**
@@ -143,10 +157,57 @@
}
}
+
+ /**
+ * This function returns a md5 hash of a file.
+ *
+ * @param FileToMd5
+ * @return The md5 string
+ */
+ public String getFileMD5(File file) throws Exception {
+ String res = new String();
+ MessageDigest mD = MessageDigest.getInstance("MD5");
+ DataInputStream dis = new DataInputStream(new FileInputStream(file));
+
+ try {
+ while(true) {
+ mD.update(dis.readByte());
+ }
+ } catch (EOFException eof) {}
+
+ BigInteger bigInt = new BigInteger(1, mD.digest());
+ res = bigInt.toString(16);
+ dis.close();
+
+ return res;
+ }
+
+
+ /**
+ * read currentCheckpointTime directly from the file
+ * @param currDir
+ * @return
+ * @throws IOException
+ */
+ long readCheckpointTime(File currDir) throws IOException {
+ File timeFile = new File(currDir, NameNodeFile.TIME.getName());
+ long timeStamp = 0L;
+ if (timeFile.exists() && timeFile.canRead()) {
+ DataInputStream in = new DataInputStream(new FileInputStream(timeFile));
+ try {
+ timeStamp = in.readLong();
+ } finally {
+ in.close();
+ }
+ }
+ return timeStamp;
+ }
+
/**
* check if files exist/not exist
+ * @throws IOException
*/
- public void checkFiles(boolean valid) {
+ public void checkFiles(boolean valid) throws IOException {
//look at the valid storage
File fsImg1 = new File(path1, Storage.STORAGE_DIR_CURRENT + "/" +
NameNodeFile.IMAGE.getName());
File fsImg2 = new File(path2, Storage.STORAGE_DIR_CURRENT + "/" +
NameNodeFile.IMAGE.getName());
@@ -155,13 +216,29 @@
File fsEdits1 = new File(path1, Storage.STORAGE_DIR_CURRENT + "/" +
NameNodeFile.EDITS.getName());
File fsEdits2 = new File(path2, Storage.STORAGE_DIR_CURRENT + "/" +
NameNodeFile.EDITS.getName());
File fsEdits3 = new File(path3, Storage.STORAGE_DIR_CURRENT + "/" +
NameNodeFile.EDITS.getName());
-
+
+ long chkPt1 = readCheckpointTime(new File(path1, Storage.STORAGE_DIR_CURRENT));
+ long chkPt2 = readCheckpointTime(new File(path2, Storage.STORAGE_DIR_CURRENT));
+ long chkPt3 = readCheckpointTime(new File(path3, Storage.STORAGE_DIR_CURRENT));
+
+ String md5_1 = null,md5_2 = null,md5_3 = null;
+ try {
+ md5_1 = getFileMD5(fsEdits1);
+ md5_2 = getFileMD5(fsEdits2);
+ md5_3 = getFileMD5(fsEdits3);
+ } catch (Exception e) {
+ System.err.println("md 5 calculation failed:" + e.getLocalizedMessage());
+ }
this.printStorages(cluster.getNameNode().getFSImage());
LOG.info("++++ image files = "+fsImg1.getAbsolutePath() + "," +
fsImg2.getAbsolutePath() + ","+ fsImg3.getAbsolutePath());
LOG.info("++++ edits files = "+fsEdits1.getAbsolutePath() + "," +
fsEdits2.getAbsolutePath() + ","+ fsEdits3.getAbsolutePath());
LOG.info("checkFiles compares lengths: img1=" + fsImg1.length() +
",img2=" + fsImg2.length() + ",img3=" + fsImg3.length());
LOG.info("checkFiles compares lengths: edits1=" + fsEdits1.length() +
",edits2=" + fsEdits2.length() + ",edits3=" + fsEdits3.length());
+ LOG.info("checkFiles compares chkPts: name1=" + chkPt1 + ",name2=" + chkPt2 + ",name3=" + chkPt3);
+ LOG.info("checkFiles compares md5s: " + fsEdits1.getAbsolutePath() +
+ "="+ md5_1 + "," + fsEdits2.getAbsolutePath() + "=" + md5_2 + "," +
+ fsEdits3.getAbsolutePath() + "=" + md5_3);
if(valid) {
// should be the same
@@ -169,12 +246,26 @@
assertTrue(0 == fsImg3.length()); //shouldn't be created
assertTrue(fsEdits1.length() == fsEdits2.length());
assertTrue(fsEdits1.length() == fsEdits3.length());
+ assertTrue(md5_1.equals(md5_2));
+ assertTrue(md5_1.equals(md5_3));
+
+ // checkpoint times
+ assertTrue(chkPt1 == chkPt2);
+ assertTrue(chkPt1 == chkPt3);
} else {
// should be different
//assertTrue(fsImg1.length() != fsImg2.length());
//assertTrue(fsImg1.length() != fsImg3.length());
assertTrue(fsEdits1.length() != fsEdits2.length());
assertTrue(fsEdits1.length() != fsEdits3.length());
+
+ assertTrue(!md5_1.equals(md5_2));
+ assertTrue(!md5_1.equals(md5_3));
+
+
+ // checkpoint times
+ assertTrue(chkPt1 > chkPt2);
+ assertTrue(chkPt1 > chkPt3);
}
}
@@ -222,6 +313,13 @@
checkFiles(true);
System.out.println("****testStorageRestore: second Checkpoint done and checkFiles(true) run");
+
+ // verify that all the logs are active
+ path = new Path("/", "test2");
+ writeFile(fs, path, 2);
+ System.out.println("****testStorageRestore: wrote a file and checkFiles(true) run");
+ checkFiles(true);
+
secondary.shutdown();
cluster.shutdown();
}