Repository: apex-malhar
Updated Branches:
  refs/heads/master 7dea3d0a0 -> 0c70e92e6


APEXMALHAR-2174-S3-ReaderIssue Fixed the S3 reader issue


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/0c70e92e
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/0c70e92e
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/0c70e92e

Branch: refs/heads/master
Commit: 0c70e92e6f2a1a631569d6b9608ac79de0a50b96
Parents: 7dea3d0
Author: Chaitanya <[email protected]>
Authored: Mon Aug 8 11:51:11 2016 +0530
Committer: Chaitanya <[email protected]>
Committed: Mon Aug 8 11:51:11 2016 +0530

----------------------------------------------------------------------
 library/pom.xml                                 |   1 -
 .../datatorrent/lib/io/fs/S3BlockReader.java    | 177 ++++++++++++++++---
 .../datatorrent/lib/io/fs/S3InputModule.java    |  14 +-
 3 files changed, 156 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0c70e92e/library/pom.xml
----------------------------------------------------------------------
diff --git a/library/pom.xml b/library/pom.xml
index 8d264a4..026bbae 100644
--- a/library/pom.xml
+++ b/library/pom.xml
@@ -343,7 +343,6 @@
       <groupId>com.amazonaws</groupId>
       <artifactId>aws-java-sdk-s3</artifactId>
       <version>1.10.73</version>
-      <scope>test</scope>
     </dependency>
     <dependency>
          <groupId>com.fasterxml.jackson.core</groupId>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0c70e92e/library/src/main/java/com/datatorrent/lib/io/fs/S3BlockReader.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/S3BlockReader.java 
b/library/src/main/java/com/datatorrent/lib/io/fs/S3BlockReader.java
index 34f64ed..8ce1304 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/S3BlockReader.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/S3BlockReader.java
@@ -19,10 +19,17 @@
 package com.datatorrent.lib.io.fs;
 
 import java.io.IOException;
+
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.Path;
 
-import com.google.common.annotations.VisibleForTesting;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectInputStream;
+import com.google.common.io.ByteStreams;
+
 import com.datatorrent.api.Context;
 import com.datatorrent.lib.io.block.BlockMetadata;
 import com.datatorrent.lib.io.block.FSSliceReader;
