Author: tomwhite
Date: Wed May 2 12:16:06 2007
New Revision: 534595

URL: http://svn.apache.org/viewvc?view=rev&rev=534595
Log:
HADOOP-1061.  Fix bug in listing files in the S3 filesystem.

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/MigrationTool.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystemException.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/VersionMismatchException.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/FileSystemStore.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Jets3tFileSystemStore.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/InMemoryFileSystemStore.java
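For context on the listing bug: under the old key scheme (removed below in Jets3tFileSystemStore), every path was URL-encoded before being used as an S3 key, so the '/' separators that S3's delimiter-based listing splits on never appeared in stored keys. A minimal, self-contained illustration of what that encoding did to a hypothetical path:

    import java.net.URLEncoder;

    public class KeyEncodingDemo {
      public static void main(String[] args) throws Exception {
        // Old format: the whole path became a URL-encoded S3 key.
        System.out.println(URLEncoder.encode("/user/tom/file", "UTF-8"));
        // Prints %2Fuser%2Ftom%2Ffile -- no literal '/' remains for S3's
        // delimiter-based listing to split on. The new format stores the
        // raw path "/user/tom/file" instead.
      }
    }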
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=534595&r1=534594&r2=534595
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed May 2 12:16:06 2007
@@ -300,6 +300,12 @@
 89. HADOOP-1247.  Add support to contrib/streaming for aggregate package,
     formerly called Abacus.  (Runping Qi via cutting)
 
+90. HADOOP-1061.  Fix bug in listing files in the S3 filesystem.
+    NOTE: this change is not backwards compatible!  You should use the
+    MigrationTool supplied to migrate existing S3 filesystem data to
+    the new format.  Please back up your data before upgrading
+    (using 'hadoop distcp' for example).  (tomwhite)
+
 
 Release 0.12.3 - 2007-04-06
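The MigrationTool named in this note is added later in the commit. One illustrative way to drive it (placeholders, not values taken from the patch) is to invoke its public main() from a driver class:

    import org.apache.hadoop.fs.s3.MigrationTool;

    public class MigrateS3Data {
      public static void main(String[] args) throws Exception {
        // Hypothetical bucket URI; credentials may instead be supplied via
        // the fs.s3.awsAccessKeyId / fs.s3.awsSecretAccessKey properties.
        // Roughly equivalent to:
        //   bin/hadoop org.apache.hadoop.fs.s3.MigrationTool s3://ID:SECRET@BUCKET
        MigrationTool.main(new String[] { "s3://ID:SECRET@BUCKET" });
      }
    }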
= "1"; + + private static final Map<String, String> METADATA = + new HashMap<String, String>(); + + static { + METADATA.put(FILE_SYSTEM_NAME, FILE_SYSTEM_VALUE); + METADATA.put(FILE_SYSTEM_TYPE_NAME, FILE_SYSTEM_TYPE_VALUE); + METADATA.put(FILE_SYSTEM_VERSION_NAME, FILE_SYSTEM_VERSION_VALUE); + } + + private static final String PATH_DELIMITER = Path.SEPARATOR; private static final String BLOCK_PREFIX = "block_"; private Configuration conf; @@ -94,7 +111,7 @@ createBucket(bucket.getName()); this.bufferSize = conf.getInt("io.file.buffer.size", 4096); - } + } private void createBucket(String bucketName) throws IOException { try { @@ -106,6 +123,10 @@ throw new S3Exception(e); } } + + public String getVersion() throws IOException { + return FILE_SYSTEM_VERSION_VALUE; + } private void delete(String key) throws IOException { try { @@ -127,7 +148,7 @@ } public boolean inodeExists(Path path) throws IOException { - InputStream in = get(pathToKey(path)); + InputStream in = get(pathToKey(path), true); if (in == null) { return false; } @@ -136,7 +157,7 @@ } public boolean blockExists(long blockId) throws IOException { - InputStream in = get(blockToKey(blockId)); + InputStream in = get(blockToKey(blockId), false); if (in == null) { return false; } @@ -144,9 +165,14 @@ return true; } - private InputStream get(String key) throws IOException { + private InputStream get(String key, boolean checkMetadata) + throws IOException { + try { S3Object object = s3Service.getObject(bucket, key); + if (checkMetadata) { + checkMetadata(object); + } return object.getDataInputStream(); } catch (S3ServiceException e) { if (e.getS3ErrorCode().equals("NoSuchKey")) { @@ -175,8 +201,26 @@ } } + private void checkMetadata(S3Object object) throws S3FileSystemException, + S3ServiceException { + + String name = (String) object.getMetadata(FILE_SYSTEM_NAME); + if (!FILE_SYSTEM_VALUE.equals(name)) { + throw new S3FileSystemException("Not a Hadoop S3 file."); + } + String type = (String) object.getMetadata(FILE_SYSTEM_TYPE_NAME); + if (!FILE_SYSTEM_TYPE_VALUE.equals(type)) { + throw new S3FileSystemException("Not a block file."); + } + String dataVersion = (String) object.getMetadata(FILE_SYSTEM_VERSION_NAME); + if (!FILE_SYSTEM_VERSION_VALUE.equals(dataVersion)) { + throw new VersionMismatchException(FILE_SYSTEM_VERSION_VALUE, + dataVersion); + } + } + public INode retrieveINode(Path path) throws IOException { - return INode.deserialize(get(pathToKey(path))); + return INode.deserialize(get(pathToKey(path), true)); } public File retrieveBlock(Block block, long byteRangeStart) @@ -224,7 +268,7 @@ if (!prefix.endsWith(PATH_DELIMITER)) { prefix += PATH_DELIMITER; } - S3Object[] objects = s3Service.listObjects(bucket, prefix, PATH_DELIMITER, 0); + S3Object[] objects = s3Service.listObjects(bucket, prefix, PATH_DELIMITER); Set<Path> prefixes = new TreeSet<Path>(); for (int i = 0; i < objects.length; i++) { prefixes.add(keyToPath(objects[i].getKey())); @@ -260,12 +304,17 @@ } } - private void put(String key, InputStream in, long length) throws IOException { + private void put(String key, InputStream in, long length, boolean storeMetadata) + throws IOException { + try { S3Object object = new S3Object(key); object.setDataInputStream(in); object.setContentType("binary/octet-stream"); object.setContentLength(length); + if (storeMetadata) { + object.addAllMetadata(METADATA); + } s3Service.putObject(bucket, object); } catch (S3ServiceException e) { if (e.getCause() instanceof IOException) { @@ -276,14 +325,14 @@ } public void storeINode(Path 
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Jets3tFileSystemStore.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Jets3tFileSystemStore.java?view=diff&rev=534595&r1=534594&r2=534595
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Jets3tFileSystemStore.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Jets3tFileSystemStore.java Wed May 2 12:16:06 2007
@@ -9,10 +9,9 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.io.UnsupportedEncodingException;
 import java.net.URI;
-import java.net.URLDecoder;
-import java.net.URLEncoder;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 
@@ -27,8 +26,26 @@
 import org.jets3t.service.security.AWSCredentials;
 
 class Jets3tFileSystemStore implements FileSystemStore {
+
+  private static final String FILE_SYSTEM_NAME = "fs";
+  private static final String FILE_SYSTEM_VALUE = "Hadoop";
+
+  private static final String FILE_SYSTEM_TYPE_NAME = "fs-type";
+  private static final String FILE_SYSTEM_TYPE_VALUE = "block";
 
-  private static final String PATH_DELIMITER = urlEncode(Path.SEPARATOR);
+  private static final String FILE_SYSTEM_VERSION_NAME = "fs-version";
+  private static final String FILE_SYSTEM_VERSION_VALUE = "1";
+
+  private static final Map<String, String> METADATA =
+    new HashMap<String, String>();
+
+  static {
+    METADATA.put(FILE_SYSTEM_NAME, FILE_SYSTEM_VALUE);
+    METADATA.put(FILE_SYSTEM_TYPE_NAME, FILE_SYSTEM_TYPE_VALUE);
+    METADATA.put(FILE_SYSTEM_VERSION_NAME, FILE_SYSTEM_VERSION_VALUE);
+  }
+
+  private static final String PATH_DELIMITER = Path.SEPARATOR;
   private static final String BLOCK_PREFIX = "block_";
 
   private Configuration conf;
@@ -94,7 +111,7 @@
     createBucket(bucket.getName());
 
     this.bufferSize = conf.getInt("io.file.buffer.size", 4096);
-  }
+  }
 
   private void createBucket(String bucketName) throws IOException {
     try {
@@ -106,6 +123,10 @@
       throw new S3Exception(e);
     }
   }
+
+  public String getVersion() throws IOException {
+    return FILE_SYSTEM_VERSION_VALUE;
+  }
 
   private void delete(String key) throws IOException {
     try {
@@ -127,7 +148,7 @@
   }
 
   public boolean inodeExists(Path path) throws IOException {
-    InputStream in = get(pathToKey(path));
+    InputStream in = get(pathToKey(path), true);
     if (in == null) {
       return false;
     }
@@ -136,7 +157,7 @@
   }
 
   public boolean blockExists(long blockId) throws IOException {
-    InputStream in = get(blockToKey(blockId));
+    InputStream in = get(blockToKey(blockId), false);
     if (in == null) {
       return false;
     }
@@ -144,9 +165,14 @@
     return true;
   }
 
-  private InputStream get(String key) throws IOException {
+  private InputStream get(String key, boolean checkMetadata)
+      throws IOException {
+
     try {
       S3Object object = s3Service.getObject(bucket, key);
+      if (checkMetadata) {
+        checkMetadata(object);
+      }
       return object.getDataInputStream();
     } catch (S3ServiceException e) {
       if (e.getS3ErrorCode().equals("NoSuchKey")) {
@@ -175,8 +201,26 @@
     }
   }
 
+  private void checkMetadata(S3Object object) throws S3FileSystemException,
+      S3ServiceException {
+
+    String name = (String) object.getMetadata(FILE_SYSTEM_NAME);
+    if (!FILE_SYSTEM_VALUE.equals(name)) {
+      throw new S3FileSystemException("Not a Hadoop S3 file.");
+    }
+    String type = (String) object.getMetadata(FILE_SYSTEM_TYPE_NAME);
+    if (!FILE_SYSTEM_TYPE_VALUE.equals(type)) {
+      throw new S3FileSystemException("Not a block file.");
+    }
+    String dataVersion = (String) object.getMetadata(FILE_SYSTEM_VERSION_NAME);
+    if (!FILE_SYSTEM_VERSION_VALUE.equals(dataVersion)) {
+      throw new VersionMismatchException(FILE_SYSTEM_VERSION_VALUE,
+          dataVersion);
+    }
+  }
+
   public INode retrieveINode(Path path) throws IOException {
-    return INode.deserialize(get(pathToKey(path)));
+    return INode.deserialize(get(pathToKey(path), true));
   }
 
   public File retrieveBlock(Block block, long byteRangeStart)
@@ -224,7 +268,7 @@
       if (!prefix.endsWith(PATH_DELIMITER)) {
         prefix += PATH_DELIMITER;
       }
-      S3Object[] objects = s3Service.listObjects(bucket, prefix, PATH_DELIMITER, 0);
+      S3Object[] objects = s3Service.listObjects(bucket, prefix, PATH_DELIMITER);
       Set<Path> prefixes = new TreeSet<Path>();
       for (int i = 0; i < objects.length; i++) {
         prefixes.add(keyToPath(objects[i].getKey()));
@@ -260,12 +304,17 @@
     }
   }
 
-  private void put(String key, InputStream in, long length) throws IOException {
+  private void put(String key, InputStream in, long length, boolean storeMetadata)
+      throws IOException {
+
    try {
      S3Object object = new S3Object(key);
      object.setDataInputStream(in);
      object.setContentType("binary/octet-stream");
      object.setContentLength(length);
+      if (storeMetadata) {
+        object.addAllMetadata(METADATA);
+      }
      s3Service.putObject(bucket, object);
    } catch (S3ServiceException e) {
      if (e.getCause() instanceof IOException) {
@@ -276,14 +325,14 @@
   }
 
   public void storeINode(Path path, INode inode) throws IOException {
-    put(pathToKey(path), inode.serialize(), inode.getSerializedLength());
+    put(pathToKey(path), inode.serialize(), inode.getSerializedLength(), true);
   }
 
   public void storeBlock(Block block, File file) throws IOException {
     BufferedInputStream in = null;
     try {
       in = new BufferedInputStream(new FileInputStream(file));
-      put(blockToKey(block), in, block.getLength());
+      put(blockToKey(block), in, block.getLength(), false);
     } finally {
       closeQuietly(in);
     }
@@ -303,35 +352,13 @@
     if (!path.isAbsolute()) {
       throw new IllegalArgumentException("Path must be absolute: " + path);
     }
-    return urlEncode(path.toUri().getPath());
+    return path.toUri().getPath();
   }
 
   private Path keyToPath(String key) {
-    return new Path(urlDecode(key));
-  }
-
-  private static String urlEncode(String s) {
-    try {
-      return URLEncoder.encode(s, "UTF-8");
-    } catch (UnsupportedEncodingException e) {
-      // Should never happen since every implementation of the Java Platform
-      // is required to support UTF-8.
-      // See http://java.sun.com/j2se/1.5.0/docs/api/java/nio/charset/Charset.html
-      throw new IllegalStateException(e);
-    }
+    return new Path(key);
   }
 
-  private static String urlDecode(String s) {
-    try {
-      return URLDecoder.decode(s, "UTF-8");
-    } catch (UnsupportedEncodingException e) {
-      // Should never happen since every implementation of the Java Platform
-      // is required to support UTF-8.
-      // See http://java.sun.com/j2se/1.5.0/docs/api/java/nio/charset/Charset.html
-      throw new IllegalStateException(e);
-    }
-  }
-
   private String blockToKey(long blockId) {
     return BLOCK_PREFIX + blockId;
   }
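Summing up the scheme above: put(..., true) stamps every inode object with three metadata headers, while blocks are stored bare, and every metadata-checked read goes through checkMetadata(). A condensed sketch using the same JetS3t calls (the key is hypothetical, and the method is assumed to sit in this class so it can see METADATA):

    // Tag an inode object exactly as storeINode()'s put(..., true) does.
    private S3Object newTaggedObject() {
      S3Object object = new S3Object("/user/tom/part-0"); // hypothetical key
      object.addAllMetadata(METADATA); // fs=Hadoop, fs-type=block, fs-version=1
      return object;
      // On read, checkMetadata() rejects mismatches:
      //   fs         != "Hadoop" -> S3FileSystemException("Not a Hadoop S3 file.")
      //   fs-type    != "block"  -> S3FileSystemException("Not a block file.")
      //   fs-version != "1"      -> VersionMismatchException
    }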
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/MigrationTool.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/MigrationTool.java?view=auto&rev=534595
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/MigrationTool.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/MigrationTool.java Wed May 2 12:16:06 2007
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.s3;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ToolBase;
+import org.jets3t.service.S3Service;
+import org.jets3t.service.S3ServiceException;
+import org.jets3t.service.impl.rest.httpclient.RestS3Service;
+import org.jets3t.service.model.S3Bucket;
+import org.jets3t.service.model.S3Object;
+import org.jets3t.service.security.AWSCredentials;
+
+/**
+ * <p>
+ * This class is a tool for migrating data from an older to a newer version
+ * of an S3 filesystem.
+ * </p>
+ * <p>
+ * All files in the filesystem are migrated by re-writing the block metadata
+ * - no datafiles are touched.
+ * </p>
+ */
+public class MigrationTool extends ToolBase {
+
+  private S3Service s3Service;
+  private S3Bucket bucket;
+
+  public static void main(String[] args) throws Exception {
+    int res = new MigrationTool().doMain(new Configuration(), args);
+    System.exit(res);
+  }
+
+  public int run(String[] args) throws Exception {
+
+    if (args.length == 0) {
+      System.err.println("Usage: MigrationTool <S3 file system URI>");
+      System.err.println("\t<S3 file system URI>\tfilesystem to migrate");
+      return -1;
+    }
+
+    URI uri = URI.create(args[0]);
+
+    initialize(uri, conf);
+
+    FileSystemStore newStore = new Jets3tFileSystemStore();
+    newStore.initialize(uri, conf);
+
+    if (get("%2F") != null) {
+      System.err.println("Current version number is [unversioned].");
+      System.err.println("Target version number is " +
+          newStore.getVersion() + ".");
+      Store oldStore = new UnversionedStore();
+      migrate(oldStore, newStore);
+      return 0;
+    } else {
+      S3Object root = get("/");
+      if (root != null) {
+        String version = (String) root.getMetadata("fs-version");
+        if (version == null) {
+          System.err.println("Can't detect version - exiting.");
+        } else {
+          String newVersion = newStore.getVersion();
+          System.err.println("Current version number is " + version + ".");
+          System.err.println("Target version number is " + newVersion + ".");
+          if (version.equals(newStore.getVersion())) {
+            System.err.println("No migration required.");
+            return 0;
+          }
+          // use version number to create Store
+          //Store oldStore = 
+          //migrate(oldStore, newStore);
+          System.err.println("Not currently implemented.");
+          return 0;
+        }
+      }
+      System.err.println("Can't detect version - exiting.");
+      return 0;
+    }
+
+  }
+
+  public void initialize(URI uri, Configuration conf) throws IOException {
+
+    this.conf = conf;
+
+    try {
+      String accessKey = null;
+      String secretAccessKey = null;
+      String userInfo = uri.getUserInfo();
+      if (userInfo != null) {
+        int index = userInfo.indexOf(':');
+        if (index != -1) {
+          accessKey = userInfo.substring(0, index);
+          secretAccessKey = userInfo.substring(index + 1);
+        } else {
+          accessKey = userInfo;
+        }
+      }
+      if (accessKey == null) {
+        accessKey = conf.get("fs.s3.awsAccessKeyId");
+      }
+      if (secretAccessKey == null) {
+        secretAccessKey = conf.get("fs.s3.awsSecretAccessKey");
+      }
+      if (accessKey == null && secretAccessKey == null) {
+        throw new IllegalArgumentException("AWS " +
+            "Access Key ID and Secret Access Key " +
+            "must be specified as the username " +
+            "or password (respectively) of a s3 URL, " +
+            "or by setting the " +
+            "fs.s3.awsAccessKeyId or " +
+            "fs.s3.awsSecretAccessKey properties (respectively).");
+      } else if (accessKey == null) {
+        throw new IllegalArgumentException("AWS " +
+            "Access Key ID must be specified " +
+            "as the username of a s3 URL, or by setting the " +
+            "fs.s3.awsAccessKeyId property.");
+      } else if (secretAccessKey == null) {
+        throw new IllegalArgumentException("AWS " +
+            "Secret Access Key must be specified " +
+            "as the password of a s3 URL, or by setting the " +
+            "fs.s3.awsSecretAccessKey property.");
+      }
+      AWSCredentials awsCredentials =
+        new AWSCredentials(accessKey, secretAccessKey);
+      this.s3Service = new RestS3Service(awsCredentials);
+    } catch (S3ServiceException e) {
+      if (e.getCause() instanceof IOException) {
+        throw (IOException) e.getCause();
+      }
+      throw new S3Exception(e);
+    }
+    bucket = new S3Bucket(uri.getHost());
+  }
+
+  private void migrate(Store oldStore, FileSystemStore newStore)
+      throws IOException {
+    for (Path path : oldStore.listAllPaths()) {
+      INode inode = oldStore.retrieveINode(path);
+      oldStore.deleteINode(path);
+      newStore.storeINode(path, inode);
+    }
+  }
+
+  private S3Object get(String key) {
+    try {
+      return s3Service.getObject(bucket, key);
+    } catch (S3ServiceException e) {
+      if (e.getS3ErrorCode().equals("NoSuchKey")) {
+        return null;
+      }
+    }
+    return null;
+  }
+
+  interface Store {
+
+    Set<Path> listAllPaths() throws IOException;
+    INode retrieveINode(Path path) throws IOException;
+    void deleteINode(Path path) throws IOException;
+
+  }
+
+  class UnversionedStore implements Store {
+
+    public Set<Path> listAllPaths() throws IOException {
+      try {
+        String prefix = urlEncode(Path.SEPARATOR);
+        S3Object[] objects = s3Service.listObjects(bucket, prefix, null);
+        Set<Path> prefixes = new TreeSet<Path>();
+        for (int i = 0; i < objects.length; i++) {
+          prefixes.add(keyToPath(objects[i].getKey()));
+        }
+        return prefixes;
+      } catch (S3ServiceException e) {
+        if (e.getCause() instanceof IOException) {
+          throw (IOException) e.getCause();
+        }
+        throw new S3Exception(e);
+      }
+    }
+
+    public void deleteINode(Path path) throws IOException {
+      delete(pathToKey(path));
+    }
+
+    private void delete(String key) throws IOException {
+      try {
+        s3Service.deleteObject(bucket, key);
+      } catch (S3ServiceException e) {
+        if (e.getCause() instanceof IOException) {
+          throw (IOException) e.getCause();
+        }
+        throw new S3Exception(e);
+      }
+    }
+
+    public INode retrieveINode(Path path) throws IOException {
+      return INode.deserialize(get(pathToKey(path)));
+    }
+
+    private InputStream get(String key) throws IOException {
+      try {
+        S3Object object = s3Service.getObject(bucket, key);
+        return object.getDataInputStream();
+      } catch (S3ServiceException e) {
+        if (e.getS3ErrorCode().equals("NoSuchKey")) {
+          return null;
+        }
+        if (e.getCause() instanceof IOException) {
+          throw (IOException) e.getCause();
+        }
+        throw new S3Exception(e);
+      }
+    }
+
+    private String pathToKey(Path path) {
+      if (!path.isAbsolute()) {
+        throw new IllegalArgumentException("Path must be absolute: " + path);
+      }
+      return urlEncode(path.toUri().getPath());
+    }
+
+    private Path keyToPath(String key) {
+      return new Path(urlDecode(key));
+    }
+
+    private String urlEncode(String s) {
+      try {
+        return URLEncoder.encode(s, "UTF-8");
+      } catch (UnsupportedEncodingException e) {
+        // Should never happen since every implementation of the Java Platform
+        // is required to support UTF-8.
+        // See http://java.sun.com/j2se/1.5.0/docs/api/java/nio/charset/Charset.html
+        throw new IllegalStateException(e);
+      }
+    }
+
+    private String urlDecode(String s) {
+      try {
+        return URLDecoder.decode(s, "UTF-8");
+      } catch (UnsupportedEncodingException e) {
+        // Should never happen since every implementation of the Java Platform
+        // is required to support UTF-8.
+        // See http://java.sun.com/j2se/1.5.0/docs/api/java/nio/charset/Charset.html
+        throw new IllegalStateException(e);
      }
+    }
+
+  }
+
+}
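A note on how run() above distinguishes the two layouts: under the old format the root path "/" was stored URL-encoded, so it lives at the key "%2F"; under the new format the root sits at the raw key "/" and carries an fs-version metadata header. A hypothetical helper condensing that probe (same class assumed, reusing the tool's private get()):

    // Returns the detected data version, "[unversioned]" for the old
    // layout, or null when no root inode can be found.
    private String detectDataVersion() {
      if (get("%2F") != null) {   // URLEncoder.encode("/", "UTF-8")
        return "[unversioned]";   // old layout: migrate via UnversionedStore
      }
      S3Object root = get("/");   // new layout stores the raw path
      return root == null ? null : (String) root.getMetadata("fs-version");
    }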
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystemException.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystemException.java?view=auto&rev=534595
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystemException.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystemException.java Wed May 2 12:16:06 2007
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.s3;
+
+import java.io.IOException;
+
+/**
+ * Thrown when there is a fatal exception while using {@link S3FileSystem}.
+ */
+public class S3FileSystemException extends IOException {
+  public S3FileSystemException(String message) {
+    super(message);
+  }
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/VersionMismatchException.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/VersionMismatchException.java?view=auto&rev=534595
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/VersionMismatchException.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/VersionMismatchException.java Wed May 2 12:16:06 2007
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.s3;
+
+/**
+ * Thrown when Hadoop cannot read the version of the data stored
+ * in {@link S3FileSystem}.
+ */
+public class VersionMismatchException extends S3FileSystemException {
+  public VersionMismatchException(String clientVersion, String dataVersion) {
+    super("Version mismatch: client expects version " + clientVersion +
+        ", but data has version " +
+        (dataVersion == null ? "[unversioned]" : dataVersion));
+  }
+}
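Because VersionMismatchException extends S3FileSystemException, callers that want to treat a stale format specially must catch it first. A sketch of caller-side handling (hypothetical helper, same package assumed since INode is not public; store is any initialized FileSystemStore):

    static INode readINode(FileSystemStore store, Path path)
        throws IOException {
      try {
        return store.retrieveINode(path);
      } catch (VersionMismatchException e) {
        // Data was written under a different fs-version: run MigrationTool.
        throw e;
      } catch (S3FileSystemException e) {
        // Object exists but was not written by the Hadoop S3 filesystem.
        throw e;
      }
    }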
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/InMemoryFileSystemStore.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/InMemoryFileSystemStore.java?view=diff&rev=534595&r1=534594&r2=534595
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/InMemoryFileSystemStore.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/InMemoryFileSystemStore.java Wed May 2 12:16:06 2007
@@ -33,6 +33,10 @@
   public void initialize(URI uri, Configuration conf) {
     this.conf = conf;
   }
+
+  public String getVersion() throws IOException {
+    return "0";
+  }
 
   public void deleteINode(Path path) throws IOException {
     inodes.remove(path);