Author: cutting
Date: Wed Dec 13 15:12:07 2006
New Revision: 486886
URL: http://svn.apache.org/viewvc?view=rev&rev=486886
Log: HADOOP-574. Add an Amazon S3 FileSystem implementation. Contributed by Tom White.
Added: lucene/hadoop/trunk/lib/commons-codec-1.3.jar (with props) lucene/hadoop/trunk/lib/commons-httpclient-3.0.1.jar (with props) lucene/hadoop/trunk/lib/jets3t.jar (with props) lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Block.java lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/FileSystemStore.java lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/INode.java lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Jets3tFileSystemStore.java lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3Exception.java lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3InputStream.java lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3OutputStream.java lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/package.html lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/InMemoryFileSystemStore.java lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/Jets3tS3FileSystemTest.java lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/S3FileSystemBaseTest.java lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/TestINode.java lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/TestInMemoryS3FileSystem.java Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/conf/hadoop-default.xml lucene/hadoop/trunk/src/test/hadoop-site.xml Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=486886&r1=486885&r2=486886 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Wed Dec 13 15:12:07 2006 @@ -92,6 +92,13 @@ 26. HADOOP-454. Add a 'dfs -dus' command that provides summary disk usage. (Hairong Kuang via cutting) +27. HADOOP-574. Add an Amazon S3 implementation of FileSystem. To + use this, one need only specify paths of the form + s3://id:secret@bucket/. Alternately, the AWS access key id and + secret can be specified in your config, with the properties + fs.s3.awsAccessKeyId and fs.s3.awsSecretAccessKey. + (Tom White via cutting) + Release 0.9.1 - 2006-12-06 Modified: lucene/hadoop/trunk/conf/hadoop-default.xml URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?view=diff&rev=486886&r1=486885&r2=486886 ============================================================================== --- lucene/hadoop/trunk/conf/hadoop-default.xml (original) +++ lucene/hadoop/trunk/conf/hadoop-default.xml Wed Dec 13 15:12:07 2006 @@ -119,6 +119,12 @@ </property> <property> + <name>fs.s3.impl</name> + <value>org.apache.hadoop.fs.s3.S3FileSystem</value> + <description>The FileSystem for s3: uris.</description> +</property> + +<property> <name>dfs.datanode.bindAddress</name> <value>0.0.0.0</value> <description> Added: lucene/hadoop/trunk/lib/commons-codec-1.3.jar URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/lib/commons-codec-1.3.jar?view=auto&rev=486886 ============================================================================== Binary file - no diff available.
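[Editor's note: for illustration only, not part of this change. A minimal sketch of how client code might obtain the new S3 filesystem through the standard FileSystem factory once the fs.s3.impl mapping above is in effect. The S3FsExample class, the "mybucket" bucket, and the key values are placeholder assumptions; credentials can equally be embedded in the URI as s3://id:secret@bucket/ per the CHANGES.txt entry above.]

package example; // hypothetical demo class, not part of this commit

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class S3FsExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Supply AWS credentials via the two new properties (an alternative to
    // embedding them in the s3:// URI itself). Values are placeholders.
    conf.set("fs.s3.awsAccessKeyId", "YOUR_ACCESS_KEY_ID");
    conf.set("fs.s3.awsSecretAccessKey", "YOUR_SECRET_ACCESS_KEY");

    // fs.s3.impl (added to hadoop-default.xml above) maps the s3: scheme
    // to org.apache.hadoop.fs.s3.S3FileSystem, so FileSystem.get() returns it.
    FileSystem fs = FileSystem.get(URI.create("s3://mybucket/"), conf);

    Path dir = new Path("/user/demo");
    if (!fs.exists(dir)) {
      fs.mkdirs(dir);
    }
  }
}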
Propchange: lucene/hadoop/trunk/lib/commons-codec-1.3.jar ------------------------------------------------------------------------------ svn:mime-type = application/octet-stream Added: lucene/hadoop/trunk/lib/commons-httpclient-3.0.1.jar URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/lib/commons-httpclient-3.0.1.jar?view=auto&rev=486886 ============================================================================== Binary file - no diff available. Propchange: lucene/hadoop/trunk/lib/commons-httpclient-3.0.1.jar ------------------------------------------------------------------------------ svn:mime-type = application/octet-stream Added: lucene/hadoop/trunk/lib/jets3t.jar URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/lib/jets3t.jar?view=auto&rev=486886 ============================================================================== Binary file - no diff available. Propchange: lucene/hadoop/trunk/lib/jets3t.jar ------------------------------------------------------------------------------ svn:mime-type = application/octet-stream Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Block.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Block.java?view=auto&rev=486886 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Block.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Block.java Wed Dec 13 15:12:07 2006 @@ -0,0 +1,26 @@ +package org.apache.hadoop.fs.s3; + +class Block { + private long id; + + private long length; + + public Block(long id, long length) { + this.id = id; + this.length = length; + } + + public long getId() { + return id; + } + + public long getLength() { + return length; + } + + @Override + public String toString() { + return "Block[" + id + ", " + length + "]"; + } + +} Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/FileSystemStore.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/FileSystemStore.java?view=auto&rev=486886 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/FileSystemStore.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/FileSystemStore.java Wed Dec 13 15:12:07 2006 @@ -0,0 +1,40 @@ +package org.apache.hadoop.fs.s3; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + +interface FileSystemStore { + + void initialize(URI uri, Configuration conf) throws IOException; + + void storeINode(Path path, INode inode) throws IOException; + void storeBlock(Block block, InputStream in) throws IOException; + + boolean inodeExists(Path path) throws IOException; + boolean blockExists(long blockId) throws IOException; + + INode getINode(Path path) throws IOException; + InputStream getBlockStream(Block block, long byteRangeStart) throws IOException; + + void deleteINode(Path path) throws IOException; + void deleteBlock(Block block) throws IOException; + + Set<Path> listSubPaths(Path path) throws IOException; + + /** + * Delete everything. Used for testing. + * @throws IOException + */ + void purge() throws IOException; + + /** + * Diagnostic method to dump all INodes to the console. 
+ * @throws IOException + */ + void dump() throws IOException; +} \ No newline at end of file Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/INode.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/INode.java?view=auto&rev=486886 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/INode.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/INode.java Wed Dec 13 15:12:07 2006 @@ -0,0 +1,99 @@ +package org.apache.hadoop.fs.s3; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; + +/** + * Holds file metadata including type (regular file, or directory), + * and the list of blocks that are pointers to the data. + */ +class INode { + + enum FileType { + DIRECTORY, FILE + } + + public static final FileType[] FILE_TYPES = { + FileType.DIRECTORY, + FileType.FILE + }; + + public static final INode DIRECTORY_INODE = new INode(FileType.DIRECTORY, null); + + private FileType fileType; + private Block[] blocks; + + public INode(FileType fileType, Block[] blocks) { + this.fileType = fileType; + if (isDirectory() && blocks != null) { + throw new IllegalArgumentException("A directory cannot contain blocks."); + } + this.blocks = blocks; + } + + public Block[] getBlocks() { + return blocks; + } + + public FileType getFileType() { + return fileType; + } + + public boolean isDirectory() { + return fileType == FileType.DIRECTORY; + } + + public boolean isFile() { + return fileType == FileType.FILE; + } + + public long getSerializedLength() { + return 1L + (blocks == null ? 
0 : 4 + blocks.length * 16); + } + + + public InputStream serialize() throws IOException { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(bytes); + out.writeByte(fileType.ordinal()); + if (isFile()) { + out.writeInt(blocks.length); + for (int i = 0; i < blocks.length; i++) { + out.writeLong(blocks[i].getId()); + out.writeLong(blocks[i].getLength()); + } + } + out.close(); + return new ByteArrayInputStream(bytes.toByteArray()); + } + + public static INode deserialize(InputStream in) throws IOException { + if (in == null) { + return null; + } + DataInputStream dataIn = new DataInputStream(in); + FileType fileType = INode.FILE_TYPES[dataIn.readByte()]; + switch (fileType) { + case DIRECTORY: + in.close(); + return INode.DIRECTORY_INODE; + case FILE: + int numBlocks = dataIn.readInt(); + Block[] blocks = new Block[numBlocks]; + for (int i = 0; i < numBlocks; i++) { + long id = dataIn.readLong(); + long length = dataIn.readLong(); + blocks[i] = new Block(id, length); + } + in.close(); + return new INode(fileType, blocks); + default: + throw new IllegalArgumentException("Cannot deserialize inode."); + } + } + +} Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Jets3tFileSystemStore.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Jets3tFileSystemStore.java?view=auto&rev=486886 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Jets3tFileSystemStore.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Jets3tFileSystemStore.java Wed Dec 13 15:12:07 2006 @@ -0,0 +1,296 @@ +package org.apache.hadoop.fs.s3; + +import java.io.IOException; +import java.io.InputStream; +import java.io.UnsupportedEncodingException; +import java.net.URI; +import java.net.URLDecoder; +import java.net.URLEncoder; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3.INode.FileType; +import org.jets3t.service.S3Service; +import org.jets3t.service.S3ServiceException; +import org.jets3t.service.impl.rest.httpclient.RestS3Service; +import org.jets3t.service.model.S3Bucket; +import org.jets3t.service.model.S3Object; +import org.jets3t.service.security.AWSCredentials; + +class Jets3tFileSystemStore implements FileSystemStore { + + private static final String PATH_DELIMITER = urlEncode(Path.SEPARATOR); + private static final String BLOCK_PREFIX = "block_"; + + private S3Service s3Service; + + private S3Bucket bucket; + + public void initialize(URI uri, Configuration conf) throws IOException { + try { + String accessKey = null; + String secretAccessKey = null; + String userInfo = uri.getUserInfo(); + if (userInfo != null) { + int index = userInfo.indexOf(':'); + if (index != -1) { + accessKey = userInfo.substring(0, index); + secretAccessKey = userInfo.substring(index + 1); + } else { + accessKey = userInfo; + } + } + if (accessKey == null) { + accessKey = conf.get("fs.s3.awsAccessKeyId"); + } + if (secretAccessKey == null) { + secretAccessKey = conf.get("fs.s3.awsSecretAccessKey"); + } + if (accessKey == null && secretAccessKey == null) { + throw new IllegalArgumentException("AWS " + + "Access Key ID and Secret Access Key " + + "must be specified as the username " + + "or password (respectively) of a s3 URL, " + + "or by setting the " + + "fs.s3.awsAccessKeyId or " + + "fs.s3.awsSecretAccessKey properties 
(respectively)."); + } else if (accessKey == null) { + throw new IllegalArgumentException("AWS " + + "Access Key ID must be specified " + + "as the username of a s3 URL, or by setting the " + + "fs.s3.awsAccessKeyId property."); + } else if (secretAccessKey == null) { + throw new IllegalArgumentException("AWS " + + "Secret Access Key must be specified " + + "as the password of a s3 URL, or by setting the " + + "fs.s3.awsSecretAccessKey property."); + } + AWSCredentials awsCredentials = new AWSCredentials(accessKey, secretAccessKey); + this.s3Service = new RestS3Service(awsCredentials); + } catch (S3ServiceException e) { + if (e.getCause() instanceof IOException) { + throw (IOException) e.getCause(); + } + throw new S3Exception(e); + } + bucket = new S3Bucket(uri.getHost()); + + createBucket(bucket.getName()); + } + + private void createBucket(String bucketName) throws IOException { + try { + s3Service.createBucket(bucketName); + } catch (S3ServiceException e) { + if (e.getCause() instanceof IOException) { + throw (IOException) e.getCause(); + } + throw new S3Exception(e); + } + } + + private void delete(String key) throws IOException { + try { + s3Service.deleteObject(bucket, key); + } catch (S3ServiceException e) { + if (e.getCause() instanceof IOException) { + throw (IOException) e.getCause(); + } + throw new S3Exception(e); + } + } + + public void deleteINode(Path path) throws IOException { + delete(pathToKey(path)); + } + + public void deleteBlock(Block block) throws IOException { + delete(blockToKey(block)); + } + + public boolean inodeExists(Path path) throws IOException { + InputStream in = get(pathToKey(path)); + if (in == null) { + return false; + } + in.close(); + return true; + } + + public boolean blockExists(long blockId) throws IOException { + InputStream in = get(blockToKey(blockId)); + if (in == null) { + return false; + } + in.close(); + return true; + } + + private InputStream get(String key) throws IOException { + try { + S3Object object = s3Service.getObject(bucket, key); + return object.getDataInputStream(); + } catch (S3ServiceException e) { + if (e.getErrorCode().equals("NoSuchKey")) { + return null; + } + if (e.getCause() instanceof IOException) { + throw (IOException) e.getCause(); + } + throw new S3Exception(e); + } + } + + private InputStream get(String key, long byteRangeStart) throws IOException { + try { + S3Object object = s3Service.getObject(bucket, key, null, null, null, + null, byteRangeStart, null); + return object.getDataInputStream(); + } catch (S3ServiceException e) { + if (e.getErrorCode().equals("NoSuchKey")) { + return null; + } + if (e.getCause() instanceof IOException) { + throw (IOException) e.getCause(); + } + throw new S3Exception(e); + } + } + + public INode getINode(Path path) throws IOException { + return INode.deserialize(get(pathToKey(path))); + } + + public InputStream getBlockStream(Block block, long byteRangeStart) + throws IOException { + return get(blockToKey(block), byteRangeStart); + } + + public Set<Path> listSubPaths(Path path) throws IOException { + try { + String prefix = pathToKey(path); + if (!prefix.endsWith(PATH_DELIMITER)) { + prefix += PATH_DELIMITER; + } + S3Object[] objects = s3Service.listObjects(bucket, prefix, PATH_DELIMITER, 0); + Set<Path> prefixes = new TreeSet<Path>(); + for (int i = 0; i < objects.length; i++) { + prefixes.add(keyToPath(objects[i].getKey())); + } + prefixes.remove(path); + return prefixes; + } catch (S3ServiceException e) { + if (e.getCause() instanceof IOException) { + throw (IOException) 
e.getCause(); + } + throw new S3Exception(e); + } + } + + private void put(String key, InputStream in, long length) throws IOException { + try { + S3Object object = new S3Object(key); + object.setDataInputStream(in); + object.setContentType("binary/octet-stream"); + object.setContentLength(length); + s3Service.putObject(bucket, object); + } catch (S3ServiceException e) { + if (e.getCause() instanceof IOException) { + throw (IOException) e.getCause(); + } + throw new S3Exception(e); + } + } + + public void storeINode(Path path, INode inode) throws IOException { + put(pathToKey(path), inode.serialize(), inode.getSerializedLength()); + } + + public void storeBlock(Block block, InputStream in) throws IOException { + put(blockToKey(block), in, block.getLength()); + } + + private String pathToKey(Path path) { + if (!path.isAbsolute()) { + throw new IllegalArgumentException("Path must be absolute: " + path); + } + return urlEncode(path.toString()); + } + + private Path keyToPath(String key) { + return new Path(urlDecode(key)); + } + + private static String urlEncode(String s) { + try { + return URLEncoder.encode(s, "UTF-8"); + } catch (UnsupportedEncodingException e) { + // Should never happen since every implementation of the Java Platform + // is required to support UTF-8. + // See http://java.sun.com/j2se/1.5.0/docs/api/java/nio/charset/Charset.html + throw new IllegalStateException(e); + } + } + + private static String urlDecode(String s) { + try { + return URLDecoder.decode(s, "UTF-8"); + } catch (UnsupportedEncodingException e) { + // Should never happen since every implementation of the Java Platform + // is required to support UTF-8. + // See http://java.sun.com/j2se/1.5.0/docs/api/java/nio/charset/Charset.html + throw new IllegalStateException(e); + } + } + + private String blockToKey(long blockId) { + return BLOCK_PREFIX + blockId; + } + + private String blockToKey(Block block) { + return blockToKey(block.getId()); + } + + public void purge() throws IOException { + try { + S3Object[] objects = s3Service.listObjects(bucket); + for (int i = 0; i < objects.length; i++) { + s3Service.deleteObject(bucket, objects[i].getKey()); + } + } catch (S3ServiceException e) { + if (e.getCause() instanceof IOException) { + throw (IOException) e.getCause(); + } + throw new S3Exception(e); + } + } + + public void dump() throws IOException { + StringBuilder sb = new StringBuilder("S3 Filesystem, "); + sb.append(bucket.getName()).append("\n"); + try { + S3Object[] objects = s3Service.listObjects(bucket, PATH_DELIMITER, null); + for (int i = 0; i < objects.length; i++) { + Path path = keyToPath(objects[i].getKey()); + sb.append(path).append("\n"); + INode m = getINode(path); + sb.append("\t").append(m.getFileType()).append("\n"); + if (m.getFileType() == FileType.DIRECTORY) { + continue; + } + for (int j = 0; j < m.getBlocks().length; j++) { + sb.append("\t").append(m.getBlocks()[j]).append("\n"); + } + } + } catch (S3ServiceException e) { + if (e.getCause() instanceof IOException) { + throw (IOException) e.getCause(); + } + throw new S3Exception(e); + } + System.out.println(sb); + } + +} Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3Exception.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3Exception.java?view=auto&rev=486886 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3Exception.java (added) +++ 
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3Exception.java Wed Dec 13 15:12:07 2006 @@ -0,0 +1,12 @@ +package org.apache.hadoop.fs.s3; + +/** + * Thrown if there is a problem communicating with Amazon S3. + */ +public class S3Exception extends RuntimeException { + + public S3Exception(Throwable t) { + super(t); + } + +} Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java?view=auto&rev=486886 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java Wed Dec 13 15:12:07 2006 @@ -0,0 +1,320 @@ +package org.apache.hadoop.fs.s3; + +import java.io.IOException; +import java.net.URI; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSInputStream; +import org.apache.hadoop.fs.FSOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.Progressable; + +/** + * <p> + * A {@link FileSystem} backed by <a href="http://aws.amazon.com/s3">Amazon S3</a>. + * </p> + * @author Tom White + */ +public class S3FileSystem extends FileSystem { + + private static final long DEFAULT_BLOCK_SIZE = 1 * 1024 * 1024; + + private URI uri; + + private FileSystemStore store; + + private FileSystem localFs; + + private Path workingDir = new Path("/user", System.getProperty("user.name")); + + public S3FileSystem() { + this(new Jets3tFileSystemStore()); + } + + public S3FileSystem(FileSystemStore store) { + this.store = store; + } + + @Override + public URI getUri() { + return uri; + } + + @Override + public void initialize(URI uri, Configuration conf) throws IOException { + store.initialize(uri, conf); + setConf(conf); + this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); + this.localFs = get(URI.create("file:///"), conf); + } + + @Override + public String getName() { + return getUri().toString(); + } + + @Override + public Path getWorkingDirectory() { + return workingDir; + } + + @Override + public void setWorkingDirectory(Path dir) { + workingDir = makeAbsolute(dir); + } + + private Path makeAbsolute(Path path) { + if (path.isAbsolute()) { + return path; + } + return new Path(workingDir, path); + } + + @Override + public boolean exists(Path path) throws IOException { + return store.inodeExists(makeAbsolute(path)); + } + + @Override + public boolean mkdirs(Path path) throws IOException { + Path absolutePath = makeAbsolute(path); + INode inode = store.getINode(absolutePath); + if (inode == null) { + store.storeINode(path, INode.DIRECTORY_INODE); + } else if (inode.isFile()) { + throw new IOException(String.format( + "Can't make directory for path %s since it is a file.", path)); + } + Path parent = path.getParent(); + return (parent == null || mkdirs(parent)); + } + + @Override + public boolean isDirectory(Path path) throws IOException { + INode inode = store.getINode(makeAbsolute(path)); + if (inode == null) { + return false; + } + return inode.isDirectory(); + } + + @Override + public boolean isFile(Path path) throws IOException { + INode inode = store.getINode(makeAbsolute(path)); + if (inode == null) { + return false; + } + return inode.isFile(); + } + + private INode checkFile(Path path) throws IOException { + INode inode =
store.getINode(makeAbsolute(path)); + if (inode == null) { + throw new IOException("No such file."); + } + if (inode.isDirectory()) { + throw new IOException("Path " + path + " is a directory."); + } + return inode; + } + + @Override + public Path[] listPathsRaw(Path path) throws IOException { + INode inode = store.getINode(makeAbsolute(path)); + if (inode == null) { + return null; + } else if (inode.isFile()) { + return new Path[] { path }; + } else { // directory + Set<Path> paths = store.listSubPaths(path); + return paths.toArray(new Path[0]); + } + } + + @Override + public FSOutputStream createRaw(Path file, boolean overwrite, + short replication, long blockSize) throws IOException { + + return createRaw(file, overwrite, replication, blockSize, null); + } + + @Override + public FSOutputStream createRaw(Path file, boolean overwrite, + short replication, long blockSize, Progressable progress) + throws IOException { + + if (!isDirectory(file.getParent())) { + throw new IOException("Cannot create file " + file + + " since parent directory does not exist."); + } + INode inode = store.getINode(makeAbsolute(file)); + if (inode != null) { + if (overwrite) { + deleteRaw(file); + } else { + throw new IOException("File already exists: " + file); + } + } + return new S3OutputStream(getConf(), store, makeAbsolute(file), + blockSize, progress); + } + + @Override + public FSInputStream openRaw(Path path) throws IOException { + INode inode = checkFile(path); + return new S3InputStream(getConf(), store, inode); + } + + @Override + public boolean renameRaw(Path src, Path dst) throws IOException { + // TODO: Check corner cases: dst already exists, + // or if path is directory with children + Path absoluteSrc = makeAbsolute(src); + INode inode = store.getINode(absoluteSrc); + if (inode == null) { + throw new IOException("No such file."); + } + store.storeINode(makeAbsolute(dst), inode); + store.deleteINode(absoluteSrc); + return true; + } + + @Override + public boolean deleteRaw(Path path) throws IOException { + // TODO: Check if path is directory with children + Path absolutePath = makeAbsolute(path); + INode inode = store.getINode(absolutePath); + if (inode == null) { + throw new IOException("No such file or directory."); + } + store.deleteINode(absolutePath); + if (inode.isFile()) { + for (Block block : inode.getBlocks()) { + store.deleteBlock(block); + } + } + return true; + } + + @Override + public long getLength(Path path) throws IOException { + INode inode = checkFile(path); + long length = 0; + for (Block block : inode.getBlocks()) { + length += block.getLength(); + } + return length; + } + + /** + * Replication is not supported for S3 file systems since S3 handles it for + * us. + */ + @Override + public short getReplication(Path path) throws IOException { + return 1; + } + + @Override + public short getDefaultReplication() { + return 1; + } + + /** + * Replication is not supported for S3 file systems since S3 handles it for + * us. 
+ */ + @Override + public boolean setReplicationRaw(Path path, short replication) + throws IOException { + return true; + } + + @Override + public long getBlockSize(Path path) throws IOException { + INode inode = store.getINode(makeAbsolute(path)); + if (inode == null) { + throw new IOException("No such file or directory."); + } + Block[] blocks = inode.getBlocks(); + if (blocks == null || blocks.length == 0) { + return 0; + } + return blocks[0].getLength(); + } + + @Override + public long getDefaultBlockSize() { + return getConf().getLong("fs.s3.block.size", DEFAULT_BLOCK_SIZE); + } + + /** + * Return 1x1 'localhost' cell if the file exists. Return null if otherwise. + */ + @Override + public String[][] getFileCacheHints(Path f, long start, long len) + throws IOException { + // TODO: Check this is the correct behavior + if (!exists(f)) { + return null; + } + return new String[][] { { "localhost" } }; + } + + @Override + public void lock(Path path, boolean shared) throws IOException { + // TODO: Design and implement + } + + @Override + public void release(Path path) throws IOException { + // TODO: Design and implement + } + + @Override + public void reportChecksumFailure(Path path, FSInputStream in, + long start, long length, int crc) { + // TODO: What to do here? + } + + @Override + public void moveFromLocalFile(Path src, Path dst) throws IOException { + FileUtil.copy(localFs, src, this, dst, true, getConf()); + } + + @Override + public void copyFromLocalFile(Path src, Path dst) throws IOException { + FileUtil.copy(localFs, src, this, dst, false, true, getConf()); + } + + @Override + public void copyToLocalFile(Path src, Path dst, boolean copyCrc) throws IOException { + FileUtil.copy(this, src, localFs, dst, false, copyCrc, getConf()); + } + + @Override + public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile) + throws IOException { + return tmpLocalFile; + } + + @Override + public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile) + throws IOException { + moveFromLocalFile(tmpLocalFile, fsOutputFile); + } + + // diagnostic methods + + void dump() throws IOException { + store.dump(); + } + + void purge() throws IOException { + store.purge(); + } + +} Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3InputStream.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3InputStream.java?view=auto&rev=486886 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3InputStream.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3InputStream.java Wed Dec 13 15:12:07 2006 @@ -0,0 +1,176 @@ +package org.apache.hadoop.fs.s3; + +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSInputStream; + +class S3InputStream extends FSInputStream { + + private int bufferSize; + + private FileSystemStore store; + + private Block[] blocks; + + private boolean closed; + + private long fileLength; + + private long pos = 0; + + private DataInputStream blockStream; + + private long blockEnd = -1; + + public S3InputStream(Configuration conf, FileSystemStore store, + INode inode) { + + this.store = store; + this.blocks = inode.getBlocks(); + for (Block block : blocks) { + 
this.fileLength += block.getLength(); + } + this.bufferSize = conf.getInt("io.file.buffer.size", 4096); + } + + @Override + public synchronized long getPos() throws IOException { + return pos; + } + + @Override + public synchronized int available() throws IOException { + return (int) (fileLength - pos); + } + + @Override + public synchronized void seek(long targetPos) throws IOException { + if (targetPos > fileLength) { + throw new IOException("Cannot seek after EOF"); + } + pos = targetPos; + blockEnd = -1; + } + + @Override + public synchronized int read() throws IOException { + if (closed) { + throw new IOException("Stream closed"); + } + int result = -1; + if (pos < fileLength) { + if (pos > blockEnd) { + blockSeekTo(pos); + } + result = blockStream.read(); + if (result >= 0) { + pos++; + } + } + return result; + } + + @Override + public synchronized int read(byte buf[], int off, int len) throws IOException { + if (closed) { + throw new IOException("Stream closed"); + } + if (pos < fileLength) { + if (pos > blockEnd) { + blockSeekTo(pos); + } + int realLen = Math.min(len, (int) (blockEnd - pos + 1)); + int result = blockStream.read(buf, off, realLen); + if (result >= 0) { + pos += result; + } + return result; + } + return -1; + } + + private synchronized void blockSeekTo(long target) throws IOException { + // + // Compute desired block + // + int targetBlock = -1; + long targetBlockStart = 0; + long targetBlockEnd = 0; + for (int i = 0; i < blocks.length; i++) { + long blockLength = blocks[i].getLength(); + targetBlockEnd = targetBlockStart + blockLength - 1; + + if (target >= targetBlockStart && target <= targetBlockEnd) { + targetBlock = i; + break; + } else { + targetBlockStart = targetBlockEnd + 1; + } + } + if (targetBlock < 0) { + throw new IOException( + "Impossible situation: could not find target position " + target); + } + long offsetIntoBlock = target - targetBlockStart; + + // read block blocks[targetBlock] from position offsetIntoBlock + + File fileBlock = File.createTempFile("s3fs-in", ""); + fileBlock.deleteOnExit(); + InputStream in = store.getBlockStream(blocks[targetBlock], offsetIntoBlock); + OutputStream out = new BufferedOutputStream(new FileOutputStream(fileBlock)); + byte[] buf = new byte[bufferSize]; + int numRead; + while ((numRead = in.read(buf)) >= 0) { + out.write(buf, 0, numRead); + } + out.close(); + in.close(); + + this.pos = target; + this.blockEnd = targetBlockEnd; + this.blockStream = new DataInputStream(new FileInputStream(fileBlock)); + + } + + @Override + public void close() throws IOException { + if (closed) { + throw new IOException("Stream closed"); + } + if (blockStream != null) { + blockStream.close(); + blockStream.close(); + blockStream = null; + } + super.close(); + closed = true; + } + + /** + * We don't support marks. 
+ */ + @Override + public boolean markSupported() { + return false; + } + + @Override + public void mark(int readLimit) { + // Do nothing + } + + @Override + public void reset() throws IOException { + throw new IOException("Mark not supported"); + } + +} Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3OutputStream.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3OutputStream.java?view=auto&rev=486886 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3OutputStream.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3OutputStream.java Wed Dec 13 15:12:07 2006 @@ -0,0 +1,199 @@ +package org.apache.hadoop.fs.s3; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3.INode.FileType; +import org.apache.hadoop.util.Progressable; + +class S3OutputStream extends FSOutputStream { + + private int bufferSize; + + private FileSystemStore store; + + private Path path; + + private long blockSize; + + private File backupFile; + + private OutputStream backupStream; + + private Random r = new Random(); + + private boolean closed; + + private int pos = 0; + + private long filePos = 0; + + private int bytesWrittenToBlock = 0; + + private byte[] outBuf; + + private List<Block> blocks = new ArrayList<Block>(); + + private Block nextBlock; + + public S3OutputStream(Configuration conf, FileSystemStore store, + Path path, long blockSize, Progressable progress) throws IOException { + + this.store = store; + this.path = path; + this.blockSize = blockSize; + this.backupFile = newBackupFile(); + this.backupStream = new FileOutputStream(backupFile); + this.bufferSize = conf.getInt("io.file.buffer.size", 4096); + this.outBuf = new byte[bufferSize]; + + } + + private File newBackupFile() throws IOException { + File result = File.createTempFile("s3fs-out", ""); + result.deleteOnExit(); + return result; + } + + @Override + public long getPos() throws IOException { + return filePos; + } + + @Override + public synchronized void write(int b) throws IOException { + if (closed) { + throw new IOException("Stream closed"); + } + + if ((bytesWrittenToBlock + pos == blockSize) || (pos >= bufferSize)) { + flush(); + } + outBuf[pos++] = (byte) b; + filePos++; + } + + @Override + public synchronized void write(byte b[], int off, int len) throws IOException { + if (closed) { + throw new IOException("Stream closed"); + } + while (len > 0) { + int remaining = bufferSize - pos; + int toWrite = Math.min(remaining, len); + System.arraycopy(b, off, outBuf, pos, toWrite); + pos += toWrite; + off += toWrite; + len -= toWrite; + filePos += toWrite; + + if ((bytesWrittenToBlock + pos >= blockSize) || (pos == bufferSize)) { + flush(); + } + } + } + + @Override + public synchronized void flush() throws IOException { + if (closed) { + throw new IOException("Stream closed"); + } + + if (bytesWrittenToBlock + pos >= blockSize) { + flushData((int) blockSize - bytesWrittenToBlock); + } + if (bytesWrittenToBlock == blockSize) { + endBlock(); + } + flushData(pos); + } + + private synchronized void flushData(int maxPos) throws 
IOException { + int workingPos = Math.min(pos, maxPos); + + if (workingPos > 0) { + // + // To the local block backup, write just the bytes + // + backupStream.write(outBuf, 0, workingPos); + + // + // Track position + // + bytesWrittenToBlock += workingPos; + System.arraycopy(outBuf, workingPos, outBuf, 0, pos - workingPos); + pos -= workingPos; + } + } + + private synchronized void endBlock() throws IOException { + // + // Done with local copy + // + backupStream.close(); + + // + // Send it to S3 + // + // TODO: Use passed in Progressable to report progress. + nextBlockOutputStream(); + InputStream in = new FileInputStream(backupFile); + store.storeBlock(nextBlock, in); + in.close(); + internalClose(); + + // + // Delete local backup, start new one + // + backupFile.delete(); + backupFile = newBackupFile(); + backupStream = new FileOutputStream(backupFile); + bytesWrittenToBlock = 0; + } + + private synchronized void nextBlockOutputStream() throws IOException { + long blockId = r.nextLong(); + while (store.blockExists(blockId)) { + blockId = r.nextLong(); + } + nextBlock = new Block(blockId, bytesWrittenToBlock); + blocks.add(nextBlock); + bytesWrittenToBlock = 0; + } + + private synchronized void internalClose() throws IOException { + INode inode = new INode(FileType.FILE, blocks.toArray(new Block[blocks + .size()])); + store.storeINode(path, inode); + } + + @Override + public synchronized void close() throws IOException { + if (closed) { + throw new IOException("Stream closed"); + } + + flush(); + if (filePos == 0 || bytesWrittenToBlock != 0) { + endBlock(); + } + + backupStream.close(); + backupFile.delete(); + + super.close(); + + closed = true; + } + +} Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/package.html URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/package.html?view=auto&rev=486886 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/package.html (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/package.html Wed Dec 13 15:12:07 2006 @@ -0,0 +1,34 @@ +<html> +<body> + +<p>A distributed implementation of {@link +org.apache.hadoop.fs.FileSystem} that uses <a href="http://aws.amazon.com/s3">Amazon S3</a>.</p> + +<p> +Files are stored in S3 as blocks (represented by {@link Block}), which have an ID and a length. +Block metadata is stored in S3 as a small record (represented by {@link INode}) using the URL-encoded +path string as a key. Inodes record the file type (regular file or directory) and the list of blocks. +This design makes it easy to seek to any given position in a file by reading the inode data to compute +which block to access, then using S3's support for +<a href="http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35.2">HTTP Range</a> headers +to start streaming from the correct position. +Renames are also efficient since only the inode is moved (by a DELETE followed by a PUT since +S3 does not support renames). +</p> +<p> +For a single file <i>/dir1/file1</i> which takes two blocks of storage, the file structure in S3 +would be something like this: +</p> +<pre> +/ +/dir1 +/dir1/file1 +block-6415776850131549260 +block-3026438247347758425 +</pre> +<p> +Inodes start with a leading <code>/</code>, while blocks are prefixed with <code>block-</code>.
+</p> + +</body> +</html> Modified: lucene/hadoop/trunk/src/test/hadoop-site.xml URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/hadoop-site.xml?view=diff&rev=486886&r1=486885&r2=486886 ============================================================================== --- lucene/hadoop/trunk/src/test/hadoop-site.xml (original) +++ lucene/hadoop/trunk/src/test/hadoop-site.xml Wed Dec 13 15:12:07 2006 @@ -14,5 +14,16 @@ <description>A base for other temporary directories.</description> </property> +<property> + <name>test.fs.s3.name</name> + <value>s3:///</value> + <description>The name of the s3 file system for testing.</description> +</property> + +<property> + <name>fs.s3.block.size</name> + <value>128</value> + <description>Size of a block in bytes.</description> +</property> </configuration> Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/InMemoryFileSystemStore.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/InMemoryFileSystemStore.java?view=auto&rev=486886 ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/InMemoryFileSystemStore.java (added) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/InMemoryFileSystemStore.java Wed Dec 13 15:12:07 2006 @@ -0,0 +1,108 @@ +package org.apache.hadoop.fs.s3; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3.INode.FileType; + +/** + * A stub implementation of {@link FileSystemStore} for testing + * {@link S3FileSystem} without actually connecting to S3. + * @author Tom White + */ +class InMemoryFileSystemStore implements FileSystemStore { + + private SortedMap<Path, INode> inodes = new TreeMap<Path, INode>(); + private Map<Long, byte[]> blocks = new HashMap<Long, byte[]>(); + + public void initialize(URI uri, Configuration conf) { + // Nothing to initialize + } + + public void deleteINode(Path path) throws IOException { + inodes.remove(path); + } + + public void deleteBlock(Block block) throws IOException { + blocks.remove(block.getId()); + } + + public boolean inodeExists(Path path) throws IOException { + return inodes.containsKey(path); + } + + public boolean blockExists(long blockId) throws IOException { + return blocks.containsKey(blockId); + } + + public INode getINode(Path path) throws IOException { + return inodes.get(path); + } + + public InputStream getBlockStream(Block block, long byteRangeStart) throws IOException { + byte[] data = blocks.get(block.getId()); + return new ByteArrayInputStream(data, (int) byteRangeStart, data.length - (int) byteRangeStart); + } + + public Set<Path> listSubPaths(Path path) throws IOException { + // This is inefficient but more than adequate for testing purposes.
+ Set<Path> subPaths = new LinkedHashSet<Path>(); + for (Path p : inodes.tailMap(path).keySet()) { + if (path.equals(p.getParent())) { + subPaths.add(p); + } + } + return subPaths; + } + + public void storeINode(Path path, INode inode) throws IOException { + inodes.put(path, inode); + } + + public void storeBlock(Block block, InputStream in) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + byte[] buf = new byte[8192]; + int numRead; + while ((numRead = in.read(buf)) >= 0) { + out.write(buf, 0, numRead); + } + blocks.put(block.getId(), out.toByteArray()); + } + + public void purge() throws IOException { + inodes.clear(); + blocks.clear(); + } + + public void dump() throws IOException { + StringBuilder sb = new StringBuilder(getClass().getSimpleName()); + sb.append(", \n"); + for (Map.Entry<Path, INode> entry : inodes.entrySet()) { + sb.append(entry.getKey()).append("\n"); + INode inode = entry.getValue(); + sb.append("\t").append(inode.getFileType()).append("\n"); + if (inode.getFileType() == FileType.DIRECTORY) { + continue; + } + for (int j = 0; j < inode.getBlocks().length; j++) { + sb.append("\t").append(inode.getBlocks()[j]).append("\n"); + } + } + System.out.println(sb); + + System.out.println(inodes.keySet()); + System.out.println(blocks.keySet()); + } + +} Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/Jets3tS3FileSystemTest.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/Jets3tS3FileSystemTest.java?view=auto&rev=486886 ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/Jets3tS3FileSystemTest.java (added) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/Jets3tS3FileSystemTest.java Wed Dec 13 15:12:07 2006 @@ -0,0 +1,12 @@ +package org.apache.hadoop.fs.s3; + +import java.io.IOException; + +public class Jets3tS3FileSystemTest extends S3FileSystemBaseTest { + + @Override + public FileSystemStore getFileSystemStore() throws IOException { + return new Jets3tFileSystemStore(); + } + +} Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/S3FileSystemBaseTest.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/S3FileSystemBaseTest.java?view=auto&rev=486886 ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/S3FileSystemBaseTest.java (added) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/S3FileSystemBaseTest.java Wed Dec 13 15:12:07 2006 @@ -0,0 +1,232 @@ +package org.apache.hadoop.fs.s3; + +import java.io.IOException; +import java.net.URI; + +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSInputStream; +import org.apache.hadoop.fs.FSOutputStream; +import org.apache.hadoop.fs.Path; + +public abstract class S3FileSystemBaseTest extends TestCase { + + private static final int BLOCK_SIZE = 128; + + private S3FileSystem s3FileSystem; + + private byte[] data; + + abstract FileSystemStore getFileSystemStore() throws IOException; + + @Override + protected void setUp() throws IOException { + Configuration conf = new Configuration(); + + s3FileSystem = new S3FileSystem(getFileSystemStore()); + s3FileSystem.initialize(URI.create(conf.get("test.fs.s3.name")), conf); + + data = new byte[BLOCK_SIZE * 2]; + for (int i = 0; i < data.length; i++) { + data[i] = (byte) (i % 10); + } + } + + @Override + protected void tearDown() 
throws Exception { + s3FileSystem.purge(); + s3FileSystem.close(); + } + + public void testWorkingDirectory() throws Exception { + + Path homeDir = new Path("/user/", System.getProperty("user.name")); + assertEquals(homeDir, s3FileSystem.getWorkingDirectory()); + + s3FileSystem.setWorkingDirectory(new Path(".")); + assertEquals(homeDir, s3FileSystem.getWorkingDirectory()); + + s3FileSystem.setWorkingDirectory(new Path("..")); + assertEquals(new Path("/user/"), s3FileSystem.getWorkingDirectory()); + + s3FileSystem.setWorkingDirectory(new Path("hadoop")); + assertEquals(new Path("/user/hadoop"), s3FileSystem.getWorkingDirectory()); + + s3FileSystem.setWorkingDirectory(new Path("/test/hadoop")); + assertEquals(new Path("/test/hadoop"), s3FileSystem.getWorkingDirectory()); + + } + + public void testMkdirs() throws Exception { + Path testDir = new Path("/test/hadoop"); + assertFalse(s3FileSystem.exists(testDir)); + assertFalse(s3FileSystem.isDirectory(testDir)); + assertFalse(s3FileSystem.isFile(testDir)); + + assertTrue(s3FileSystem.mkdirs(testDir)); + + assertTrue(s3FileSystem.exists(testDir)); + assertTrue(s3FileSystem.isDirectory(testDir)); + assertFalse(s3FileSystem.isFile(testDir)); + + Path parentDir = testDir.getParent(); + assertTrue(s3FileSystem.exists(parentDir)); + assertTrue(s3FileSystem.isDirectory(parentDir)); + assertFalse(s3FileSystem.isFile(parentDir)); + + Path grandparentDir = parentDir.getParent(); + assertTrue(s3FileSystem.exists(grandparentDir)); + assertTrue(s3FileSystem.isDirectory(grandparentDir)); + assertFalse(s3FileSystem.isFile(grandparentDir)); + } + + public void testListPathsRaw() throws Exception { + Path[] testDirs = { new Path("/test/hadoop/a"), new Path("/test/hadoop/b"), + new Path("/test/hadoop/c/1"), }; + assertNull(s3FileSystem.listPathsRaw(testDirs[0])); + + for (Path path : testDirs) { + assertTrue(s3FileSystem.mkdirs(path)); + } + + Path[] paths = s3FileSystem.listPathsRaw(new Path("/")); + + assertEquals(1, paths.length); + assertEquals(new Path("/test"), paths[0]); + + paths = s3FileSystem.listPathsRaw(new Path("/test")); + assertEquals(1, paths.length); + assertEquals(new Path("/test/hadoop"), paths[0]); + + paths = s3FileSystem.listPathsRaw(new Path("/test/hadoop")); + assertEquals(3, paths.length); + assertEquals(new Path("/test/hadoop/a"), paths[0]); + assertEquals(new Path("/test/hadoop/b"), paths[1]); + assertEquals(new Path("/test/hadoop/c"), paths[2]); + + paths = s3FileSystem.listPathsRaw(new Path("/test/hadoop/a")); + assertEquals(0, paths.length); + } + + public void testWriteReadAndDeleteEmptyFile() throws Exception { + writeReadAndDelete(0); + } + + public void testWriteReadAndDeleteHalfABlock() throws Exception { + writeReadAndDelete(BLOCK_SIZE / 2); + } + + public void testWriteReadAndDeleteOneBlock() throws Exception { + writeReadAndDelete(BLOCK_SIZE); + } + + public void testWriteReadAndDeleteOneAndAHalfBlocks() throws Exception { + writeReadAndDelete(BLOCK_SIZE + BLOCK_SIZE / 2); + } + + public void testWriteReadAndDeleteTwoBlocks() throws Exception { + writeReadAndDelete(BLOCK_SIZE * 2); + } + + + private void writeReadAndDelete(int len) throws IOException { + Path path = new Path("/test/hadoop/file"); + + s3FileSystem.mkdirs(path.getParent()); + + FSOutputStream out = s3FileSystem.createRaw(path, false, (short) 1, BLOCK_SIZE); + out.write(data, 0, len); + out.close(); + + assertTrue("Exists", s3FileSystem.exists(path)); + + assertEquals("Block size", Math.min(len, BLOCK_SIZE), s3FileSystem.getBlockSize(path)); + + 
assertEquals("Length", len, s3FileSystem.getLength(path)); + + FSInputStream in = s3FileSystem.openRaw(path); + byte[] buf = new byte[len]; + + in.readFully(0, buf); + + assertEquals(len, buf.length); + for (int i = 0; i < buf.length; i++) { + assertEquals("Position " + i, data[i], buf[i]); + } + + assertTrue("Deleted", s3FileSystem.deleteRaw(path)); + + assertFalse("No longer exists", s3FileSystem.exists(path)); + + } + + public void testOverwrite() throws IOException { + Path path = new Path("/test/hadoop/file"); + + s3FileSystem.mkdirs(path.getParent()); + + FSOutputStream out = s3FileSystem.createRaw(path, false, (short) 1, BLOCK_SIZE); + out.write(data, 0, BLOCK_SIZE); + out.close(); + + assertTrue("Exists", s3FileSystem.exists(path)); + assertEquals("Length", BLOCK_SIZE, s3FileSystem.getLength(path)); + + try { + s3FileSystem.createRaw(path, false, (short) 1, 128); + fail("Should throw IOException."); + } catch (IOException e) { + // Expected + } + + out = s3FileSystem.createRaw(path, true, (short) 1, BLOCK_SIZE); + out.write(data, 0, BLOCK_SIZE / 2); + out.close(); + + assertTrue("Exists", s3FileSystem.exists(path)); + assertEquals("Length", BLOCK_SIZE / 2, s3FileSystem.getLength(path)); + + } + + public void testWriteInNonExistentDirectory() { + Path path = new Path("/test/hadoop/file"); + try { + s3FileSystem.createRaw(path, false, (short) 1, 128); + fail("Should throw IOException."); + } catch (IOException e) { + // Expected + } + } + + public void testRename() throws Exception { + int len = BLOCK_SIZE; + + Path path = new Path("/test/hadoop/file"); + + s3FileSystem.mkdirs(path.getParent()); + + FSOutputStream out = s3FileSystem.createRaw(path, false, (short) 1, BLOCK_SIZE); + out.write(data, 0, len); + out.close(); + + assertTrue("Exists", s3FileSystem.exists(path)); + + Path newPath = new Path("/test/hadoop/newfile"); + s3FileSystem.rename(path, newPath); + assertFalse("No longer exists", s3FileSystem.exists(path)); + assertTrue("Moved", s3FileSystem.exists(newPath)); + + FSInputStream in = s3FileSystem.openRaw(newPath); + byte[] buf = new byte[len]; + + in.readFully(0, buf); + + assertEquals(len, buf.length); + for (int i = 0; i < buf.length; i++) { + assertEquals("Position " + i, data[i], buf[i]); + } + } + + +} Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/TestINode.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/TestINode.java?view=auto&rev=486886 ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/TestINode.java (added) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/TestINode.java Wed Dec 13 15:12:07 2006 @@ -0,0 +1,42 @@ +package org.apache.hadoop.fs.s3; + +import java.io.IOException; +import java.io.InputStream; + +import junit.framework.TestCase; + +import org.apache.hadoop.fs.s3.INode.FileType; + +public class TestINode extends TestCase { + + public void testSerializeFileWithSingleBlock() throws IOException { + Block[] blocks = { new Block(849282477840258181L, 128L) }; + INode inode = new INode(FileType.FILE, blocks); + + assertEquals("Length", 1L + 4 + 16, inode.getSerializedLength()); + InputStream in = inode.serialize(); + + INode deserialized = INode.deserialize(in); + + assertEquals("FileType", inode.getFileType(), deserialized.getFileType()); + Block[] deserializedBlocks = deserialized.getBlocks(); + assertEquals("Length", 1, deserializedBlocks.length); + assertEquals("Id", blocks[0].getId(), 
deserializedBlocks[0].getId()); + assertEquals("Length", blocks[0].getLength(), deserializedBlocks[0] + .getLength()); + + } + + public void testSerializeDirectory() throws IOException { + INode inode = INode.DIRECTORY_INODE; + assertEquals("Length", 1L, inode.getSerializedLength()); + InputStream in = inode.serialize(); + INode deserialized = INode.deserialize(in); + assertSame(INode.DIRECTORY_INODE, deserialized); + } + + public void testDeserializeNull() throws IOException { + assertNull(INode.deserialize(null)); + } + +} Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/TestInMemoryS3FileSystem.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/TestInMemoryS3FileSystem.java?view=auto&rev=486886 ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/TestInMemoryS3FileSystem.java (added) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/TestInMemoryS3FileSystem.java Wed Dec 13 15:12:07 2006 @@ -0,0 +1,33 @@ +package org.apache.hadoop.fs.s3; + +import java.io.IOException; +import java.net.URI; + +import org.apache.hadoop.conf.Configuration; + +public class TestInMemoryS3FileSystem extends S3FileSystemBaseTest { + + @Override + public FileSystemStore getFileSystemStore() throws IOException { + return new InMemoryFileSystemStore(); + } + + public void testInitialization() throws IOException { + initializationTest("s3://a:b@c", "s3://a:b@c"); + initializationTest("s3://a:b@c/", "s3://a:b@c"); + initializationTest("s3://a:b@c/path", "s3://a:b@c"); + initializationTest("s3://a@c", "s3://a@c"); + initializationTest("s3://a@c/", "s3://a@c"); + initializationTest("s3://a@c/path", "s3://a@c"); + initializationTest("s3://c", "s3://c"); + initializationTest("s3://c/", "s3://c"); + initializationTest("s3://c/path", "s3://c"); + } + + private void initializationTest(String initializationUri, String expectedUri) throws IOException { + S3FileSystem fs = new S3FileSystem(getFileSystemStore()); + fs.initialize(URI.create(initializationUri), new Configuration()); + assertEquals(URI.create(expectedUri), fs.getUri()); + } + +}
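[Editor's note: to make the seek behaviour described in package.html concrete, here is an illustrative, self-contained sketch, not part of this commit, of the position-to-block arithmetic that S3InputStream.blockSeekTo() performs before it asks the store for a ranged stream. The BlockLocator class and the sample numbers are assumptions for demonstration; the 1 MB figure is the default fs.s3.block.size from S3FileSystem.]

// Illustrative sketch: restates the block lookup done in S3InputStream.blockSeekTo().
class BlockLocator {

  /**
   * Returns {blockIndex, offsetIntoBlock} for an absolute file position,
   * given the per-block lengths recorded in the file's INode.
   */
  static long[] locate(long[] blockLengths, long target) {
    long blockStart = 0;
    for (int i = 0; i < blockLengths.length; i++) {
      long blockEnd = blockStart + blockLengths[i] - 1;
      if (target >= blockStart && target <= blockEnd) {
        return new long[] { i, target - blockStart };
      }
      blockStart = blockEnd + 1;
    }
    throw new IllegalArgumentException("Position " + target + " is past EOF");
  }

  public static void main(String[] args) {
    // For a two-block file like /dir1/file1 in package.html, with the default
    // 1 MB block size, position 1,500,000 falls 451,424 bytes into block 1,
    // which is where the HTTP Range request would start streaming.
    long blockSize = 1024L * 1024L;
    long[] result = locate(new long[] { blockSize, blockSize }, 1500000L);
    System.out.println("block=" + result[0] + " offset=" + result[1]);
  }
}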