@@ -35,8 +42,11 @@ import com.datatorrent.lib.io.block.ReaderContext;
 @org.apache.hadoop.classification.InterfaceStability.Evolving
 public class S3BlockReader extends FSSliceReader
 {
-  protected transient String s3bucketUri;
+  private transient AmazonS3 s3Client;
   private String bucketName;
+  private String accessKey;
+  private String secretAccessKey;
+
 
   public S3BlockReader()
   {
@@ -47,7 +57,9 @@ public class S3BlockReader extends FSSliceReader
   public void setup(Context.OperatorContext context)
   {
     super.setup(context);
-    s3bucketUri = fs.getScheme() + "://" + bucketName;
+    s3Client = new AmazonS3Client(new BasicAWSCredentials(accessKey, 
secretAccessKey));
+    ((S3BlockReaderContext)readerContext).setBucketName(bucketName);
+    ((S3BlockReaderContext)readerContext).setS3Client(s3Client);
   }
 
   /**
@@ -55,14 +67,33 @@ public class S3BlockReader extends FSSliceReader
    * @param s3uri s3 uri
    * @return name of the bucket
    */
-  @VisibleForTesting
   protected static String extractBucket(String s3uri)
   {
     return s3uri.substring(s3uri.indexOf('@') + 1, s3uri.indexOf("/", 
s3uri.indexOf('@')));
   }
 
   /**
-   * Create the stream from the bucket uri and block path.
+   * Extracts the accessKey from the given uri
+   * @param s3uri given s3 uri
+   * @return the accessKey
+   */
+  protected static String extractAccessKey(String s3uri)
+  {
+    return s3uri.substring(s3uri.indexOf("://") + 3, s3uri.indexOf(':', 
s3uri.indexOf("://") + 3));
+  }
+
+  /**
+   * Extracts the secretAccessKey from the given uri
+   * @param s3uri given s3uri
+   * @return the secretAccessKey
+   */
+  protected static String extractSecretAccessKey(String s3uri)
+  {
+    return s3uri.substring(s3uri.indexOf(':', s3uri.indexOf("://") + 1) + 1, 
s3uri.indexOf('@'));
+  }
+
+  /**
+   * Extract the file path from given block and set it to the readerContext
    * @param block block metadata
    * @return stream
    * @throws IOException
@@ -70,20 +101,30 @@ public class S3BlockReader extends FSSliceReader
   @Override
   protected FSDataInputStream setupStream(BlockMetadata.FileBlockMetadata 
block) throws IOException
   {
-    FSDataInputStream ins = fs.open(new Path(s3bucketUri + 
block.getFilePath()));
-    ins.seek(block.getOffset());
-    return ins;
+    String filePath = block.getFilePath();
+    // File path would be the path after bucket name.
+    // Check if the file path starts with "/"
+    if (filePath.startsWith("/")) {
+      filePath = filePath.substring(1);
+    }
+    ((S3BlockReaderContext)readerContext).setFilePath(filePath);
+    return null;
   }
 
   /**
-   * BlockReadeContext for reading S3 Blocks. Stream could't able be read the 
complete block.
-   * This will wait till the block reads completely.
+   * BlockReaderContext for reading S3 blocks.
    */
   private static class S3BlockReaderContext extends 
ReaderContext.FixedBytesReaderContext<FSDataInputStream>
   {
+    private transient AmazonS3 s3Client;
+    private transient String bucketName;
+    private transient String filePath;
     /**
-     * S3 File systems doesn't read the specified block completely while using 
readFully API.
-     * This will read small chunks continuously until will reach the specified 
block size.
+     * S3 block reads are performed through the AmazonS3 client. The steps 
are as follows:
+     * (1) Create the objectRequest from bucketName and filePath.
+     * (2) Set the range to the above created objectRequest.
+     * (3) Get the object portion through AmazonS3 client API.
+     * (4) Get the object content from the above object portion.
      * @return the block entity
      * @throws IOException
      */
@@ -91,19 +132,71 @@ public class S3BlockReader extends FSSliceReader
     protected Entity readEntity() throws IOException
     {
       entity.clear();
-      int bytesToRead = length;
-      if (offset + length >= blockMetadata.getLength()) {
-        bytesToRead = (int)(blockMetadata.getLength() - offset);
-      }
-      byte[] record = new byte[bytesToRead];
-      int bytesRead = 0;
-      while (bytesRead < bytesToRead) {
-        bytesRead += stream.read(record, bytesRead, bytesToRead - bytesRead);
-      }
-      entity.setUsedBytes(bytesRead);
+      GetObjectRequest rangeObjectRequest = new GetObjectRequest(
+          bucketName, filePath);
+      rangeObjectRequest.setRange(offset, blockMetadata.getLength() - 1);
+      S3Object objectPortion = s3Client.getObject(rangeObjectRequest);
+      S3ObjectInputStream wrappedStream = objectPortion.getObjectContent();
+      byte[] record = ByteStreams.toByteArray(wrappedStream);
+      entity.setUsedBytes(record.length);
       entity.setRecord(record);
+      wrappedStream.close();
       return entity;
     }
+
+    /**
+     * Return the AmazonS3 service
+     * @return the s3Client
+     */
+    public AmazonS3 getS3Client()
+    {
+      return s3Client;
+    }
+
+    /**
+     * Set the AmazonS3 service
+     * @param s3Client given s3Client
+     */
+    public void setS3Client(AmazonS3 s3Client)
+    {
+      this.s3Client = s3Client;
+    }
+
+    /**
+     * Get the bucket name
+     * @return the bucketName
+     */
+    public String getBucketName()
+    {
+      return bucketName;
+    }
+
+    /**
+     * Set the bucket name
+     * @param bucketName given bucketName
+     */
+    public void setBucketName(String bucketName)
+    {
+      this.bucketName = bucketName;
+    }
+
+    /**
+     * Get the file path
+     * @return the file path
+     */
+    public String getFilePath()
+    {
+      return filePath;
+    }
+
+    /**
+     * Sets the file path
+     * @param filePath given filePath
+     */
+    public void setFilePath(String filePath)
+    {
+      this.filePath = filePath;
+    }
   }
 
   /**
@@ -116,11 +209,47 @@ public class S3BlockReader extends FSSliceReader
   }
 
   /**
-   * Set the bucket name
+   * Set the bucket name where the file resides
    * @param bucketName bucket name
    */
   public void setBucketName(String bucketName)
   {
     this.bucketName = bucketName;
   }
+
+  /**
+   * Return the access key
+   * @return the accessKey
+   */
+  public String getAccessKey()
+  {
+    return accessKey;
+  }
+
+  /**
+   * Set the access key
+   * @param accessKey given accessKey
+   */
+  public void setAccessKey(String accessKey)
+  {
+    this.accessKey = accessKey;
+  }
+
+  /**
+   * Return the secretAccessKey
+   * @return the secretAccessKey
+   */
+  public String getSecretAccessKey()
+  {
+    return secretAccessKey;
+  }
+
+  /**
+   * Set the secretAccessKey
+   * @param secretAccessKey secretAccessKey
+   */
+  public void setSecretAccessKey(String secretAccessKey)
+  {
+    this.secretAccessKey = secretAccessKey;
+  }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0c70e92e/library/src/main/java/com/datatorrent/lib/io/fs/S3InputModule.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/S3InputModule.java 
b/library/src/main/java/com/datatorrent/lib/io/fs/S3InputModule.java
index 50c40ec..52e7ff0 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/S3InputModule.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/S3InputModule.java
@@ -24,9 +24,6 @@ import com.datatorrent.lib.io.block.FSSliceReader;
  * S3InputModule is used to read files/list of files (or directory) from S3 
bucket. <br/>
  * Module emits, <br/>
  * 1. FileMetadata 2. BlockMetadata 3. Block Bytes.<br/><br/>
- * Parallel read will work only if the scheme is "s3a" and the Hadoop version 
is 2.7+.
- * Parallel read doesn't work in the case of the scheme is "s3n/s3". In this 
case, this operator explicitly
- * disables the parallel read functionality.
  * For more info about S3 scheme protocals, please have a look at
  * <a 
href="https://wiki.apache.org/hadoop/AmazonS3";>https://wiki.apache.org/hadoop/AmazonS3.</a>
  *
@@ -53,16 +50,11 @@ public class S3InputModule extends FSInputModule
   @Override
   public FSSliceReader createBlockReader()
   {
-    //Extract the scheme from the files
-    String s3input = getFiles();
-    String scheme =  s3input.substring(0, s3input.indexOf("://"));
-    // Parallel read doesn't support, if the scheme is s3 (or) s3n.
-    if (scheme.equals("s3") || scheme.equals("s3n")) {
-      setSequencialFileRead(true);
-    }
-    // Set the s3 bucket name to the block reader
+    // Set the S3 bucket name, accessKey, and secretAccessKey on the block reader
     S3BlockReader reader = new S3BlockReader();
     reader.setBucketName(S3BlockReader.extractBucket(getFiles()));
+    reader.setAccessKey(S3BlockReader.extractAccessKey(getFiles()));
+    
reader.setSecretAccessKey(S3BlockReader.extractSecretAccessKey(getFiles()));
     return reader;
   }
 }

Reply via email to