Repository: apex-malhar Updated Branches: refs/heads/master 7dea3d0a0 -> 0c70e92e6
APEXMALHAR-2174-S3-ReaderIssue Fixed the S3 reader issue Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/0c70e92e Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/0c70e92e Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/0c70e92e Branch: refs/heads/master Commit: 0c70e92e6f2a1a631569d6b9608ac79de0a50b96 Parents: 7dea3d0 Author: Chaitanya <[email protected]> Authored: Mon Aug 8 11:51:11 2016 +0530 Committer: Chaitanya <[email protected]> Committed: Mon Aug 8 11:51:11 2016 +0530 ---------------------------------------------------------------------- library/pom.xml | 1 - .../datatorrent/lib/io/fs/S3BlockReader.java | 177 ++++++++++++++++--- .../datatorrent/lib/io/fs/S3InputModule.java | 14 +- 3 files changed, 156 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0c70e92e/library/pom.xml ---------------------------------------------------------------------- diff --git a/library/pom.xml b/library/pom.xml index 8d264a4..026bbae 100644 --- a/library/pom.xml +++ b/library/pom.xml @@ -343,7 +343,6 @@ <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk-s3</artifactId> <version>1.10.73</version> - <scope>test</scope> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0c70e92e/library/src/main/java/com/datatorrent/lib/io/fs/S3BlockReader.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/S3BlockReader.java b/library/src/main/java/com/datatorrent/lib/io/fs/S3BlockReader.java index 34f64ed..8ce1304 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/S3BlockReader.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/S3BlockReader.java @@ -19,10 +19,17 @@ package com.datatorrent.lib.io.fs; import java.io.IOException; + import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.Path; -import com.google.common.annotations.VisibleForTesting; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.GetObjectRequest; +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectInputStream; +import com.google.common.io.ByteStreams; + import com.datatorrent.api.Context; import com.datatorrent.lib.io.block.BlockMetadata; import com.datatorrent.lib.io.block.FSSliceReader; @@ -35,8 +42,11 @@ import com.datatorrent.lib.io.block.ReaderContext; @org.apache.hadoop.classification.InterfaceStability.Evolving public class S3BlockReader extends FSSliceReader { - protected transient String s3bucketUri; + private transient AmazonS3 s3Client; private String bucketName; + private String accessKey; + private String secretAccessKey; + public S3BlockReader() { @@ -47,7 +57,9 @@ public class S3BlockReader extends FSSliceReader public void setup(Context.OperatorContext context) { super.setup(context); - s3bucketUri = fs.getScheme() + "://" + bucketName; + s3Client = new AmazonS3Client(new BasicAWSCredentials(accessKey, secretAccessKey)); + ((S3BlockReaderContext)readerContext).setBucketName(bucketName); + ((S3BlockReaderContext)readerContext).setS3Client(s3Client); } /** @@ -55,14 +67,33 @@ public class S3BlockReader extends FSSliceReader * @param s3uri s3 uri * @return name of the bucket */ - @VisibleForTesting protected static String extractBucket(String s3uri) { return s3uri.substring(s3uri.indexOf('@') + 1, s3uri.indexOf("/", s3uri.indexOf('@'))); } /** - * Create the stream from the bucket uri and block path. + * Extracts the accessKey from the given uri + * @param s3uri given s3 uri + * @return the accessKey + */ + protected static String extractAccessKey(String s3uri) + { + return s3uri.substring(s3uri.indexOf("://") + 3, s3uri.indexOf(':', s3uri.indexOf("://") + 3)); + } + + /** + * Extracts the secretAccessKey from the given uri + * @param s3uri given s3uri + * @return the secretAccessKey + */ + protected static String extractSecretAccessKey(String s3uri) + { + return s3uri.substring(s3uri.indexOf(':', s3uri.indexOf("://") + 1) + 1, s3uri.indexOf('@')); + } + + /** + * Extract the file path from given block and set it to the readerContext * @param block block metadata * @return stream * @throws IOException @@ -70,20 +101,30 @@ public class S3BlockReader extends FSSliceReader @Override protected FSDataInputStream setupStream(BlockMetadata.FileBlockMetadata block) throws IOException { - FSDataInputStream ins = fs.open(new Path(s3bucketUri + block.getFilePath())); - ins.seek(block.getOffset()); - return ins; + String filePath = block.getFilePath(); + // File path would be the path after bucket name. + // Check if the file path starts with "/" + if (filePath.startsWith("/")) { + filePath = filePath.substring(1); + } + ((S3BlockReaderContext)readerContext).setFilePath(filePath); + return null; } /** - * BlockReadeContext for reading S3 Blocks. Stream could't able be read the complete block. - * This will wait till the block reads completely. + * BlockReadeContext for reading S3 Blocks. */ private static class S3BlockReaderContext extends ReaderContext.FixedBytesReaderContext<FSDataInputStream> { + private transient AmazonS3 s3Client; + private transient String bucketName; + private transient String filePath; /** - * S3 File systems doesn't read the specified block completely while using readFully API. - * This will read small chunks continuously until will reach the specified block size. + * S3 block read would be achieved through the AmazonS3 client. Following are the steps to achieve: + * (1) Create the objectRequest from bucketName and filePath. + * (2) Set the range to the above created objectRequest. + * (3) Get the object portion through AmazonS3 client API. + * (4) Get the object content from the above object portion. * @return the block entity * @throws IOException */ @@ -91,19 +132,71 @@ public class S3BlockReader extends FSSliceReader protected Entity readEntity() throws IOException { entity.clear(); - int bytesToRead = length; - if (offset + length >= blockMetadata.getLength()) { - bytesToRead = (int)(blockMetadata.getLength() - offset); - } - byte[] record = new byte[bytesToRead]; - int bytesRead = 0; - while (bytesRead < bytesToRead) { - bytesRead += stream.read(record, bytesRead, bytesToRead - bytesRead); - } - entity.setUsedBytes(bytesRead); + GetObjectRequest rangeObjectRequest = new GetObjectRequest( + bucketName, filePath); + rangeObjectRequest.setRange(offset, blockMetadata.getLength() - 1); + S3Object objectPortion = s3Client.getObject(rangeObjectRequest); + S3ObjectInputStream wrappedStream = objectPortion.getObjectContent(); + byte[] record = ByteStreams.toByteArray(wrappedStream); + entity.setUsedBytes(record.length); entity.setRecord(record); + wrappedStream.close(); return entity; } + + /** + * Return the AmazonS3 service + * @return the s3Client + */ + public AmazonS3 getS3Client() + { + return s3Client; + } + + /** + * Set the AmazonS3 service + * @param s3Client given s3Client + */ + public void setS3Client(AmazonS3 s3Client) + { + this.s3Client = s3Client; + } + + /** + * Get the bucket name + * @return the bucketName + */ + public String getBucketName() + { + return bucketName; + } + + /** + * Set the bucket name + * @param bucketName given bucketName + */ + public void setBucketName(String bucketName) + { + this.bucketName = bucketName; + } + + /** + * Get the file path + * @return the file path + */ + public String getFilePath() + { + return filePath; + } + + /** + * Sets the file path + * @param filePath given filePath + */ + public void setFilePath(String filePath) + { + this.filePath = filePath; + } } /** @@ -116,11 +209,47 @@ public class S3BlockReader extends FSSliceReader } /** - * Set the bucket name + * Set the bucket name where the file resides * @param bucketName bucket name */ public void setBucketName(String bucketName) { this.bucketName = bucketName; } + + /** + * Return the access key + * @return the accessKey + */ + public String getAccessKey() + { + return accessKey; + } + + /** + * Set the access key + * @param accessKey given accessKey + */ + public void setAccessKey(String accessKey) + { + this.accessKey = accessKey; + } + + /** + * Return the secretAccessKey + * @return the secretAccessKey + */ + public String getSecretAccessKey() + { + return secretAccessKey; + } + + /** + * Set the secretAccessKey + * @param secretAccessKey secretAccessKey + */ + public void setSecretAccessKey(String secretAccessKey) + { + this.secretAccessKey = secretAccessKey; + } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0c70e92e/library/src/main/java/com/datatorrent/lib/io/fs/S3InputModule.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/S3InputModule.java b/library/src/main/java/com/datatorrent/lib/io/fs/S3InputModule.java index 50c40ec..52e7ff0 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/S3InputModule.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/S3InputModule.java @@ -24,9 +24,6 @@ import com.datatorrent.lib.io.block.FSSliceReader; * S3InputModule is used to read files/list of files (or directory) from S3 bucket. <br/> * Module emits, <br/> * 1. FileMetadata 2. BlockMetadata 3. Block Bytes.<br/><br/> - * Parallel read will work only if the scheme is "s3a" and the Hadoop version is 2.7+. - * Parallel read doesn't work in the case of the scheme is "s3n/s3". In this case, this operator explicitly - * disables the parallel read functionality. * For more info about S3 scheme protocals, please have a look at * <a href="https://wiki.apache.org/hadoop/AmazonS3">https://wiki.apache.org/hadoop/AmazonS3.</a> * @@ -53,16 +50,11 @@ public class S3InputModule extends FSInputModule @Override public FSSliceReader createBlockReader() { - //Extract the scheme from the files - String s3input = getFiles(); - String scheme = s3input.substring(0, s3input.indexOf("://")); - // Parallel read doesn't support, if the scheme is s3 (or) s3n. - if (scheme.equals("s3") || scheme.equals("s3n")) { - setSequencialFileRead(true); - } - // Set the s3 bucket name to the block reader + // Set the s3 bucket name, accessKey, SecretAccessKey to the block reader S3BlockReader reader = new S3BlockReader(); reader.setBucketName(S3BlockReader.extractBucket(getFiles())); + reader.setAccessKey(S3BlockReader.extractAccessKey(getFiles())); + reader.setSecretAccessKey(S3BlockReader.extractSecretAccessKey(getFiles())); return reader; } }
