Author: tomwhite
Date: Wed Dec 9 04:17:02 2009
New Revision: 888697
URL: http://svn.apache.org/viewvc?rev=888697&view=rev
Log:
HADOOP-6254. Slow reads cause s3n to fail with SocketTimeoutException.
Contributed by Andrew Hitchcock.
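In outline, the fix wraps each read from the S3 stream in a single retry: when an IOException (for example a SocketTimeoutException on a slow read) surfaces mid-stream, the stream seeks back to its current position, which reopens the connection, and the read is re-issued once. A minimal sketch of the pattern, simplified from the diff below (not the literal committed code):

    // Retry-once read pattern, simplified from NativeS3FsInputStream.
    public synchronized int read() throws IOException {
      int result;
      try {
        result = in.read();
      } catch (IOException e) {
        // The connection to S3 went bad; reopen at the current offset
        // and retry the read once. A second failure propagates.
        seek(pos);
        result = in.read();
      }
      if (result != -1) {
        pos++;
      }
      return result;
    }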
Modified:
hadoop/common/trunk/CHANGES.txt
hadoop/common/trunk/src/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java
hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/s3native/NativeS3FileSystemContractBaseTest.java
Modified: hadoop/common/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/CHANGES.txt?rev=888697&r1=888696&r2=888697&view=diff
==============================================================================
--- hadoop/common/trunk/CHANGES.txt (original)
+++ hadoop/common/trunk/CHANGES.txt Wed Dec 9 04:17:02 2009
@@ -62,6 +62,9 @@
HADOOP-6386. NameNode's HttpServer can't instantiate InetSocketAddress:
IllegalArgumentException is thrown (cos)
+ HADOOP-6254. Slow reads cause s3n to fail with SocketTimeoutException.
+ (Andrew Hitchcock via tomwhite)
+
Release 0.21.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java?rev=888697&r1=888696&r2=888697&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java Wed Dec 9 04:17:02 2009
@@ -86,20 +86,31 @@
static final String PATH_DELIMITER = Path.SEPARATOR;
private static final int S3_MAX_LISTING_LENGTH = 1000;
- private class NativeS3FsInputStream extends FSInputStream {
+ static class NativeS3FsInputStream extends FSInputStream {
+ private NativeFileSystemStore store;
+ private Statistics statistics;
private InputStream in;
private final String key;
private long pos = 0;
- public NativeS3FsInputStream(InputStream in, String key) {
+ public NativeS3FsInputStream(NativeFileSystemStore store, Statistics statistics, InputStream in, String key) {
+ this.store = store;
+ this.statistics = statistics;
this.in = in;
this.key = key;
}
@Override
public synchronized int read() throws IOException {
- int result = in.read();
+ int result = -1;
+ try {
+ result = in.read();
+ } catch (IOException e) {
+ LOG.info("Received IOException while reading '" + key + "', attempting
to reopen.");
+ seek(pos);
+ result = in.read();
+ }
if (result != -1) {
pos++;
}
@@ -112,7 +123,14 @@
public synchronized int read(byte[] b, int off, int len)
throws IOException {
- int result = in.read(b, off, len);
+ int result = -1;
+ try {
+ result = in.read(b, off, len);
+ } catch (IOException e) {
+ LOG.info("Received IOException while reading '" + key + "', attempting
to reopen.");
+ seek(pos);
+ result = in.read(b, off, len);
+ }
if (result > 0) {
pos += result;
}
@@ -514,7 +532,7 @@
Path absolutePath = makeAbsolute(f);
String key = pathToKey(absolutePath);
return new FSDataInputStream(new BufferedFSInputStream(
- new NativeS3FsInputStream(store.retrieve(key), key), bufferSize));
+ new NativeS3FsInputStream(store, statistics, store.retrieve(key), key), bufferSize));
}
// rename() and delete() use this method to ensure that the parent directory
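Note that the recovery path depends on NativeS3FsInputStream.seek(), which this diff does not touch. Judging from the constructor now receiving the store, and from the test below stubbing retrieve(String, long), seek() presumably closes the stale stream and reopens the object at the requested offset with a ranged request. A hedged sketch, reconstructed from context rather than copied from the source:

    // Assumed shape of the pre-existing seek(); unchanged by this patch.
    @Override
    public synchronized void seek(long newPos) throws IOException {
      in.close();
      in = store.retrieve(key, newPos);  // reopen from byte offset newPos
      pos = newPos;
    }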
Modified: hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/s3native/NativeS3FileSystemContractBaseTest.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/s3native/NativeS3FileSystemContractBaseTest.java?rev=888697&r1=888696&r2=888697&view=diff
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/s3native/NativeS3FileSystemContractBaseTest.java (original)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/s3native/NativeS3FileSystemContractBaseTest.java Wed Dec 9 04:17:02 2009
@@ -19,12 +19,14 @@
package org.apache.hadoop.fs.s3native;
import java.io.IOException;
+import java.io.InputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystemContractBaseTest;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3native.NativeS3FileSystem.NativeS3FsInputStream;
public abstract class NativeS3FileSystemContractBaseTest
extends FileSystemContractBaseTest {
@@ -148,5 +150,79 @@
assertEquals("Double default block size", newBlockSize,
fs.getFileStatus(file).getBlockSize());
}
+
+ public void testRetryOnIoException() throws Exception {
+ class TestInputStream extends InputStream {
+ boolean shouldThrow = false;
+ int throwCount = 0;
+ int pos = 0;
+ byte[] bytes;
+
+ public TestInputStream() {
+ bytes = new byte[256];
+ for (int i = 0; i < 256; i++) {
+ bytes[i] = (byte)i;
+ }
+ }
+
+ @Override
+ public int read() throws IOException {
+ shouldThrow = !shouldThrow;
+ if (shouldThrow) {
+ throwCount++;
+ throw new IOException();
+ }
+ return pos++;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ shouldThrow = !shouldThrow;
+ if (shouldThrow) {
+ throwCount++;
+ throw new IOException();
+ }
+
+ int sizeToRead = Math.min(len, 256 - pos);
+ for (int i = 0; i < sizeToRead; i++) {
+ b[off + i] = bytes[pos + i];
+ }
+ pos += sizeToRead;
+ return sizeToRead;
+ }
+ }
+
+ final InputStream is = new TestInputStream();
+
+ class MockNativeFileSystemStore extends Jets3tNativeFileSystemStore {
+ @Override
+ public InputStream retrieve(String key, long byteRangeStart) throws IOException {
+ return is;
+ }
+ }
+
+ NativeS3FsInputStream stream = new NativeS3FsInputStream(new MockNativeFileSystemStore(), null, is, "");
+
+ // Test reading methods.
+ byte[] result = new byte[256];
+ for (int i = 0; i < 128; i++) {
+ result[i] = (byte)stream.read();
+ }
+ for (int i = 128; i < 256; i += 8) {
+ byte[] temp = new byte[8];
+ int read = stream.read(temp, 0, 8);
+ assertEquals(8, read);
+ System.arraycopy(temp, 0, result, i, 8);
+ }
+
+ // Assert the bytes read back match the original sequence.
+ for (int i = 0; i < 256; i++) {
+ assertEquals((byte)i, result[i]);
+ }
+
+ // Every read throws once before succeeding on retry, so the throw
+ // path was exercised 128 + (128 / 8) = 144 times.
+ assertEquals(144, ((TestInputStream)is).throwCount);
+ }
}
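Two observations on the test: the mock alternates between throwing and succeeding, so the retried read always succeeds on its second attempt, and throwCount landing at exactly 144 (128 single-byte reads plus 16 eight-byte reads) proves the catch-and-seek path ran for every call. The retry is deliberately single-shot: the re-issued read after seek(pos) is not itself guarded, so two consecutive failures on the same read still propagate to the caller. Passing null for Statistics here also suggests that field is not exercised on this code path.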