Author: tomwhite
Date: Wed Feb 21 14:02:13 2007
New Revision: 510258
URL: http://svn.apache.org/viewvc?view=rev&rev=510258
Log:
HADOOP-997. Implement S3 retry mechanism for failed block transfers.
Added:
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryPolicies.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryPolicy.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryProxy.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/package.html
lucene/hadoop/trunk/src/test/org/apache/hadoop/io/retry/
lucene/hadoop/trunk/src/test/org/apache/hadoop/io/retry/TestRetryProxy.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/io/retry/UnreliableImplementation.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/io/retry/UnreliableInterface.java
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/conf/hadoop-default.xml
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Block.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/FileSystemStore.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/INode.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Jets3tFileSystemStore.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3InputStream.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3OutputStream.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/InMemoryFileSystemStore.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/Jets3tS3FileSystemTest.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=510258&r1=510257&r2=510258
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Feb 21 14:02:13 2007
@@ -95,6 +95,9 @@
28. HADOOP-1025. Remove some obsolete code in ipc.Server. (cutting)
+29. HADOOP-997. Implement S3 retry mechanism for failed block
+ transfers. This includes a generic retry mechanism for use
+ elsewhere in Hadoop. (tomwhite)
Release 0.11.2 - 2007-02-16
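
The "generic retry mechanism" above is the new org.apache.hadoop.io.retry package added by this commit. A minimal usage sketch, assembled from the classes added below (the Unreliable* names are the commit's own test fixtures):

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.io.retry.RetryPolicies;
    import org.apache.hadoop.io.retry.RetryProxy;

    // Wrap an implementation in a dynamic proxy that retries failed calls.
    UnreliableInterface unreliable = (UnreliableInterface) RetryProxy.create(
        UnreliableInterface.class, new UnreliableImplementation(),
        RetryPolicies.retryUpToMaximumCountWithFixedSleep(4, 10, TimeUnit.SECONDS));
    unreliable.failsOnceThenSucceeds(); // the first failure is retried transparently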
Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?view=diff&rev=510258&r1=510257&r2=510258
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Wed Feb 21 14:02:13 2007
@@ -375,17 +375,31 @@
<property>
<name>fs.s3.block.size</name>
- <value>1048576</value>
- <description>
- Block size to use writing S3. Note, the default block size for jets3t,
- the library at the base of the S3 filesystem, is less. Its 131072.
- If S3 is having a bad day requiring retries, if the retry is at some
- point after the end of jets3t RepeatableInputStream buffer and end of
- the S3 block, the transfer will fail. To change the jets3t block size,
- add a jets3t.properties file to $HADOOP_HOME/conf so it can be found
- on the CLASSPATH and amend the 's3service.stream-retry-buffer-size'
- property. See http://jets3t.s3.amazonaws.com/toolkit/configuration.html
- for more on jets3t configurables.
+ <value>67108864</value>
+ <description>Block size to use when writing files to S3.</description>
+</property>
+
+<property>
+ <name>fs.s3.buffer.dir</name>
+ <value>${hadoop.tmp.dir}/s3</value>
+ <description>Determines where on the local filesystem the S3 filesystem
+ should store its blocks before it sends them to S3
+ or after it retrieves them from S3.
+ </description>
+</property>
+
+<property>
+ <name>fs.s3.maxRetries</name>
+ <value>4</value>
+  <description>The maximum number of retries for reading or writing blocks to S3,
+ before we signal failure to the application.
+ </description>
+</property>
+
+<property>
+ <name>fs.s3.sleepTimeSeconds</name>
+ <value>10</value>
+ <description>The number of seconds to sleep between each S3 retry.
</description>
</property>
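
These two settings can be tuned per site in the usual way, by overriding them in conf/hadoop-site.xml rather than editing hadoop-default.xml; a sketch with illustrative values:

    <property>
      <name>fs.s3.maxRetries</name>
      <value>8</value>
    </property>

    <property>
      <name>fs.s3.sleepTimeSeconds</name>
      <value>5</value>
    </property>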
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Block.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Block.java?view=diff&rev=510258&r1=510257&r2=510258
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Block.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Block.java Wed Feb 21 14:02:13 2007
@@ -1,6 +1,9 @@
package org.apache.hadoop.fs.s3;
-class Block {
+/**
+ * Holds metadata about a block of data being stored in a {@link FileSystemStore}.
+ */
+public class Block {
private long id;
private long length;
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/FileSystemStore.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/FileSystemStore.java?view=diff&rev=510258&r1=510257&r2=510258
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/FileSystemStore.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/FileSystemStore.java Wed Feb 21 14:02:13 2007
@@ -1,25 +1,28 @@
package org.apache.hadoop.fs.s3;
+import java.io.File;
import java.io.IOException;
-import java.io.InputStream;
import java.net.URI;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-interface FileSystemStore {
+/**
+ * A facility for storing and retrieving {@link INode}s and {@link Block}s.
+ */
+public interface FileSystemStore {
void initialize(URI uri, Configuration conf) throws IOException;
void storeINode(Path path, INode inode) throws IOException;
- void storeBlock(Block block, InputStream in) throws IOException;
+ void storeBlock(Block block, File file) throws IOException;
boolean inodeExists(Path path) throws IOException;
boolean blockExists(long blockId) throws IOException;
- INode getINode(Path path) throws IOException;
-  InputStream getBlockStream(Block block, long byteRangeStart) throws IOException;
+ INode retrieveINode(Path path) throws IOException;
+ File retrieveBlock(Block block, long byteRangeStart) throws IOException;
void deleteINode(Path path) throws IOException;
void deleteBlock(Block block) throws IOException;
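
The InputStream-to-File change in this interface is what makes block transfers retryable: a half-consumed stream cannot be replayed on a second attempt, while a local file can be reopened from the start on every try. A hypothetical sketch of the effect (illustration only, not code from this commit; the retry proxy below does this generically):

    // Each attempt re-reads the backing file from the beginning, which
    // would be impossible with an already-consumed InputStream.
    void storeBlockWithRetries(FileSystemStore store, Block block, File file)
        throws IOException {
      for (int attempt = 0; ; attempt++) {
        try {
          store.storeBlock(block, file);
          return;
        } catch (IOException e) {
          if (attempt >= 3) throw e; // illustrative cap; cf. fs.s3.maxRetries
        }
      }
    }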
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/INode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/INode.java?view=diff&rev=510258&r1=510257&r2=510258
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/INode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/INode.java Wed Feb 21 14:02:13 2007
@@ -11,7 +11,7 @@
* Holds file metadata including type (regular file, or directory),
* and the list of blocks that are pointers to the data.
*/
-class INode {
+public class INode {
enum FileType {
DIRECTORY, FILE
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Jets3tFileSystemStore.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Jets3tFileSystemStore.java?view=diff&rev=510258&r1=510257&r2=510258
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Jets3tFileSystemStore.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Jets3tFileSystemStore.java Wed Feb 21 14:02:13 2007
@@ -1,7 +1,14 @@
package org.apache.hadoop.fs.s3;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URLDecoder;
@@ -24,11 +31,18 @@
private static final String PATH_DELIMITER = urlEncode(Path.SEPARATOR);
private static final String BLOCK_PREFIX = "block_";
+ private Configuration conf;
+
private S3Service s3Service;
private S3Bucket bucket;
-
+
+ private int bufferSize;
+
public void initialize(URI uri, Configuration conf) throws IOException {
+
+ this.conf = conf;
+
try {
String accessKey = null;
String secretAccessKey = null;
@@ -78,6 +92,8 @@
bucket = new S3Bucket(uri.getHost());
createBucket(bucket.getName());
+
+ this.bufferSize = conf.getInt("io.file.buffer.size", 4096);
}
private void createBucket(String bucketName) throws IOException {
@@ -159,13 +175,47 @@
}
}
- public INode getINode(Path path) throws IOException {
+ public INode retrieveINode(Path path) throws IOException {
return INode.deserialize(get(pathToKey(path)));
}
- public InputStream getBlockStream(Block block, long byteRangeStart)
+ public File retrieveBlock(Block block, long byteRangeStart)
throws IOException {
- return get(blockToKey(block), byteRangeStart);
+ File fileBlock = null;
+ InputStream in = null;
+ OutputStream out = null;
+ try {
+ fileBlock = newBackupFile();
+ in = get(blockToKey(block), byteRangeStart);
+ out = new BufferedOutputStream(new FileOutputStream(fileBlock));
+ byte[] buf = new byte[bufferSize];
+ int numRead;
+ while ((numRead = in.read(buf)) >= 0) {
+ out.write(buf, 0, numRead);
+ }
+ return fileBlock;
+ } catch (IOException e) {
+ // close output stream to file then delete file
+ closeQuietly(out);
+ out = null; // to prevent a second close
+ if (fileBlock != null) {
+ fileBlock.delete();
+ }
+ throw e;
+ } finally {
+ closeQuietly(out);
+ closeQuietly(in);
+ }
+ }
+
+ private File newBackupFile() throws IOException {
+ File dir = new File(conf.get("fs.s3.buffer.dir"));
+ if (!dir.exists() && !dir.mkdirs()) {
+ throw new IOException("Cannot create S3 buffer directory: " + dir);
+ }
+ File result = File.createTempFile("input-", ".tmp", dir);
+ result.deleteOnExit();
+ return result;
}
public Set<Path> listSubPaths(Path path) throws IOException {
@@ -229,8 +279,24 @@
put(pathToKey(path), inode.serialize(), inode.getSerializedLength());
}
- public void storeBlock(Block block, InputStream in) throws IOException {
- put(blockToKey(block), in, block.getLength());
+ public void storeBlock(Block block, File file) throws IOException {
+ BufferedInputStream in = null;
+ try {
+ in = new BufferedInputStream(new FileInputStream(file));
+ put(blockToKey(block), in, block.getLength());
+ } finally {
+ closeQuietly(in);
+ }
+ }
+
+ private void closeQuietly(Closeable closeable) {
+ if (closeable != null) {
+ try {
+ closeable.close();
+ } catch (IOException e) {
+ // ignore
+ }
+ }
}
private String pathToKey(Path path) {
@@ -296,7 +362,7 @@
for (int i = 0; i < objects.length; i++) {
Path path = keyToPath(objects[i].getKey());
sb.append(path).append("\n");
- INode m = getINode(path);
+ INode m = retrieveINode(path);
sb.append("\t").append(m.getFileType()).append("\n");
if (m.getFileType() == FileType.DIRECTORY) {
continue;
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java?view=diff&rev=510258&r1=510257&r2=510258
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java Wed Feb 21 14:02:13 2007
@@ -2,7 +2,10 @@
import java.io.IOException;
import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSInputStream;
@@ -10,6 +13,9 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.util.Progressable;
/**
@@ -20,7 +26,7 @@
*/
public class S3FileSystem extends FileSystem {
- private static final long DEFAULT_BLOCK_SIZE = 1 * 1024 * 1024;
+ private static final long DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024;
private URI uri;
@@ -31,9 +37,9 @@
private Path workingDir = new Path("/user", System.getProperty("user.name"));
public S3FileSystem() {
- this(new Jets3tFileSystemStore());
+ // set store in initialize()
}
-
+
public S3FileSystem(FileSystemStore store) {
this.store = store;
}
@@ -45,12 +51,36 @@
@Override
public void initialize(URI uri, Configuration conf) throws IOException {
+ if (store == null) {
+ store = createDefaultStore(conf);
+ }
store.initialize(uri, conf);
setConf(conf);
this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
this.localFs = get(URI.create("file:///"), conf);
}
+ private static FileSystemStore createDefaultStore(Configuration conf) {
+ FileSystemStore store = new Jets3tFileSystemStore();
+
+ RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
+ conf.getInt("fs.s3.maxRetries", 4),
+ conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
+ Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
+ new HashMap<Class<? extends Exception>, RetryPolicy>();
+ exceptionToPolicyMap.put(IOException.class, basePolicy);
+ exceptionToPolicyMap.put(S3Exception.class, basePolicy);
+
+ RetryPolicy methodPolicy = RetryPolicies.retryByException(
+ RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
+    Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
+ methodNameToPolicyMap.put("storeBlock", methodPolicy);
+ methodNameToPolicyMap.put("retrieveBlock", methodPolicy);
+
+ return (FileSystemStore) RetryProxy.create(FileSystemStore.class,
+ store, methodNameToPolicyMap);
+ }
+
@Override
public String getName() {
return getUri().toString();
@@ -81,7 +111,7 @@
@Override
public boolean mkdirs(Path path) throws IOException {
Path absolutePath = makeAbsolute(path);
- INode inode = store.getINode(absolutePath);
+ INode inode = store.retrieveINode(absolutePath);
if (inode == null) {
store.storeINode(absolutePath, INode.DIRECTORY_INODE);
} else if (inode.isFile()) {
@@ -94,7 +124,7 @@
@Override
public boolean isDirectory(Path path) throws IOException {
- INode inode = store.getINode(makeAbsolute(path));
+ INode inode = store.retrieveINode(makeAbsolute(path));
if (inode == null) {
return false;
}
@@ -103,7 +133,7 @@
@Override
public boolean isFile(Path path) throws IOException {
- INode inode = store.getINode(makeAbsolute(path));
+ INode inode = store.retrieveINode(makeAbsolute(path));
if (inode == null) {
return false;
}
@@ -111,7 +141,7 @@
}
private INode checkFile(Path path) throws IOException {
- INode inode = store.getINode(makeAbsolute(path));
+ INode inode = store.retrieveINode(makeAbsolute(path));
if (inode == null) {
throw new IOException("No such file.");
}
@@ -124,7 +154,7 @@
@Override
public Path[] listPathsRaw(Path path) throws IOException {
Path absolutePath = makeAbsolute(path);
- INode inode = store.getINode(absolutePath);
+ INode inode = store.retrieveINode(absolutePath);
if (inode == null) {
return null;
} else if (inode.isFile()) {
@@ -147,7 +177,7 @@
short replication, long blockSize, Progressable progress)
throws IOException {
- INode inode = store.getINode(makeAbsolute(file));
+ INode inode = store.retrieveINode(makeAbsolute(file));
if (inode != null) {
if (overwrite) {
deleteRaw(file);
@@ -175,16 +205,16 @@
@Override
public boolean renameRaw(Path src, Path dst) throws IOException {
Path absoluteSrc = makeAbsolute(src);
- INode srcINode = store.getINode(absoluteSrc);
+ INode srcINode = store.retrieveINode(absoluteSrc);
if (srcINode == null) {
// src path doesn't exist
return false;
}
Path absoluteDst = makeAbsolute(dst);
- INode dstINode = store.getINode(absoluteDst);
+ INode dstINode = store.retrieveINode(absoluteDst);
if (dstINode != null && dstINode.isDirectory()) {
absoluteDst = new Path(absoluteDst, absoluteSrc.getName());
- dstINode = store.getINode(absoluteDst);
+ dstINode = store.retrieveINode(absoluteDst);
}
if (dstINode != null) {
// dst path already exists - can't overwrite
@@ -192,7 +222,7 @@
}
Path dstParent = absoluteDst.getParent();
if (dstParent != null) {
- INode dstParentINode = store.getINode(dstParent);
+ INode dstParentINode = store.retrieveINode(dstParent);
if (dstParentINode == null || dstParentINode.isFile()) {
// dst parent doesn't exist or is a file
return false;
@@ -202,12 +232,12 @@
}
private boolean renameRawRecursive(Path src, Path dst) throws IOException {
- INode srcINode = store.getINode(src);
+ INode srcINode = store.retrieveINode(src);
store.storeINode(dst, srcINode);
store.deleteINode(src);
if (srcINode.isDirectory()) {
for (Path oldSrc : store.listDeepSubPaths(src)) {
- INode inode = store.getINode(oldSrc);
+ INode inode = store.retrieveINode(oldSrc);
if (inode == null) {
return false;
}
@@ -222,7 +252,7 @@
@Override
public boolean deleteRaw(Path path) throws IOException {
Path absolutePath = makeAbsolute(path);
- INode inode = store.getINode(absolutePath);
+ INode inode = store.retrieveINode(absolutePath);
if (inode == null) {
return false;
}
@@ -282,9 +312,9 @@
@Override
public long getBlockSize(Path path) throws IOException {
- INode inode = store.getINode(makeAbsolute(path));
+ INode inode = store.retrieveINode(makeAbsolute(path));
if (inode == null) {
- throw new IOException("No such file or directory.");
+ throw new IOException(path.toString() + ": No such file or directory.");
}
Block[] blocks = inode.getBlocks();
if (blocks == null || blocks.length == 0) {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3InputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3InputStream.java?view=diff&rev=510258&r1=510257&r2=510258
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3InputStream.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3InputStream.java Wed Feb 21 14:02:13 2007
@@ -1,21 +1,15 @@
package org.apache.hadoop.fs.s3;
-import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
-import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSInputStream;
class S3InputStream extends FSInputStream {
- private int bufferSize;
-
private FileSystemStore store;
private Block[] blocks;
@@ -26,6 +20,8 @@
private long pos = 0;
+ private File blockFile;
+
private DataInputStream blockStream;
private long blockEnd = -1;
@@ -38,7 +34,6 @@
for (Block block : blocks) {
this.fileLength += block.getLength();
}
- this.bufferSize = conf.getInt("io.file.buffer.size", 4096);
}
@Override
@@ -128,21 +123,11 @@
// read block blocks[targetBlock] from position offsetIntoBlock
- File fileBlock = File.createTempFile("s3fs-in", "");
- fileBlock.deleteOnExit();
-    InputStream in = store.getBlockStream(blocks[targetBlock], offsetIntoBlock);
-    OutputStream out = new BufferedOutputStream(new FileOutputStream(fileBlock));
- byte[] buf = new byte[bufferSize];
- int numRead;
- while ((numRead = in.read(buf)) >= 0) {
- out.write(buf, 0, numRead);
- }
- out.close();
- in.close();
+ this.blockFile = store.retrieveBlock(blocks[targetBlock], offsetIntoBlock);
this.pos = target;
this.blockEnd = targetBlockEnd;
- this.blockStream = new DataInputStream(new FileInputStream(fileBlock));
+ this.blockStream = new DataInputStream(new FileInputStream(blockFile));
}
@@ -153,8 +138,10 @@
}
if (blockStream != null) {
blockStream.close();
- blockStream.close();
blockStream = null;
+ }
+ if (blockFile != null) {
+ blockFile.delete();
}
super.close();
closed = true;
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3OutputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3OutputStream.java?view=diff&rev=510258&r1=510257&r2=510258
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3OutputStream.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3OutputStream.java Wed Feb 21 14:02:13 2007
@@ -1,10 +1,8 @@
package org.apache.hadoop.fs.s3;
import java.io.File;
-import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
@@ -18,6 +16,8 @@
class S3OutputStream extends FSOutputStream {
+ private Configuration conf;
+
private int bufferSize;
private FileSystemStore store;
@@ -49,6 +49,7 @@
public S3OutputStream(Configuration conf, FileSystemStore store,
Path path, long blockSize, Progressable progress) throws IOException {
+ this.conf = conf;
this.store = store;
this.path = path;
this.blockSize = blockSize;
@@ -60,7 +61,11 @@
}
private File newBackupFile() throws IOException {
- File result = File.createTempFile("s3fs-out", "");
+ File dir = new File(conf.get("fs.s3.buffer.dir"));
+ if (!dir.exists() && !dir.mkdirs()) {
+ throw new IOException("Cannot create S3 buffer directory: " + dir);
+ }
+ File result = File.createTempFile("output-", ".tmp", dir);
result.deleteOnExit();
return result;
}
@@ -147,9 +152,7 @@
//
// TODO: Use passed in Progressable to report progress.
nextBlockOutputStream();
- InputStream in = new FileInputStream(backupFile);
- store.storeBlock(nextBlock, in);
- in.close();
+ store.storeBlock(nextBlock, backupFile);
internalClose();
//
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java?view=auto&rev=510258
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java Wed Feb 21 14:02:13 2007
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.retry;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.StringUtils;
+
+class RetryInvocationHandler implements InvocationHandler {
+  public static final Log LOG = LogFactory.getLog("org.apache.hadoop.io.retry.RetryInvocationHandler");
+ private Object implementation;
+
+ private RetryPolicy defaultPolicy;
+ private Map<String,RetryPolicy> methodNameToPolicyMap;
+
+  public RetryInvocationHandler(Object implementation, RetryPolicy retryPolicy) {
+ this.implementation = implementation;
+ this.defaultPolicy = retryPolicy;
+ this.methodNameToPolicyMap = Collections.emptyMap();
+ }
+
+  public RetryInvocationHandler(Object implementation, Map<String, RetryPolicy> methodNameToPolicyMap) {
+ this.implementation = implementation;
+ this.defaultPolicy = RetryPolicies.TRY_ONCE_THEN_FAIL;
+ this.methodNameToPolicyMap = methodNameToPolicyMap;
+ }
+
+ public Object invoke(Object proxy, Method method, Object[] args)
+ throws Throwable {
+ RetryPolicy policy = methodNameToPolicyMap.get(method.getName());
+ if (policy == null) {
+ policy = defaultPolicy;
+ }
+
+ int retries = 0;
+ while (true) {
+ try {
+ return invokeMethod(method, args);
+ } catch (Exception e) {
+ if (!policy.shouldRetry(e, retries++)) {
+ LOG.warn("Exception while invoking " + method.getName()
+ + " of " + implementation.getClass() + ". Not retrying."
+ + StringUtils.stringifyException(e));
+ if (!method.getReturnType().equals(Void.TYPE)) {
+ throw e; // non-void methods can't fail without an exception
+ }
+ return null;
+ }
+ LOG.warn("Exception while invoking " + method.getName()
+ + " of " + implementation.getClass() + ". Retrying."
+ + StringUtils.stringifyException(e));
+ }
+ }
+ }
+
+ private Object invokeMethod(Method method, Object[] args) throws Throwable {
+ try {
+ return method.invoke(implementation, args);
+ } catch (InvocationTargetException e) {
+ throw e.getCause();
+ }
+ }
+
+}
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryPolicies.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryPolicies.java?view=auto&rev=510258
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryPolicies.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryPolicies.java Wed Feb 21 14:02:13 2007
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.retry;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * <p>
+ * A collection of useful implementations of {@link RetryPolicy}.
+ * </p>
+ * @author Tom White
+ */
+public class RetryPolicies {
+
+ /**
+ * <p>
+ * Try once, and fail by re-throwing the exception.
+ * This corresponds to having no retry mechanism in place.
+ * </p>
+ */
+ public static final RetryPolicy TRY_ONCE_THEN_FAIL = new TryOnceThenFail();
+
+ /**
+ * <p>
+ * Try once, and fail silently for <code>void</code> methods, or by
+ * re-throwing the exception for non-<code>void</code> methods.
+ * </p>
+ */
+ public static final RetryPolicy TRY_ONCE_DONT_FAIL = new TryOnceDontFail();
+
+ /**
+ * <p>
+ * Keep trying forever.
+ * </p>
+ */
+ public static final RetryPolicy RETRY_FOREVER = new RetryForever();
+
+ /**
+ * <p>
+   * Keep trying a limited number of times, waiting a fixed time between attempts,
+ * and then fail by re-throwing the exception.
+ * </p>
+ */
+  public static final RetryPolicy retryUpToMaximumCountWithFixedSleep(int maxRetries, long sleepTime, TimeUnit timeUnit) {
+    return new RetryUpToMaximumCountWithFixedSleep(maxRetries, sleepTime, timeUnit);
+ }
+
+ /**
+ * <p>
+ * Keep trying for a maximum time, waiting a fixed time between attempts,
+ * and then fail by re-throwing the exception.
+ * </p>
+ */
+  public static final RetryPolicy retryUpToMaximumTimeWithFixedSleep(long maxTime, long sleepTime, TimeUnit timeUnit) {
+    return new RetryUpToMaximumTimeWithFixedSleep(maxTime, sleepTime, timeUnit);
+ }
+
+ /**
+ * <p>
+   * Keep trying a limited number of times, waiting a growing amount of time between attempts,
+   * and then fail by re-throwing the exception.
+   * The time between attempts is <code>sleepTime</code> multiplied by the number of tries so far.
+ * </p>
+ */
+  public static final RetryPolicy retryUpToMaximumCountWithProportionalSleep(int maxRetries, long sleepTime, TimeUnit timeUnit) {
+    return new RetryUpToMaximumCountWithProportionalSleep(maxRetries, sleepTime, timeUnit);
+ }
+
+ /**
+ * <p>
+ * Set a default policy with some explicit handlers for specific exceptions.
+ * </p>
+ */
+ public static final RetryPolicy retryByException(RetryPolicy defaultPolicy,
+ Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap) {
+ return new ExceptionDependentRetry(defaultPolicy, exceptionToPolicyMap);
+ }
+
+ static class TryOnceThenFail implements RetryPolicy {
+ public boolean shouldRetry(Exception e, int retries) throws Exception {
+ throw e;
+ }
+ }
+ static class TryOnceDontFail implements RetryPolicy {
+ public boolean shouldRetry(Exception e, int retries) throws Exception {
+ return false;
+ }
+ }
+
+ static class RetryForever implements RetryPolicy {
+ public boolean shouldRetry(Exception e, int retries) throws Exception {
+ return true;
+ }
+ }
+
+ static abstract class RetryLimited implements RetryPolicy {
+ int maxRetries;
+ long sleepTime;
+ TimeUnit timeUnit;
+
+ public RetryLimited(int maxRetries, long sleepTime, TimeUnit timeUnit) {
+ this.maxRetries = maxRetries;
+ this.sleepTime = sleepTime;
+ this.timeUnit = timeUnit;
+ }
+
+ public boolean shouldRetry(Exception e, int retries) throws Exception {
+ if (retries > maxRetries) {
+ throw e;
+ }
+ try {
+ timeUnit.sleep(calculateSleepTime(retries));
+ } catch (InterruptedException ie) {
+ // retry
+ }
+ return true;
+ }
+
+ protected abstract long calculateSleepTime(int retries);
+ }
+
+ static class RetryUpToMaximumCountWithFixedSleep extends RetryLimited {
+    public RetryUpToMaximumCountWithFixedSleep(int maxRetries, long sleepTime, TimeUnit timeUnit) {
+ super(maxRetries, sleepTime, timeUnit);
+ }
+
+ @Override
+ protected long calculateSleepTime(int retries) {
+ return sleepTime;
+ }
+ }
+
+  static class RetryUpToMaximumTimeWithFixedSleep extends RetryUpToMaximumCountWithFixedSleep {
+    public RetryUpToMaximumTimeWithFixedSleep(long maxTime, long sleepTime, TimeUnit timeUnit) {
+ super((int) (maxTime / sleepTime), sleepTime, timeUnit);
+ }
+ }
+
+  static class RetryUpToMaximumCountWithProportionalSleep extends RetryLimited {
+    public RetryUpToMaximumCountWithProportionalSleep(int maxRetries, long sleepTime, TimeUnit timeUnit) {
+ super(maxRetries, sleepTime, timeUnit);
+ }
+
+ @Override
+ protected long calculateSleepTime(int retries) {
+ return sleepTime * (retries + 1);
+ }
+ }
+
+ static class ExceptionDependentRetry implements RetryPolicy {
+
+ RetryPolicy defaultPolicy;
+ Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap;
+
+ public ExceptionDependentRetry(RetryPolicy defaultPolicy,
+ Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap) {
+ this.defaultPolicy = defaultPolicy;
+ this.exceptionToPolicyMap = exceptionToPolicyMap;
+ }
+
+ public boolean shouldRetry(Exception e, int retries) throws Exception {
+ RetryPolicy policy = exceptionToPolicyMap.get(e.getClass());
+ if (policy == null) {
+ policy = defaultPolicy;
+ }
+ return policy.shouldRetry(e, retries);
+ }
+
+ }
+
+
+}
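
As a worked example of the proportional variant above: calculateSleepTime(retries) returns sleepTime * (retries + 1), so successive waits grow linearly. A sketch, assuming a sleepTime of 2 seconds:

    // Waits between successive failed attempts are 2s, 4s, 6s, ...
    RetryPolicy policy = RetryPolicies.retryUpToMaximumCountWithProportionalSleep(
        3, 2, TimeUnit.SECONDS);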
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryPolicy.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryPolicy.java?view=auto&rev=510258
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryPolicy.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryPolicy.java Wed Feb 21 14:02:13 2007
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.retry;
+
+/**
+ * <p>
+ * Specifies a policy for retrying method failures.
+ * Implementations of this interface should be immutable.
+ * </p>
+ * @author Tom White
+ */
+public interface RetryPolicy {
+ /**
+ * <p>
+   * Determines whether the framework should retry a method,
+   * given the exception that caused it to fail and the number
+   * of retries that have been made for that operation so far.
+ * </p>
+ * @param e The exception that caused the method to fail.
+ * @param retries The number of times the method has been retried.
+ * @return <code>true</code> if the method should be retried,
+ * <code>false</code> if the method should not be retried
+ * but shouldn't fail with an exception (only for void methods).
+ * @throws Exception The re-thrown exception <code>e</code> indicating
+ * that the method failed and should not be retried further.
+ */
+ public boolean shouldRetry(Exception e, int retries) throws Exception;
+}
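
A custom policy only has to implement this single method. A minimal sketch (the class name and retry cap are illustrative):

    package org.apache.hadoop.io.retry;

    // Illustrative: retry the first three failures immediately, then re-throw.
    class RetryThreeTimes implements RetryPolicy {
      public boolean shouldRetry(Exception e, int retries) throws Exception {
        if (retries >= 3) {
          throw e; // give up and surface the original failure
        }
        return true;
      }
    }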
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryProxy.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryProxy.java?view=auto&rev=510258
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryProxy.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryProxy.java Wed Feb 21 14:02:13 2007
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.retry;
+
+import java.lang.reflect.Proxy;
+import java.util.Map;
+
+/**
+ * <p>
+ * A factory for creating retry proxies.
+ * </p>
+ * @author Tom White
+ */
+public class RetryProxy {
+ /**
+ * <p>
+ * Create a proxy for an interface of an implementation class
+ * using the same retry policy for each method in the interface.
+ * </p>
+   * @param iface the interface that the retry proxy will implement
+   * @param implementation the instance whose methods should be retried
+   * @param retryPolicy the policy for retrying method call failures
+ * @return the retry proxy
+ */
+ public static Object create(Class<?> iface, Object implementation,
+ RetryPolicy retryPolicy) {
+ return Proxy.newProxyInstance(
+ implementation.getClass().getClassLoader(),
+ new Class<?>[] { iface },
+ new RetryInvocationHandler(implementation, retryPolicy)
+ );
+ }
+
+ /**
+ * <p>
+ * Create a proxy for an interface of an implementation class
+   * using a set of retry policies specified by method name.
+   * If no retry policy is defined for a method then a default of
+   * {@link RetryPolicies#TRY_ONCE_THEN_FAIL} is used.
+   * </p>
+   * @param iface the interface that the retry proxy will implement
+ * @param implementation the instance whose methods should be retried
+ * @param methodNameToPolicyMap a map of method names to retry policies
+ * @return the retry proxy
+ */
+ public static Object create(Class<?> iface, Object implementation,
+ Map<String,RetryPolicy> methodNameToPolicyMap) {
+ return Proxy.newProxyInstance(
+ implementation.getClass().getClassLoader(),
+ new Class<?>[] { iface },
+ new RetryInvocationHandler(implementation, methodNameToPolicyMap)
+ );
+ }
+}
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/package.html
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/package.html?view=auto&rev=510258
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/package.html (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/package.html Wed Feb 21 14:02:13 2007
@@ -0,0 +1,30 @@
+<html>
+<body>
+
+<p>
+A mechanism for selectively retrying methods that throw exceptions under certain circumstances.
+</p>
+
+<p>
+Typical usage is
+</p>
+
+<pre>
+UnreliableImplementation unreliableImpl = new UnreliableImplementation();
+UnreliableInterface unreliable = (UnreliableInterface)
+ RetryProxy.create(UnreliableInterface.class, unreliableImpl,
+    RetryPolicies.retryUpToMaximumCountWithFixedSleep(4, 10, TimeUnit.SECONDS));
+unreliable.call();
+</pre>
+
+<p>
+This will retry any method called on <code>unreliable</code> four times - in this case the <code>call()</code>
+method - sleeping 10 seconds between
+each retry. There are a number of {@link org.apache.hadoop.io.retry.RetryPolicies retry policies}
+available, or you can implement a custom one by implementing {@link org.apache.hadoop.io.retry.RetryPolicy}.
+It is also possible to specify retry policies on a
+{@link org.apache.hadoop.io.retry.RetryProxy#create(Class, Object, Map) per-method basis}.
+</p>
+
+</body>
+</html>
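
For the per-method form mentioned at the end of the page, a sketch along the lines of the new S3FileSystem.createDefaultStore (the method name is the illustrative call() from the example above; unmapped methods default to TRY_ONCE_THEN_FAIL):

    Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
    methodNameToPolicyMap.put("call", RetryPolicies.RETRY_FOREVER);

    UnreliableInterface unreliable = (UnreliableInterface)
      RetryProxy.create(UnreliableInterface.class, unreliableImpl, methodNameToPolicyMap);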
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/InMemoryFileSystemStore.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/InMemoryFileSystemStore.java?view=diff&rev=510258&r1=510257&r2=510258
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/InMemoryFileSystemStore.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/InMemoryFileSystemStore.java Wed Feb 21 14:02:13 2007
@@ -1,9 +1,12 @@
package org.apache.hadoop.fs.s3;
-import java.io.ByteArrayInputStream;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.net.URI;
import java.util.HashMap;
import java.util.LinkedHashSet;
@@ -23,11 +26,12 @@
*/
class InMemoryFileSystemStore implements FileSystemStore {
+ private Configuration conf;
private SortedMap<Path, INode> inodes = new TreeMap<Path, INode>();
private Map<Long, byte[]> blocks = new HashMap<Long, byte[]>();
public void initialize(URI uri, Configuration conf) {
- // Nothing to initialize
+ this.conf = conf;
}
public void deleteINode(Path path) throws IOException {
@@ -46,13 +50,33 @@
return blocks.containsKey(blockId);
}
- public INode getINode(Path path) throws IOException {
+ public INode retrieveINode(Path path) throws IOException {
return inodes.get(path);
}
-  public InputStream getBlockStream(Block block, long byteRangeStart) throws IOException {
+  public File retrieveBlock(Block block, long byteRangeStart) throws IOException {
byte[] data = blocks.get(block.getId());
-    return new ByteArrayInputStream(data, (int) byteRangeStart, data.length - (int) byteRangeStart);
+ File file = createTempFile();
+ BufferedOutputStream out = null;
+ try {
+ out = new BufferedOutputStream(new FileOutputStream(file));
+      out.write(data, (int) byteRangeStart, data.length - (int) byteRangeStart);
+ } finally {
+ if (out != null) {
+ out.close();
+ }
+ }
+ return file;
+ }
+
+ private File createTempFile() throws IOException {
+ File dir = new File(conf.get("fs.s3.buffer.dir"));
+ if (!dir.exists() && !dir.mkdirs()) {
+ throw new IOException("Cannot create S3 buffer directory: " + dir);
+ }
+ File result = File.createTempFile("test-", ".tmp", dir);
+ result.deleteOnExit();
+ return result;
}
public Set<Path> listSubPaths(Path path) throws IOException {
@@ -85,12 +109,20 @@
inodes.put(path, inode);
}
- public void storeBlock(Block block, InputStream in) throws IOException {
+ public void storeBlock(Block block, File file) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
byte[] buf = new byte[8192];
int numRead;
- while ((numRead = in.read(buf)) >= 0) {
- out.write(buf, 0, numRead);
+ BufferedInputStream in = null;
+ try {
+ in = new BufferedInputStream(new FileInputStream(file));
+ while ((numRead = in.read(buf)) >= 0) {
+ out.write(buf, 0, numRead);
+ }
+ } finally {
+ if (in != null) {
+ in.close();
+ }
}
blocks.put(block.getId(), out.toByteArray());
}
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/Jets3tS3FileSystemTest.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/Jets3tS3FileSystemTest.java?view=diff&rev=510258&r1=510257&r2=510258
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/Jets3tS3FileSystemTest.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/Jets3tS3FileSystemTest.java Wed Feb 21 14:02:13 2007
@@ -6,7 +6,7 @@
@Override
public FileSystemStore getFileSystemStore() throws IOException {
- return new Jets3tFileSystemStore();
+ return null; // use default store
}
}
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/io/retry/TestRetryProxy.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/io/retry/TestRetryProxy.java?view=auto&rev=510258
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/io/retry/TestRetryProxy.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/io/retry/TestRetryProxy.java Wed Feb 21 14:02:13 2007
@@ -0,0 +1,120 @@
+package org.apache.hadoop.io.retry;
+
+import static org.apache.hadoop.io.retry.RetryPolicies.RETRY_FOREVER;
+import static org.apache.hadoop.io.retry.RetryPolicies.TRY_ONCE_DONT_FAIL;
+import static org.apache.hadoop.io.retry.RetryPolicies.TRY_ONCE_THEN_FAIL;
+import static org.apache.hadoop.io.retry.RetryPolicies.retryByException;
+import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumCountWithFixedSleep;
+import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumCountWithProportionalSleep;
+import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumTimeWithFixedSleep;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.retry.UnreliableInterface.FatalException;
+import org.apache.hadoop.io.retry.UnreliableInterface.UnreliableException;
+
+public class TestRetryProxy extends TestCase {
+
+ private UnreliableImplementation unreliableImpl;
+
+ @Override
+ protected void setUp() throws Exception {
+ unreliableImpl = new UnreliableImplementation();
+ }
+
+ public void testTryOnceThenFail() throws UnreliableException {
+ UnreliableInterface unreliable = (UnreliableInterface)
+      RetryProxy.create(UnreliableInterface.class, unreliableImpl, TRY_ONCE_THEN_FAIL);
+ unreliable.alwaysSucceeds();
+ try {
+ unreliable.failsOnceThenSucceeds();
+ fail("Should fail");
+ } catch (UnreliableException e) {
+ // expected
+ }
+ }
+
+ public void testTryOnceDontFail() throws UnreliableException {
+ UnreliableInterface unreliable = (UnreliableInterface)
+      RetryProxy.create(UnreliableInterface.class, unreliableImpl, TRY_ONCE_DONT_FAIL);
+ unreliable.alwaysSucceeds();
+ unreliable.failsOnceThenSucceeds();
+ try {
+ unreliable.failsOnceThenSucceedsWithReturnValue();
+ fail("Should fail");
+ } catch (UnreliableException e) {
+ // expected
+ }
+ }
+
+ public void testRetryForever() throws UnreliableException {
+ UnreliableInterface unreliable = (UnreliableInterface)
+      RetryProxy.create(UnreliableInterface.class, unreliableImpl, RETRY_FOREVER);
+ unreliable.alwaysSucceeds();
+ unreliable.failsOnceThenSucceeds();
+ unreliable.failsTenTimesThenSucceeds();
+ }
+
+  public void testRetryUpToMaximumCountWithFixedSleep() throws UnreliableException {
+ UnreliableInterface unreliable = (UnreliableInterface)
+ RetryProxy.create(UnreliableInterface.class, unreliableImpl,
+ retryUpToMaximumCountWithFixedSleep(8, 1, TimeUnit.NANOSECONDS));
+ unreliable.alwaysSucceeds();
+ unreliable.failsOnceThenSucceeds();
+ try {
+ unreliable.failsTenTimesThenSucceeds();
+ fail("Should fail");
+ } catch (UnreliableException e) {
+ // expected
+ }
+ }
+
+  public void testRetryUpToMaximumTimeWithFixedSleep() throws UnreliableException {
+ UnreliableInterface unreliable = (UnreliableInterface)
+ RetryProxy.create(UnreliableInterface.class, unreliableImpl,
+ retryUpToMaximumTimeWithFixedSleep(80, 10, TimeUnit.NANOSECONDS));
+ unreliable.alwaysSucceeds();
+ unreliable.failsOnceThenSucceeds();
+ try {
+ unreliable.failsTenTimesThenSucceeds();
+ fail("Should fail");
+ } catch (UnreliableException e) {
+ // expected
+ }
+ }
+
+  public void testRetryUpToMaximumCountWithProportionalSleep() throws UnreliableException {
+ UnreliableInterface unreliable = (UnreliableInterface)
+ RetryProxy.create(UnreliableInterface.class, unreliableImpl,
+        retryUpToMaximumCountWithProportionalSleep(8, 1, TimeUnit.NANOSECONDS));
+ unreliable.alwaysSucceeds();
+ unreliable.failsOnceThenSucceeds();
+ try {
+ unreliable.failsTenTimesThenSucceeds();
+ fail("Should fail");
+ } catch (UnreliableException e) {
+ // expected
+ }
+ }
+
+ public void testRetryByException() throws UnreliableException {
+ Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
+      Collections.<Class<? extends Exception>, RetryPolicy>singletonMap(FatalException.class, TRY_ONCE_THEN_FAIL);
+
+ UnreliableInterface unreliable = (UnreliableInterface)
+ RetryProxy.create(UnreliableInterface.class, unreliableImpl,
+ retryByException(RETRY_FOREVER, exceptionToPolicyMap));
+ unreliable.failsOnceThenSucceeds();
+ try {
+ unreliable.alwaysfailsWithFatalException();
+ fail("Should fail");
+ } catch (FatalException e) {
+ // expected
+ }
+ }
+
+}
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/io/retry/UnreliableImplementation.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/io/retry/UnreliableImplementation.java?view=auto&rev=510258
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/io/retry/UnreliableImplementation.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/io/retry/UnreliableImplementation.java Wed Feb 21 14:02:13 2007
@@ -0,0 +1,36 @@
+package org.apache.hadoop.io.retry;
+
+public class UnreliableImplementation implements UnreliableInterface {
+
+ private int failsOnceInvocationCount,
+ failsOnceWithValueInvocationCount,
+ failsTenTimesInvocationCount;
+
+ public void alwaysSucceeds() {
+ // do nothing
+ }
+
+ public void alwaysfailsWithFatalException() throws FatalException {
+ throw new FatalException();
+ }
+
+ public void failsOnceThenSucceeds() throws UnreliableException {
+ if (failsOnceInvocationCount++ == 0) {
+ throw new UnreliableException();
+ }
+ }
+
+  public boolean failsOnceThenSucceedsWithReturnValue() throws UnreliableException {
+ if (failsOnceWithValueInvocationCount++ == 0) {
+ throw new UnreliableException();
+ }
+ return true;
+ }
+
+ public void failsTenTimesThenSucceeds() throws UnreliableException {
+ if (failsTenTimesInvocationCount++ < 10) {
+ throw new UnreliableException();
+ }
+ }
+
+}
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/io/retry/UnreliableInterface.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/io/retry/UnreliableInterface.java?view=auto&rev=510258
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/io/retry/UnreliableInterface.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/io/retry/UnreliableInterface.java Wed Feb 21 14:02:13 2007
@@ -0,0 +1,21 @@
+package org.apache.hadoop.io.retry;
+
+public interface UnreliableInterface {
+
+ public static class UnreliableException extends Exception {
+ // no body
+ }
+
+ public static class FatalException extends UnreliableException {
+ // no body
+ }
+
+ void alwaysSucceeds() throws UnreliableException;
+
+ void alwaysfailsWithFatalException() throws FatalException;
+
+ void failsOnceThenSucceeds() throws UnreliableException;
+ boolean failsOnceThenSucceedsWithReturnValue() throws UnreliableException;
+
+ void failsTenTimesThenSucceeds() throws UnreliableException;
+}