Author: todd
Date: Fri May 18 05:28:18 2012
New Revision: 1339978
URL: http://svn.apache.org/viewvc?rev=1339978&view=rev
Log:
HDFS-3440. More effectively limit stream memory consumption when reading
corrupt edit logs. Contributed by Colin Patrick McCabe.
Added:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StreamLimiter.java
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1339978&r1=1339977&r2=1339978&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri May 18
05:28:18 2012
@@ -187,6 +187,9 @@ Release 2.0.1-alpha - UNRELEASED
HDFS-3419. Cleanup LocatedBlock. (eli)
+ HDFS-3440. More effectively limit stream memory consumption when reading
+ corrupt edit logs. (Colin Patrick McCabe via todd)
+
OPTIMIZATIONS
BUG FIXES
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java?rev=1339978&r1=1339977&r2=1339978&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
Fri May 18 05:28:18 2012
@@ -75,7 +75,7 @@ class BookKeeperEditLogInputStream exten
tracker = new FSEditLogLoader.PositionTrackingInputStream(bin);
DataInputStream in = new DataInputStream(tracker);
- reader = new FSEditLogOp.Reader(in, logVersion);
+ reader = new FSEditLogOp.Reader(in, tracker, logVersion);
}
@Override
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java?rev=1339978&r1=1339977&r2=1339978&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java
Fri May 18 05:28:18 2012
@@ -119,7 +119,7 @@ class EditLogBackupInputStream extends E
this.version = version;
- reader = new FSEditLogOp.Reader(in, version);
+ reader = new FSEditLogOp.Reader(in, tracker, version);
}
void clear() throws IOException {
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java?rev=1339978&r1=1339977&r2=1339978&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
Fri May 18 05:28:18 2012
@@ -83,7 +83,7 @@ public class EditLogFileInputStream exte
throw new LogHeaderCorruptException("No header found in log");
}
- reader = new FSEditLogOp.Reader(in, logVersion);
+ reader = new FSEditLogOp.Reader(in, tracker, logVersion);
this.firstTxId = firstTxId;
this.lastTxId = lastTxId;
this.isInProgress = isInProgress;
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1339978&r1=1339977&r2=1339978&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
Fri May 18 05:28:18 2012
@@ -721,17 +721,31 @@ public class FSEditLogLoader {
/**
* Stream wrapper that keeps track of the current stream position.
+ *
+ * This stream also allows us to set a limit on how many bytes we can read
+ * without getting an exception.
*/
- public static class PositionTrackingInputStream extends FilterInputStream {
+ public static class PositionTrackingInputStream extends FilterInputStream
+ implements StreamLimiter {
private long curPos = 0;
private long markPos = -1;
+ private long limitPos = Long.MAX_VALUE;
public PositionTrackingInputStream(InputStream is) {
super(is);
}
+    /**
+     * Verifies that {@code amt} more bytes can be consumed without crossing
+     * the limit set by {@link #setLimit(long)}.
+     *
+     * @param amt number of bytes the caller is about to read
+     * @throws IOException if the read would end past the limit
+     */
+    private void checkLimit(long amt) throws IOException {
+      long extra = (curPos + amt) - limitPos;
+      if (extra > 0) {
+        // Report the overshoot (not the requested length), matching the
+        // message produced by skip().
+        throw new IOException("Tried to read " + extra + " byte(s) past " +
+            "the limit at offset " + limitPos);
+      }
+    }
+
@Override
public int read() throws IOException {
+ checkLimit(1);
int ret = super.read();
if (ret != -1) curPos++;
return ret;
@@ -739,6 +753,7 @@ public class FSEditLogLoader {
@Override
public int read(byte[] data) throws IOException {
+ checkLimit(data.length);
int ret = super.read(data);
if (ret > 0) curPos += ret;
return ret;
@@ -746,12 +761,18 @@ public class FSEditLogLoader {
+    /**
+     * Reads up to {@code length} bytes into {@code data} and advances the
+     * tracked position. The limit is checked against the full requested
+     * length, so the call fails even if the underlying stream would have
+     * returned fewer bytes.
+     *
+     * NOTE(review): java.io.FilterInputStream#read(byte[]) is documented to
+     * delegate to read(b, 0, b.length), i.e. to this override, so the
+     * read(byte[]) override in this class may check the limit twice and
+     * advance curPos twice for a single read. Consider having read(byte[])
+     * simply return read(data, 0, data.length) -- TODO confirm.
+     */
@Override
public int read(byte[] data, int offset, int length) throws IOException {
+      // Reject before touching the underlying stream so nothing is consumed
+      // past the limit.
+ checkLimit(length);
int ret = super.read(data, offset, length);
+      // Only a positive byte count advances the position (-1 means EOF).
if (ret > 0) curPos += ret;
return ret;
}
+    /**
+     * Allows at most {@code limit} more bytes to be read or skipped, counted
+     * from the current position. Any previously set limit is discarded.
+     *
+     * @param limit number of additional bytes permitted
+     */
@Override
+    public void setLimit(long limit) {
+      // Saturate instead of overflowing: for a very large limit such as
+      // Long.MAX_VALUE, curPos + limit would wrap to a negative offset and
+      // make every subsequent read or skip fail.
+      limitPos = (limit > Long.MAX_VALUE - curPos)
+          ? Long.MAX_VALUE : curPos + limit;
+    }
+
+ @Override
public void mark(int limit) {
super.mark(limit);
markPos = curPos;
@@ -773,6 +794,11 @@ public class FSEditLogLoader {
@Override
public long skip(long amt) throws IOException {
+ long extra = (curPos + amt) - limitPos;
+ if (extra > 0) {
+ throw new IOException("Tried to skip " + extra + " bytes past " +
+ "the limit at offset " + limitPos);
+ }
long ret = super.skip(amt);
curPos += ret;
return ret;
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java?rev=1339978&r1=1339977&r2=1339978&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
Fri May 18 05:28:18 2012
@@ -75,7 +75,10 @@ import java.io.EOFException;
public abstract class FSEditLogOp {
public final FSEditLogOpCodes opCode;
long txid;
- private static final int MAX_OP_SIZE = 100 * 1024 * 1024;
+ /**
+ * Maximum size, in bytes, of a single edit log operation (1.5 MB).
+ */
+ public static final int MAX_OP_SIZE = (3 * 1024 * 1024) / 2;
@SuppressWarnings("deprecation")
@@ -2229,6 +2232,7 @@ public abstract class FSEditLogOp {
*/
public static class Reader {
private final DataInputStream in;
+ private final StreamLimiter limiter;
private final int logVersion;
private final Checksum checksum;
private final OpInstanceCache cache;
@@ -2239,7 +2243,7 @@ public abstract class FSEditLogOp {
* @param logVersion The version of the data coming from the stream.
*/
@SuppressWarnings("deprecation")
- public Reader(DataInputStream in, int logVersion) {
+ public Reader(DataInputStream in, StreamLimiter limiter, int logVersion) {
this.logVersion = logVersion;
if (LayoutVersion.supports(Feature.EDITS_CHESKUM, logVersion)) {
this.checksum = new PureJavaCrc32();
@@ -2253,6 +2257,7 @@ public abstract class FSEditLogOp {
} else {
this.in = in;
}
+ this.limiter = limiter;
this.cache = new OpInstanceCache();
}
@@ -2272,6 +2277,7 @@ public abstract class FSEditLogOp {
public FSEditLogOp readOp(boolean skipBrokenEdits) throws IOException {
while (true) {
try {
+ limiter.setLimit(MAX_OP_SIZE);
in.mark(MAX_OP_SIZE);
return decodeOp();
} catch (GarbageAfterTerminatorException e) {
Added:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StreamLimiter.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StreamLimiter.java?rev=1339978&view=auto
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StreamLimiter.java
(added)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StreamLimiter.java
Fri May 18 05:28:18 2012
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+/**
+ * Implemented by streams that can be told to stop handing out bytes after
+ * a given amount; reading beyond that amount produces an exception.
+ */
+interface StreamLimiter {
+  /**
+   * Installs a new limit, replacing whatever limit was previously in effect.
+   *
+   * @param limit how many more bytes may be read before the stream starts
+   *              throwing exceptions
+   */
+  void setLimit(long limit);
+}
\ No newline at end of file
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java?rev=1339978&r1=1339977&r2=1339978&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
Fri May 18 05:28:18 2012
@@ -765,7 +765,7 @@ public class TestEditLog extends TestCas
tracker = new FSEditLogLoader.PositionTrackingInputStream(in);
in = new DataInputStream(tracker);
- reader = new FSEditLogOp.Reader(in, version);
+ reader = new FSEditLogOp.Reader(in, tracker, version);
}
@Override
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java?rev=1339978&r1=1339977&r2=1339978&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
Fri May 18 05:28:18 2012
@@ -22,8 +22,10 @@ import static org.junit.Assert.assertEqu
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
@@ -316,4 +318,47 @@ public class TestFSEditLogLoader {
fis.close();
}
}
+
+  /**
+   * Checks that PositionTrackingInputStream rejects reads that would cross
+   * the limit installed by setLimit(), that the stream can still be reset to
+   * its mark afterwards, and that reads within the limit return the expected
+   * data.
+   */
+  @Test
+  public void testStreamLimiter() throws IOException {
+    final File LIMITER_TEST_FILE = new File(TEST_DIR, "limiter.test");
+
+    FileOutputStream fos = new FileOutputStream(LIMITER_TEST_FILE);
+    try {
+      fos.write(0x12);
+      fos.write(0x12);
+      fos.write(0x12);
+    } finally {
+      fos.close();
+    }
+
+    FileInputStream fin = new FileInputStream(LIMITER_TEST_FILE);
+    BufferedInputStream bin = new BufferedInputStream(fin);
+    FSEditLogLoader.PositionTrackingInputStream tracker =
+        new FSEditLogLoader.PositionTrackingInputStream(bin);
+    try {
+      tracker.setLimit(2);
+      tracker.mark(100);
+      assertEquals(0x12, tracker.read());
+      assertEquals(0x12, tracker.read());
+      try {
+        tracker.read();
+        fail("expected to get IOException after reading past the limit");
+      } catch (IOException expected) {
+        // The third byte exists in the file but lies beyond the limit.
+      }
+      tracker.reset();
+      tracker.mark(100);
+      byte[] arr = new byte[3];
+      try {
+        tracker.read(arr);
+        fail("expected to get IOException after reading past the limit");
+      } catch (IOException expected) {
+        // A 3-byte read is refused up front because it would cross the limit.
+      }
+      tracker.reset();
+      arr = new byte[2];
+      assertEquals(2, tracker.read(arr));
+      assertEquals(0x12, arr[0]);
+      assertEquals(0x12, arr[1]);
+    } finally {
+      tracker.close();
+      LIMITER_TEST_FILE.delete();
+    }
+  }
}