Author: stevel Date: Thu Jul 3 12:44:24 2014 New Revision: 1607613 URL: http://svn.apache.org/r1607613 Log: HADOOP-9361: Strictly define FileSystem APIs
Added: hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSExceptionMessages.java - copied unchanged from r1607596, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSExceptionMessages.java hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/site/markdown/ - copied from r1607596, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/site/markdown/ hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ - copied from r1607596, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/test/resources/contract/ - copied from r1607596, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/resources/contract/ Modified: hadoop/common/branches/branch-2.5/hadoop-common-project/ (props changed) hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/ (props changed) hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/ (props changed) hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/ (props changed) hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BufferedFSInputStream.java hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputChecker.java hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPInputStream.java hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3/S3FileSystem.java hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java Propchange: hadoop/common/branches/branch-2.5/hadoop-common-project/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-common-project:r1607596 Propchange: hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common:r1607596 Propchange: hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/src:r1607596 Propchange: hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java:r1607596 Modified: hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BufferedFSInputStream.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BufferedFSInputStream.java?rev=1607613&r1=1607612&r2=1607613&view=diff ============================================================================== --- hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BufferedFSInputStream.java (original) +++ hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BufferedFSInputStream.java Thu Jul 3 12:44:24 2014 @@ -18,6 +18,7 @@ package org.apache.hadoop.fs; import java.io.BufferedInputStream; +import java.io.EOFException; import java.io.FileDescriptor; import java.io.IOException; @@ -51,6 +52,9 @@ implements Seekable, PositionedReadable, @Override public long getPos() throws IOException { + if (in == null) { + throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); + } return ((FSInputStream)in).getPos()-(count-pos); } @@ -66,8 +70,11 @@ implements Seekable, PositionedReadable, @Override public void seek(long pos) throws IOException { - if( pos<0 ) { - return; + if (in == null) { + throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); + } + if (pos < 0) { + throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK); } if (this.pos != this.count) { // optimize: check if the pos is in the buffer Modified: hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java?rev=1607613&r1=1607612&r2=1607613&view=diff ============================================================================== --- hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java (original) +++ hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java Thu Jul 3 12:44:24 2014 @@ -18,6 +18,7 @@ package org.apache.hadoop.fs; +import java.io.EOFException; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; @@ -318,8 +319,8 @@ public abstract class ChecksumFileSystem @Override public synchronized void seek(long pos) throws IOException { - if(pos>getFileLength()) { - throw new IOException("Cannot seek after EOF"); + if (pos > getFileLength()) { + throw new EOFException("Cannot seek after EOF"); } super.seek(pos); } Modified: hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java?rev=1607613&r1=1607612&r2=1607613&view=diff ============================================================================== --- hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java (original) +++ hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java Thu Jul 3 12:44:24 2014 @@ -67,7 +67,10 @@ public class FSDataOutputStream extends } public void close() throws IOException { - out.close(); + // ensure close works even if a null reference was passed in + if (out != null) { + out.close(); + } } } Modified: hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputChecker.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputChecker.java?rev=1607613&r1=1607612&r2=1607613&view=diff ============================================================================== --- hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputChecker.java (original) +++ hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputChecker.java Thu Jul 3 12:44:24 2014 @@ -17,6 +17,7 @@ */ package org.apache.hadoop.fs; +import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.util.zip.Checksum; @@ -394,8 +395,8 @@ abstract public class FSInputChecker ext @Override public synchronized void seek(long pos) throws IOException { - if( pos<0 ) { - return; + if( pos < 0 ) { + throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK); } // optimize: check if the pos is in the buffer long start = chunkPos - this.count; Modified: hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java?rev=1607613&r1=1607612&r2=1607613&view=diff ============================================================================== --- hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java (original) +++ hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java Thu Jul 3 12:44:24 2014 @@ -23,6 +23,7 @@ import com.google.common.annotations.Vis import java.io.BufferedOutputStream; import java.io.DataOutput; +import java.io.EOFException; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; @@ -105,6 +106,10 @@ public class RawLocalFileSystem extends @Override public void seek(long pos) throws IOException { + if (pos < 0) { + throw new EOFException( + FSExceptionMessages.NEGATIVE_SEEK); + } fis.getChannel().position(pos); this.position = pos; } @@ -256,7 +261,7 @@ public class RawLocalFileSystem extends boolean createParent, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { if (exists(f) && !overwrite) { - throw new IOException("File already exists: "+f); + throw new FileAlreadyExistsException("File already exists: " + f); } Path parent = f.getParent(); if (parent != null && !mkdirs(parent)) { @@ -272,7 +277,7 @@ public class RawLocalFileSystem extends EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { if (exists(f) && !flags.contains(CreateFlag.OVERWRITE)) { - throw new IOException("File already exists: "+f); + throw new FileAlreadyExistsException("File already exists: " + f); } return new FSDataOutputStream(new BufferedOutputStream( new LocalFSFileOutputStream(f, false), bufferSize), statistics); @@ -344,6 +349,10 @@ public class RawLocalFileSystem extends @Override public boolean delete(Path p, boolean recursive) throws IOException { File f = pathToFile(p); + if (!f.exists()) { + //no path, return false "nothing to delete" + return false; + } if (f.isFile()) { return f.delete(); } else if (!recursive && f.isDirectory() && @@ -406,10 +415,14 @@ public class RawLocalFileSystem extends if(parent != null) { File parent2f = pathToFile(parent); if(parent2f != null && parent2f.exists() && !parent2f.isDirectory()) { - throw new FileAlreadyExistsException("Parent path is not a directory: " + throw new ParentNotDirectoryException("Parent path is not a directory: " + parent); } } + if (p2f.exists() && !p2f.isDirectory()) { + throw new FileNotFoundException("Destination exists" + + " and is not a directory: " + p2f.getCanonicalPath()); + } return (parent == null || mkdirs(parent)) && (p2f.mkdir() || p2f.isDirectory()); } Modified: hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java?rev=1607613&r1=1607612&r2=1607613&view=diff ============================================================================== --- hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java (original) +++ hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java Thu Jul 3 12:44:24 2014 @@ -20,6 +20,7 @@ package org.apache.hadoop.fs.ftp; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.net.ConnectException; import java.net.URI; import org.apache.commons.logging.Log; @@ -33,11 +34,14 @@ import org.apache.hadoop.classification. import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.ParentNotDirectoryException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.Progressable; /** @@ -56,6 +60,12 @@ public class FTPFileSystem extends FileS public static final int DEFAULT_BUFFER_SIZE = 1024 * 1024; public static final int DEFAULT_BLOCK_SIZE = 4 * 1024; + public static final String FS_FTP_USER_PREFIX = "fs.ftp.user."; + public static final String FS_FTP_HOST = "fs.ftp.host"; + public static final String FS_FTP_HOST_PORT = "fs.ftp.host.port"; + public static final String FS_FTP_PASSWORD_PREFIX = "fs.ftp.password."; + public static final String E_SAME_DIRECTORY_ONLY = + "only same directory renames are supported"; private URI uri; @@ -75,11 +85,11 @@ public class FTPFileSystem extends FileS super.initialize(uri, conf); // get host information from uri (overrides info in conf) String host = uri.getHost(); - host = (host == null) ? conf.get("fs.ftp.host", null) : host; + host = (host == null) ? conf.get(FS_FTP_HOST, null) : host; if (host == null) { throw new IOException("Invalid host specified"); } - conf.set("fs.ftp.host", host); + conf.set(FS_FTP_HOST, host); // get port information from uri, (overrides info in conf) int port = uri.getPort(); @@ -96,11 +106,11 @@ public class FTPFileSystem extends FileS } } String[] userPasswdInfo = userAndPassword.split(":"); - conf.set("fs.ftp.user." + host, userPasswdInfo[0]); + conf.set(FS_FTP_USER_PREFIX + host, userPasswdInfo[0]); if (userPasswdInfo.length > 1) { - conf.set("fs.ftp.password." + host, userPasswdInfo[1]); + conf.set(FS_FTP_PASSWORD_PREFIX + host, userPasswdInfo[1]); } else { - conf.set("fs.ftp.password." + host, null); + conf.set(FS_FTP_PASSWORD_PREFIX + host, null); } setConf(conf); this.uri = uri; @@ -115,23 +125,24 @@ public class FTPFileSystem extends FileS private FTPClient connect() throws IOException { FTPClient client = null; Configuration conf = getConf(); - String host = conf.get("fs.ftp.host"); - int port = conf.getInt("fs.ftp.host.port", FTP.DEFAULT_PORT); - String user = conf.get("fs.ftp.user." + host); - String password = conf.get("fs.ftp.password." + host); + String host = conf.get(FS_FTP_HOST); + int port = conf.getInt(FS_FTP_HOST_PORT, FTP.DEFAULT_PORT); + String user = conf.get(FS_FTP_USER_PREFIX + host); + String password = conf.get(FS_FTP_PASSWORD_PREFIX + host); client = new FTPClient(); client.connect(host, port); int reply = client.getReplyCode(); if (!FTPReply.isPositiveCompletion(reply)) { - throw new IOException("Server - " + host - + " refused connection on port - " + port); + throw NetUtils.wrapException(host, port, + NetUtils.UNKNOWN_HOST, 0, + new ConnectException("Server response " + reply)); } else if (client.login(user, password)) { client.setFileTransferMode(FTP.BLOCK_TRANSFER_MODE); client.setFileType(FTP.BINARY_FILE_TYPE); client.setBufferSize(DEFAULT_BUFFER_SIZE); } else { throw new IOException("Login failed on server - " + host + ", port - " - + port); + + port + " as user '" + user + "'"); } return client; @@ -179,7 +190,7 @@ public class FTPFileSystem extends FileS FileStatus fileStat = getFileStatus(client, absolute); if (fileStat.isDirectory()) { disconnect(client); - throw new IOException("Path " + file + " is a directory."); + throw new FileNotFoundException("Path " + file + " is a directory."); } client.allocate(bufferSize); Path parent = absolute.getParent(); @@ -214,12 +225,18 @@ public class FTPFileSystem extends FileS final FTPClient client = connect(); Path workDir = new Path(client.printWorkingDirectory()); Path absolute = makeAbsolute(workDir, file); - if (exists(client, file)) { - if (overwrite) { - delete(client, file); + FileStatus status; + try { + status = getFileStatus(client, file); + } catch (FileNotFoundException fnfe) { + status = null; + } + if (status != null) { + if (overwrite && !status.isDirectory()) { + delete(client, file, false); } else { disconnect(client); - throw new IOException("File already exists: " + file); + throw new FileAlreadyExistsException("File already exists: " + file); } } @@ -272,14 +289,13 @@ public class FTPFileSystem extends FileS * Convenience method, so that we don't open a new connection when using this * method from within another method. Otherwise every API invocation incurs * the overhead of opening/closing a TCP connection. + * @throws IOException on IO problems other than FileNotFoundException */ - private boolean exists(FTPClient client, Path file) { + private boolean exists(FTPClient client, Path file) throws IOException { try { return getFileStatus(client, file) != null; } catch (FileNotFoundException fnfe) { return false; - } catch (IOException ioe) { - throw new FTPException("Failed to get file status", ioe); } } @@ -294,12 +310,6 @@ public class FTPFileSystem extends FileS } } - /** @deprecated Use delete(Path, boolean) instead */ - @Deprecated - private boolean delete(FTPClient client, Path file) throws IOException { - return delete(client, file, false); - } - /** * Convenience method, so that we don't open a new connection when using this * method from within another method. Otherwise every API invocation incurs @@ -310,9 +320,14 @@ public class FTPFileSystem extends FileS Path workDir = new Path(client.printWorkingDirectory()); Path absolute = makeAbsolute(workDir, file); String pathName = absolute.toUri().getPath(); - FileStatus fileStat = getFileStatus(client, absolute); - if (fileStat.isFile()) { - return client.deleteFile(pathName); + try { + FileStatus fileStat = getFileStatus(client, absolute); + if (fileStat.isFile()) { + return client.deleteFile(pathName); + } + } catch (FileNotFoundException e) { + //the file is not there + return false; } FileStatus[] dirEntries = listStatus(client, absolute); if (dirEntries != null && dirEntries.length > 0 && !(recursive)) { @@ -491,7 +506,7 @@ public class FTPFileSystem extends FileS created = created && client.makeDirectory(pathName); } } else if (isFile(client, absolute)) { - throw new IOException(String.format( + throw new ParentNotDirectoryException(String.format( "Can't make directory for path %s since it is a file.", absolute)); } return created; @@ -528,6 +543,23 @@ public class FTPFileSystem extends FileS } /** + * Probe for a path being a parent of another + * @param parent parent path + * @param child possible child path + * @return true if the parent's path matches the start of the child's + */ + private boolean isParentOf(Path parent, Path child) { + URI parentURI = parent.toUri(); + String parentPath = parentURI.getPath(); + if (!parentPath.endsWith("/")) { + parentPath += "/"; + } + URI childURI = child.toUri(); + String childPath = childURI.getPath(); + return childPath.startsWith(parentPath); + } + + /** * Convenience method, so that we don't open a new connection when using this * method from within another method. Otherwise every API invocation incurs * the overhead of opening/closing a TCP connection. @@ -544,20 +576,31 @@ public class FTPFileSystem extends FileS Path absoluteSrc = makeAbsolute(workDir, src); Path absoluteDst = makeAbsolute(workDir, dst); if (!exists(client, absoluteSrc)) { - throw new IOException("Source path " + src + " does not exist"); + throw new FileNotFoundException("Source path " + src + " does not exist"); + } + if (isDirectory(absoluteDst)) { + // destination is a directory: rename goes underneath it with the + // source name + absoluteDst = new Path(absoluteDst, absoluteSrc.getName()); } if (exists(client, absoluteDst)) { - throw new IOException("Destination path " + dst - + " already exist, cannot rename!"); + throw new FileAlreadyExistsException("Destination path " + dst + + " already exists"); } String parentSrc = absoluteSrc.getParent().toUri().toString(); String parentDst = absoluteDst.getParent().toUri().toString(); - String from = src.getName(); - String to = dst.getName(); + if (isParentOf(absoluteSrc, absoluteDst)) { + throw new IOException("Cannot rename " + absoluteSrc + " under itself" + + " : "+ absoluteDst); + } + if (!parentSrc.equals(parentDst)) { - throw new IOException("Cannot rename parent(source): " + parentSrc - + ", parent(destination): " + parentDst); + throw new IOException("Cannot rename source: " + absoluteSrc + + " to " + absoluteDst + + " -"+ E_SAME_DIRECTORY_ONLY); } + String from = absoluteSrc.getName(); + String to = absoluteDst.getName(); client.changeWorkingDirectory(parentSrc); boolean renamed = client.rename(from, to); return renamed; Modified: hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPInputStream.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPInputStream.java?rev=1607613&r1=1607612&r2=1607613&view=diff ============================================================================== --- hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPInputStream.java (original) +++ hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPInputStream.java Thu Jul 3 12:44:24 2014 @@ -103,7 +103,7 @@ public class FTPInputStream extends FSIn @Override public synchronized void close() throws IOException { if (closed) { - throw new IOException("Stream closed"); + return; } super.close(); closed = true; Modified: hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3/S3FileSystem.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3/S3FileSystem.java?rev=1607613&r1=1607612&r2=1607613&view=diff ============================================================================== --- hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3/S3FileSystem.java (original) +++ hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3/S3FileSystem.java Thu Jul 3 12:44:24 2014 @@ -32,6 +32,7 @@ import org.apache.hadoop.classification. import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -226,7 +227,7 @@ public class S3FileSystem extends FileSy if (overwrite) { delete(file, true); } else { - throw new IOException("File already exists: " + file); + throw new FileAlreadyExistsException("File already exists: " + file); } } else { Path parent = file.getParent(); Modified: hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java?rev=1607613&r1=1607612&r2=1607613&view=diff ============================================================================== --- hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java (original) +++ hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java Thu Jul 3 12:44:24 2014 @@ -22,6 +22,7 @@ import static org.apache.hadoop.fs.s3nat import java.io.BufferedInputStream; import java.io.ByteArrayInputStream; +import java.io.EOFException; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; @@ -32,17 +33,19 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.s3.S3Credentials; import org.apache.hadoop.fs.s3.S3Exception; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.security.AccessControlException; import org.jets3t.service.S3Service; import org.jets3t.service.S3ServiceException; import org.jets3t.service.ServiceException; import org.jets3t.service.StorageObjectsChunk; +import org.jets3t.service.impl.rest.HttpException; import org.jets3t.service.impl.rest.httpclient.RestS3Service; import org.jets3t.service.model.MultipartPart; import org.jets3t.service.model.MultipartUpload; @@ -51,6 +54,8 @@ import org.jets3t.service.model.S3Object import org.jets3t.service.model.StorageObject; import org.jets3t.service.security.AWSCredentials; import org.jets3t.service.utils.MultipartUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @InterfaceAudience.Private @InterfaceStability.Unstable @@ -66,8 +71,8 @@ class Jets3tNativeFileSystemStore implem private String serverSideEncryptionAlgorithm; - public static final Log LOG = - LogFactory.getLog(Jets3tNativeFileSystemStore.class); + public static final Logger LOG = + LoggerFactory.getLogger(Jets3tNativeFileSystemStore.class); @Override public void initialize(URI uri, Configuration conf) throws IOException { @@ -79,7 +84,7 @@ class Jets3tNativeFileSystemStore implem s3Credentials.getSecretAccessKey()); this.s3Service = new RestS3Service(awsCredentials); } catch (S3ServiceException e) { - handleS3ServiceException(e); + handleException(e); } multipartEnabled = conf.getBoolean("fs.s3n.multipart.uploads.enabled", false); @@ -115,16 +120,10 @@ class Jets3tNativeFileSystemStore implem object.setMd5Hash(md5Hash); } s3Service.putObject(bucket, object); - } catch (S3ServiceException e) { - handleS3ServiceException(e); + } catch (ServiceException e) { + handleException(e, key); } finally { - if (in != null) { - try { - in.close(); - } catch (IOException e) { - // ignore - } - } + IOUtils.closeStream(in); } } @@ -147,10 +146,8 @@ class Jets3tNativeFileSystemStore implem try { mpUtils.uploadObjects(bucket.getName(), s3Service, objectsToUploadAsMultipart, null); - } catch (ServiceException e) { - handleServiceException(e); } catch (Exception e) { - throw new S3Exception(e); + handleException(e, key); } } @@ -163,8 +160,8 @@ class Jets3tNativeFileSystemStore implem object.setContentLength(0); object.setServerSideEncryptionAlgorithm(serverSideEncryptionAlgorithm); s3Service.putObject(bucket, object); - } catch (S3ServiceException e) { - handleS3ServiceException(e); + } catch (ServiceException e) { + handleException(e, key); } } @@ -172,20 +169,21 @@ class Jets3tNativeFileSystemStore implem public FileMetadata retrieveMetadata(String key) throws IOException { StorageObject object = null; try { - if(LOG.isDebugEnabled()) { - LOG.debug("Getting metadata for key: " + key + " from bucket:" + bucket.getName()); - } + LOG.debug("Getting metadata for key: {} from bucket: {}", + key, bucket.getName()); object = s3Service.getObjectDetails(bucket.getName(), key); return new FileMetadata(key, object.getContentLength(), object.getLastModifiedDate().getTime()); } catch (ServiceException e) { - // Following is brittle. Is there a better way? - if ("NoSuchKey".equals(e.getErrorCode())) { - return null; //return null if key not found + try { + // process + handleException(e, key); + return null; + } catch (FileNotFoundException fnfe) { + // and downgrade missing files + return null; } - handleServiceException(e); - return null; //never returned - keep compiler happy } finally { if (object != null) { object.closeDataInputStream(); @@ -204,13 +202,12 @@ class Jets3tNativeFileSystemStore implem @Override public InputStream retrieve(String key) throws IOException { try { - if(LOG.isDebugEnabled()) { - LOG.debug("Getting key: " + key + " from bucket:" + bucket.getName()); - } + LOG.debug("Getting key: {} from bucket: {}", + key, bucket.getName()); S3Object object = s3Service.getObject(bucket.getName(), key); return object.getDataInputStream(); } catch (ServiceException e) { - handleServiceException(key, e); + handleException(e, key); return null; //return null if key not found } } @@ -228,15 +225,14 @@ class Jets3tNativeFileSystemStore implem public InputStream retrieve(String key, long byteRangeStart) throws IOException { try { - if(LOG.isDebugEnabled()) { - LOG.debug("Getting key: " + key + " from bucket:" + bucket.getName() + " with byteRangeStart: " + byteRangeStart); - } + LOG.debug("Getting key: {} from bucket: {} with byteRangeStart: {}", + key, bucket.getName(), byteRangeStart); S3Object object = s3Service.getObject(bucket, key, null, null, null, null, byteRangeStart, null); return object.getDataInputStream(); } catch (ServiceException e) { - handleServiceException(key, e); - return null; //return null if key not found + handleException(e, key); + return null; } } @@ -254,17 +250,19 @@ class Jets3tNativeFileSystemStore implem } /** - * - * @return - * This method returns null if the list could not be populated - * due to S3 giving ServiceException - * @throws IOException + * list objects + * @param prefix prefix + * @param delimiter delimiter + * @param maxListingLength max no. of entries + * @param priorLastKey last key in any previous search + * @return a list of matches + * @throws IOException on any reported failure */ private PartialListing list(String prefix, String delimiter, int maxListingLength, String priorLastKey) throws IOException { try { - if (prefix.length() > 0 && !prefix.endsWith(PATH_DELIMITER)) { + if (!prefix.isEmpty() && !prefix.endsWith(PATH_DELIMITER)) { prefix += PATH_DELIMITER; } StorageObjectsChunk chunk = s3Service.listObjectsChunked(bucket.getName(), @@ -279,24 +277,20 @@ class Jets3tNativeFileSystemStore implem } return new PartialListing(chunk.getPriorLastKey(), fileMetadata, chunk.getCommonPrefixes()); - } catch (S3ServiceException e) { - handleS3ServiceException(e); - return null; //never returned - keep compiler happy } catch (ServiceException e) { - handleServiceException(e); - return null; //return null if list could not be populated + handleException(e, prefix); + return null; // never returned - keep compiler happy } } @Override public void delete(String key) throws IOException { try { - if(LOG.isDebugEnabled()) { - LOG.debug("Deleting key:" + key + "from bucket" + bucket.getName()); - } + LOG.debug("Deleting key: {} from bucket: {}", + key, bucket.getName()); s3Service.deleteObject(bucket, key); } catch (ServiceException e) { - handleServiceException(key, e); + handleException(e, key); } } @@ -304,7 +298,7 @@ class Jets3tNativeFileSystemStore implem try { s3Service.renameObject(bucket.getName(), srcKey, new S3Object(dstKey)); } catch (ServiceException e) { - handleServiceException(e); + handleException(e, srcKey); } } @@ -329,7 +323,7 @@ class Jets3tNativeFileSystemStore implem s3Service.copyObject(bucket.getName(), srcKey, bucket.getName(), dstObject, false); } catch (ServiceException e) { - handleServiceException(srcKey, e); + handleException(e, srcKey); } } @@ -364,19 +358,22 @@ class Jets3tNativeFileSystemStore implem Collections.reverse(listedParts); s3Service.multipartCompleteUpload(multipartUpload, listedParts); } catch (ServiceException e) { - handleServiceException(e); + handleException(e, srcObject.getKey()); } } @Override public void purge(String prefix) throws IOException { + String key = ""; try { - S3Object[] objects = s3Service.listObjects(bucket.getName(), prefix, null); + S3Object[] objects = + s3Service.listObjects(bucket.getName(), prefix, null); for (S3Object object : objects) { - s3Service.deleteObject(bucket, object.getKey()); + key = object.getKey(); + s3Service.deleteObject(bucket, key); } } catch (S3ServiceException e) { - handleS3ServiceException(e); + handleException(e, key); } } @@ -390,39 +387,97 @@ class Jets3tNativeFileSystemStore implem sb.append(object.getKey()).append("\n"); } } catch (S3ServiceException e) { - handleS3ServiceException(e); + handleException(e); } System.out.println(sb); } - private void handleServiceException(String key, ServiceException e) throws IOException { - if ("NoSuchKey".equals(e.getErrorCode())) { - throw new FileNotFoundException("Key '" + key + "' does not exist in S3"); - } else { - handleServiceException(e); - } + /** + * Handle any service exception by translating it into an IOException + * @param e exception + * @throws IOException exception -always + */ + private void handleException(Exception e) throws IOException { + throw processException(e, e, ""); } + /** + * Handle any service exception by translating it into an IOException + * @param e exception + * @param key key sought from object store - private void handleS3ServiceException(S3ServiceException e) throws IOException { - if (e.getCause() instanceof IOException) { - throw (IOException) e.getCause(); - } - else { - if(LOG.isDebugEnabled()) { - LOG.debug("S3 Error code: " + e.getS3ErrorCode() + "; S3 Error message: " + e.getS3ErrorMessage()); - } - throw new S3Exception(e); - } + * @throws IOException exception -always + */ + private void handleException(Exception e, String key) throws IOException { + throw processException(e, e, key); } - private void handleServiceException(ServiceException e) throws IOException { - if (e.getCause() instanceof IOException) { - throw (IOException) e.getCause(); - } - else { - if(LOG.isDebugEnabled()) { - LOG.debug("Got ServiceException with Error code: " + e.getErrorCode() + ";and Error message: " + e.getErrorMessage()); - } + /** + * Handle any service exception by translating it into an IOException + * @param thrown exception + * @param original original exception -thrown if no other translation could + * be made + * @param key key sought from object store or "" for undefined + * @return an exception to throw. If isProcessingCause==true this may be null. + */ + private IOException processException(Throwable thrown, Throwable original, + String key) { + IOException result; + if (thrown.getCause() != null) { + // recurse down + result = processException(thrown.getCause(), original, key); + } else if (thrown instanceof HttpException) { + // nested HttpException - examine error code and react + HttpException httpException = (HttpException) thrown; + String responseMessage = httpException.getResponseMessage(); + int responseCode = httpException.getResponseCode(); + String bucketName = "s3n://" + bucket.getName(); + String text = String.format("%s : %03d : %s", + bucketName, + responseCode, + responseMessage); + String filename = !key.isEmpty() ? (bucketName + "/" + key) : text; + IOException ioe; + switch (responseCode) { + case 404: + result = new FileNotFoundException(filename); + break; + case 416: // invalid range + result = new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF + +": " + filename); + break; + case 403: //forbidden + result = new AccessControlException("Permission denied" + +": " + filename); + break; + default: + result = new IOException(text); + } + result.initCause(thrown); + } else if (thrown instanceof S3ServiceException) { + S3ServiceException se = (S3ServiceException) thrown; + LOG.debug( + "S3ServiceException: {}: {} : {}", + se.getS3ErrorCode(), se.getS3ErrorMessage(), se, se); + if ("InvalidRange".equals(se.getS3ErrorCode())) { + result = new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); + } else { + result = new S3Exception(se); + } + } else if (thrown instanceof ServiceException) { + ServiceException se = (ServiceException) thrown; + LOG.debug("S3ServiceException: {}: {} : {}", + se.getErrorCode(), se.toString(), se, se); + result = new S3Exception(se); + } else if (thrown instanceof IOException) { + result = (IOException) thrown; + } else { + // here there is no exception derived yet. + // this means no inner cause, and no translation made yet. + // convert the original to an IOException -rather than just the + // exception at the base of the tree + result = new S3Exception(original); } + + return result; } } Modified: hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java?rev=1607613&r1=1607612&r2=1607613&view=diff ============================================================================== --- hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java (original) +++ hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java Thu Jul 3 12:44:24 2014 @@ -19,6 +19,7 @@ package org.apache.hadoop.fs.s3native; import java.io.BufferedOutputStream; +import java.io.EOFException; import java.io.File; import java.io.FileNotFoundException; import java.io.FileOutputStream; @@ -37,15 +38,16 @@ import java.util.Set; import java.util.TreeSet; import java.util.concurrent.TimeUnit; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BufferedFSInputStream; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FSInputStream; +import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -55,6 +57,8 @@ import org.apache.hadoop.io.retry.RetryP import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryProxy; import org.apache.hadoop.util.Progressable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * <p> @@ -81,8 +85,8 @@ import org.apache.hadoop.util.Progressab @InterfaceStability.Stable public class NativeS3FileSystem extends FileSystem { - public static final Log LOG = - LogFactory.getLog(NativeS3FileSystem.class); + public static final Logger LOG = + LoggerFactory.getLogger(NativeS3FileSystem.class); private static final String FOLDER_SUFFIX = "_$folder$"; static final String PATH_DELIMITER = Path.SEPARATOR; @@ -97,6 +101,7 @@ public class NativeS3FileSystem extends private long pos = 0; public NativeS3FsInputStream(NativeFileSystemStore store, Statistics statistics, InputStream in, String key) { + Preconditions.checkNotNull(in, "Null input stream"); this.store = store; this.statistics = statistics; this.in = in; @@ -105,13 +110,20 @@ public class NativeS3FileSystem extends @Override public synchronized int read() throws IOException { - int result = -1; + int result; try { result = in.read(); } catch (IOException e) { - LOG.info("Received IOException while reading '" + key + "', attempting to reopen."); - seek(pos); - result = in.read(); + LOG.info("Received IOException while reading '{}', attempting to reopen", + key); + LOG.debug("{}", e, e); + try { + seek(pos); + result = in.read(); + } catch (EOFException eof) { + LOG.debug("EOF on input stream read: {}", eof, eof); + result = -1; + } } if (result != -1) { pos++; @@ -124,12 +136,17 @@ public class NativeS3FileSystem extends @Override public synchronized int read(byte[] b, int off, int len) throws IOException { - + if (in == null) { + throw new EOFException("Cannot read closed stream"); + } int result = -1; try { result = in.read(b, off, len); + } catch (EOFException eof) { + throw eof; } catch (IOException e) { - LOG.info("Received IOException while reading '" + key + "', attempting to reopen."); + LOG.info( "Received IOException while reading '{}'," + + " attempting to reopen.", key); seek(pos); result = in.read(b, off, len); } @@ -143,17 +160,53 @@ public class NativeS3FileSystem extends } @Override - public void close() throws IOException { - in.close(); + public synchronized void close() throws IOException { + closeInnerStream(); + } + + /** + * Close the inner stream if not null. Even if an exception + * is raised during the close, the field is set to null + * @throws IOException if raised by the close() operation. + */ + private void closeInnerStream() throws IOException { + if (in != null) { + try { + in.close(); + } finally { + in = null; + } + } + } + + /** + * Update inner stream with a new stream and position + * @param newStream new stream -must not be null + * @param newpos new position + * @throws IOException IO exception on a failure to close the existing + * stream. + */ + private synchronized void updateInnerStream(InputStream newStream, long newpos) throws IOException { + Preconditions.checkNotNull(newStream, "Null newstream argument"); + closeInnerStream(); + in = newStream; + this.pos = newpos; } @Override - public synchronized void seek(long pos) throws IOException { - in.close(); - LOG.info("Opening key '" + key + "' for reading at position '" + pos + "'"); - in = store.retrieve(key, pos); - this.pos = pos; + public synchronized void seek(long newpos) throws IOException { + if (newpos < 0) { + throw new EOFException( + FSExceptionMessages.NEGATIVE_SEEK); + } + if (pos != newpos) { + // the seek is attempting to move the current position + LOG.debug("Opening key '{}' for reading at position '{}", key, newpos); + InputStream newStream = store.retrieve(key, newpos); + updateInnerStream(newStream, newpos); + } } + @Override public synchronized long getPos() throws IOException { return pos; @@ -214,7 +267,7 @@ public class NativeS3FileSystem extends } backupStream.close(); - LOG.info("OutputStream for key '" + key + "' closed. Now beginning upload"); + LOG.info("OutputStream for key '{}' closed. Now beginning upload", key); try { byte[] md5Hash = digest == null ? null : digest.digest(); @@ -226,7 +279,7 @@ public class NativeS3FileSystem extends super.close(); closed = true; } - LOG.info("OutputStream for key '" + key + "' upload complete"); + LOG.info("OutputStream for key '{}' upload complete", key); } @Override @@ -339,7 +392,7 @@ public class NativeS3FileSystem extends Progressable progress) throws IOException { if (exists(f) && !overwrite) { - throw new IOException("File already exists:"+f); + throw new FileAlreadyExistsException("File already exists: " + f); } if(LOG.isDebugEnabled()) { @@ -367,7 +420,7 @@ public class NativeS3FileSystem extends String key = pathToKey(absolutePath); if (status.isDirectory()) { if (!recurse && listStatus(f).length > 0) { - throw new IOException("Can not delete " + f + " at is a not empty directory and recurse option is false"); + throw new IOException("Can not delete " + f + " as is a not empty directory and recurse option is false"); } createParent(f); @@ -538,7 +591,7 @@ public class NativeS3FileSystem extends try { FileStatus fileStatus = getFileStatus(f); if (fileStatus.isFile()) { - throw new IOException(String.format( + throw new FileAlreadyExistsException(String.format( "Can't make directory for path '%s' since it is a file.", f)); } @@ -556,7 +609,7 @@ public class NativeS3FileSystem extends public FSDataInputStream open(Path f, int bufferSize) throws IOException { FileStatus fs = getFileStatus(f); // will throw if the file doesn't exist if (fs.isDirectory()) { - throw new IOException("'" + f + "' is a directory"); + throw new FileNotFoundException("'" + f + "' is a directory"); } LOG.info("Opening '" + f + "' for reading"); Path absolutePath = makeAbsolute(f); Modified: hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java?rev=1607613&r1=1607612&r2=1607613&view=diff ============================================================================== --- hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java (original) +++ hadoop/common/branches/branch-2.5/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java Thu Jul 3 12:44:24 2014 @@ -226,7 +226,7 @@ public class TestLocalFileSystem { try { fileSys.mkdirs(bad_dir); fail("Failed to detect existing file in path"); - } catch (FileAlreadyExistsException e) { + } catch (ParentNotDirectoryException e) { // Expected }