Repository: apex-malhar Updated Branches: refs/heads/master 1816f78fa -> 0500e0ea4
APEXMALHAR-2303 Added S3 Record Reader module Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/0500e0ea Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/0500e0ea Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/0500e0ea Branch: refs/heads/master Commit: 0500e0ea4ef335fd653c1eb9d9cebdfb8281faea Parents: 1816f78 Author: Ajay <[email protected]> Authored: Thu Oct 27 18:27:28 2016 +0530 Committer: ajaygit158 <[email protected]> Committed: Thu Dec 22 13:01:48 2016 +0530 ---------------------------------------------------------------------- .../datatorrent/lib/io/block/BlockMetadata.java | 35 ++ .../datatorrent/lib/io/block/ReaderContext.java | 116 +++- .../lib/io/fs/AbstractFileSplitter.java | 2 +- .../com/datatorrent/lib/io/fs/FileSplitter.java | 3 +- .../datatorrent/lib/io/fs/S3BlockReader.java | 6 +- .../apex/malhar/lib/fs/FSRecordReader.java | 43 +- .../malhar/lib/fs/FSRecordReaderModule.java | 20 +- .../apex/malhar/lib/fs/s3/S3RecordReader.java | 577 +++++++++++++++++++ .../malhar/lib/fs/s3/S3RecordReaderModule.java | 137 +++++ .../lib/io/block/FSLineReaderTest.java | 61 +- .../apex/malhar/lib/fs/FSRecordReaderTest.java | 9 +- .../lib/fs/s3/S3DelimitedRecordReaderTest.java | 285 +++++++++ .../lib/fs/s3/S3FixedWidthRecordReaderTest.java | 292 ++++++++++ .../lib/fs/s3/S3RecordReaderMockTest.java | 269 +++++++++ .../lib/fs/s3/S3RecordReaderModuleAppTest.java | 221 +++++++ 15 files changed, 2037 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java b/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java index 25b3a75..26d19a4 100644 --- a/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java +++ b/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java @@ -202,6 +202,7 @@ public interface BlockMetadata class FileBlockMetadata extends AbstractBlockMetadata { private final String filePath; + private long fileLength; protected FileBlockMetadata() { @@ -216,16 +217,50 @@ public interface BlockMetadata this.filePath = filePath; } + public FileBlockMetadata(String filePath, long blockId, long offset, long length, boolean isLastBlock, + long previousBlockId, long fileLength) + { + super(blockId, offset, length, isLastBlock, previousBlockId); + this.filePath = filePath; + this.fileLength = fileLength; + } + public FileBlockMetadata(String filePath) { this.filePath = filePath; } + public FileBlockMetadata(String filePath, long fileLength) + { + this.filePath = filePath; + this.fileLength = fileLength; + } + public String getFilePath() { return filePath; } + /** + * Returns the length of the file to which this block belongs + * + * @return length of the file to which this block belongs + */ + public long getFileLength() + { + return fileLength; + } + + /** + * Set the length of the file to which this block belongs + * + * @param fileLength + */ + public void setFileLength(long fileLength) + { + this.fileLength = fileLength; + } + public FileBlockMetadata newInstance(@NotNull String filePath) { Preconditions.checkNotNull(filePath); 
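As a minimal sketch (not part of the commit; the file path and sizes below are assumed placeholder values), the new fileLength-aware FileBlockMetadata constructor and accessors added above might be used like this:

import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;

public class FileBlockMetadataSketch
{
  public static void main(String[] args)
  {
    long fileLength = 4096L; // assumed total size of the file being split
    long blockSize = 1024L;  // assumed block size

    // Block id 0 covering offsets [0, blockSize), not the last block, no previous block (-1),
    // now also carrying the overall file length.
    FileBlockMetadata block =
        new FileBlockMetadata("input/file1.txt", 0L, 0L, blockSize, false, -1L, fileLength);

    // Readers such as S3RecordReader can use the file length to bound their range requests.
    System.out.println(block.getFilePath() + " belongs to a file of " + block.getFileLength() + " bytes");
  }
}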
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java b/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java index 6fe47a2..d9e8b2e 100644 --- a/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java +++ b/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java @@ -146,19 +146,26 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable> { protected int bufferSize; + /** + * overflowBufferSize is the number of bytes fetched when a record overflows + * to consecutive block + */ + protected int overflowBufferSize; private final transient ByteArrayOutputStream lineBuilder; private final transient ByteArrayOutputStream emptyBuilder; private final transient ByteArrayOutputStream tmpBuilder; - private transient byte[] buffer; + protected transient byte[] buffer; private transient String bufferStr; private transient int posInStr; + private transient boolean overflowBlockRead; public LineReaderContext() { super(); bufferSize = 8192; + overflowBufferSize = 8192; lineBuilder = new ByteArrayOutputStream(); emptyBuilder = new ByteArrayOutputStream(); tmpBuilder = new ByteArrayOutputStream(); @@ -167,10 +174,54 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable> @Override public void initialize(STREAM stream, BlockMetadata blockMetadata, boolean consecutiveBlock) { + overflowBlockRead = false; + posInStr = 0; + offset = blockMetadata.getOffset(); + super.initialize(stream, blockMetadata, consecutiveBlock); + } + + /** + * Reads bytes from the stream starting from the offset into the buffer + * + * @param bytesFromCurrentOffset + * bytes read till now from current block + * @param bytesToFetch + * the number of bytes to be read from stream + * @return the number of bytes actually read, -1 if 0 bytes read + * @throws IOException + */ + protected int readData(final long bytesFromCurrentOffset, final int bytesToFetch) throws IOException + { if (buffer == null) { - buffer = new byte[bufferSize]; + buffer = new byte[bytesToFetch]; } - super.initialize(stream, blockMetadata, consecutiveBlock); + return stream.read(offset + bytesFromCurrentOffset, buffer, 0, bytesToFetch); + } + + /** + * @param usedBytesFromOffset + * number of bytes the pointer is ahead of the offset + * @return true if end of stream reached, false otherwise + */ + protected boolean checkEndOfStream(final long usedBytesFromOffset) + { + if (!overflowBlockRead) { + return (offset - blockMetadata.getOffset() + usedBytesFromOffset < bufferSize); + } else { + return (offset - blockMetadata.getOffset() + usedBytesFromOffset < overflowBufferSize); + } + } + + /** + * Gives the number of bytes to be fetched from the stream + * + * @param overflowBlockRead + * indicates whether we are reading main block or overflow block + * @return bytes to be fetched from stream + */ + protected int calculateBytesToFetch() + { + return (overflowBlockRead ? 
overflowBufferSize : (bufferSize)); } @Override @@ -186,7 +237,9 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable> while (!foundEOL) { tmpBuilder.reset(); if (posInStr == 0) { - bytesRead = stream.read(offset + usedBytes, buffer, 0, bufferSize); + int bytesToFetch = calculateBytesToFetch(); + overflowBlockRead = true; + bytesRead = readData(usedBytes, bytesToFetch); if (bytesRead == -1) { break; } @@ -220,14 +273,13 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable> usedBytes += emptyBuilder.toByteArray().length; } else { //end of stream reached - if (bytesRead < bufferSize) { + if (checkEndOfStream(usedBytes)) { break; } //read more bytes from the input stream posInStr = 0; } } - posInStr = 0; //when end of stream is reached then bytesRead is -1 if (bytesRead == -1) { lineBuilder.reset(); @@ -260,6 +312,47 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable> { return this.bufferSize; } + + /** + * Sets the overflow buffer size of read. + * + * @param overflowBufferSize + * size of the overflow buffer + */ + public void setOverflowBufferSize(int overflowBufferSize) + { + this.overflowBufferSize = overflowBufferSize; + } + + /** + * @param buffer + * the bytes read from the source + */ + protected void setBuffer(byte[] buffer) + { + this.buffer = buffer; + } + + /** + * Sets whether to read overflow block during next fetch. + * + * @param overflowBlockRead + * boolean indicating whether to read overflow block during next read + */ + public void setOverflowBlockRead(boolean overflowBlockRead) + { + this.overflowBlockRead = overflowBlockRead; + } + + /** + * Returns a boolean indicating whether to read overflow block during next read + * + * @returnoverflowBlockRead + */ + protected boolean isOverflowBlockRead() + { + return overflowBlockRead; + } } /** @@ -280,7 +373,7 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable> super.initialize(stream, blockMetadata, consecutiveBlock); //ignore first entity of all the blocks except the first one because those bytes //were used during the parsing of the previous block. - if (!consecutiveBlock && blockMetadata.getOffset() != 0) { + if (blockMetadata.getPreviousBlockId() != -1 && blockMetadata.getOffset() != 0) { try { Entity entity = readEntity(); offset += entity.usedBytes; @@ -300,6 +393,15 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable> } return null; } + + @Override + protected int calculateBytesToFetch() + { + /* + * With readAheadLineReaderContext, we always read at least one overflowBlock. Hence, fetch it in advance + */ + return (this.isOverflowBlockRead() ? 
overflowBufferSize : (bufferSize + overflowBufferSize)); + } } /** http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java index c002c18..74addf7 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java @@ -178,7 +178,7 @@ public abstract class AbstractFileSplitter extends BaseOperator */ protected BlockMetadata.FileBlockMetadata createBlockMetadata(FileMetadata fileMetadata) { - return new BlockMetadata.FileBlockMetadata(fileMetadata.getFilePath()); + return new BlockMetadata.FileBlockMetadata(fileMetadata.getFilePath(), fileMetadata.getFileLength()); } /** http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java index b9594b3..0949aba 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java @@ -293,7 +293,8 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener, FileMetadata fileMetadata, boolean isLast) { return new FileBlockMetadata(fileMetadata.getFilePath(), fileMetadata.getBlockIds()[blockNumber - 1], pos, - lengthOfFileInBlock, isLast, blockNumber == 1 ? -1 : fileMetadata.getBlockIds()[blockNumber - 2]); + lengthOfFileInBlock, isLast, blockNumber == 1 ? 
-1 : fileMetadata.getBlockIds()[blockNumber - 2], + fileMetadata.getFileLength()); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/main/java/com/datatorrent/lib/io/fs/S3BlockReader.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/S3BlockReader.java b/library/src/main/java/com/datatorrent/lib/io/fs/S3BlockReader.java index 42231bb..e88191c 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/S3BlockReader.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/S3BlockReader.java @@ -69,7 +69,7 @@ public class S3BlockReader extends FSSliceReader * @param s3uri s3 uri * @return name of the bucket */ - protected static String extractBucket(String s3uri) + public static String extractBucket(String s3uri) { return s3uri.substring(s3uri.indexOf('@') + 1, s3uri.indexOf("/", s3uri.indexOf('@'))); } @@ -79,7 +79,7 @@ public class S3BlockReader extends FSSliceReader * @param s3uri given s3 uri * @return the accessKey */ - protected static String extractAccessKey(String s3uri) + public static String extractAccessKey(String s3uri) { return s3uri.substring(s3uri.indexOf("://") + 3, s3uri.indexOf(':', s3uri.indexOf("://") + 3)); } @@ -89,7 +89,7 @@ public class S3BlockReader extends FSSliceReader * @param s3uri given s3uri * @return the secretAccessKey */ - protected static String extractSecretAccessKey(String s3uri) + public static String extractSecretAccessKey(String s3uri) { return s3uri.substring(s3uri.indexOf(':', s3uri.indexOf("://") + 1) + 1, s3uri.indexOf('@')); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java index 268c51b..2d82ac4 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java @@ -20,6 +20,9 @@ package org.apache.apex.malhar.lib.fs; import java.io.IOException; + +import javax.validation.constraints.Pattern; + import org.apache.hadoop.fs.FSDataInputStream; import com.datatorrent.api.Context.OperatorContext; @@ -72,15 +75,36 @@ public class FSRecordReader extends FSSliceReader { super.setup(context); if (mode == RECORD_READER_MODE.FIXED_WIDTH_RECORD) { - ReaderContext.FixedBytesReaderContext<FSDataInputStream> fixedBytesReaderContext = new ReaderContext.FixedBytesReaderContext<FSDataInputStream>(); - fixedBytesReaderContext.setLength(recordLength); - readerContext = fixedBytesReaderContext; + readerContext = createFixedWidthReaderContext(); } else { - readerContext = new ReaderContext.ReadAheadLineReaderContext<FSDataInputStream>(); + readerContext = createDelimitedReaderContext(); } } /** + * Creates a recordReaderContext for FixedWidthRecords + * + * @return FixedBytesReaderContext + */ + protected ReaderContext<FSDataInputStream> createFixedWidthReaderContext() + { + ReaderContext.FixedBytesReaderContext<FSDataInputStream> fixedBytesReaderContext = new ReaderContext.FixedBytesReaderContext<FSDataInputStream>(); + fixedBytesReaderContext.setLength(recordLength); + return fixedBytesReaderContext; + + } + + /** + * Creates a recordReaderContext for Delimited Records + * + * @return DelimitedRecordReaderContext + */ + protected 
ReaderContext<FSDataInputStream> createDelimitedReaderContext() + { + return new ReaderContext.ReadAheadLineReaderContext<FSDataInputStream>(); + } + + /** * Read the block data and emit records based on reader context * * @param blockMetadata @@ -105,14 +129,15 @@ public class FSRecordReader extends FSSliceReader } /** - * Criteria for record split + * Criteria for record split : FIXED_WIDTH_RECORD or DELIMITED_RECORD * * @param mode * Mode */ - public void setMode(RECORD_READER_MODE mode) + public void setMode( + @Pattern(regexp = "FIXED_WIDTH_RECORD|DELIMITED_RECORD", flags = Pattern.Flag.CASE_INSENSITIVE) String mode) { - this.mode = mode; + this.mode = RECORD_READER_MODE.valueOf(mode.toUpperCase()); } /** @@ -120,9 +145,9 @@ public class FSRecordReader extends FSSliceReader * * @return mode */ - public RECORD_READER_MODE getMode() + public String getMode() { - return mode; + return mode.toString(); } /** http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java index b727248..65f7e5f 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java @@ -21,6 +21,7 @@ package org.apache.apex.malhar.lib.fs; import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; +import javax.validation.constraints.Pattern; import javax.validation.constraints.Size; import org.apache.apex.malhar.lib.fs.FSRecordReader.RECORD_READER_MODE; @@ -82,7 +83,7 @@ public class FSRecordReaderModule implements Module /** * Criteria for record split */ - private RECORD_READER_MODE mode = RECORD_READER_MODE.DELIMITED_RECORD; + private String mode = RECORD_READER_MODE.DELIMITED_RECORD.toString(); /** * Length for fixed width record @@ -370,24 +371,25 @@ public class FSRecordReaderModule implements Module } /** - * Criteria for record split + * Criteria for record split : FIXED_WIDTH_RECORD or DELIMITED_RECORD * - * @return mode + * @param mode + * Mode */ - public RECORD_READER_MODE getMode() + public void setMode( + @Pattern(regexp = "FIXED_WIDTH_RECORD|DELIMITED_RECORD", flags = Pattern.Flag.CASE_INSENSITIVE) String mode) { - return mode; + this.mode = mode; } /** * Criteria for record split * - * @param mode - * Mode + * @return mode */ - public void setMode(RECORD_READER_MODE mode) + public String getMode() { - this.mode = mode; + return mode; } /** http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReader.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReader.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReader.java new file mode 100644 index 0000000..9de2896 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReader.java @@ -0,0 +1,577 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.apex.malhar.lib.fs.s3; + +import java.io.IOException; +import java.util.Arrays; + +import org.apache.apex.malhar.lib.fs.FSRecordReader; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.fs.FSDataInputStream; + +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.GetObjectRequest; +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectInputStream; +import com.esotericsoftware.kryo.NotNull; +import com.google.common.base.Preconditions; +import com.google.common.io.ByteStreams; + +import com.datatorrent.lib.io.block.BlockMetadata; +import com.datatorrent.lib.io.block.ReaderContext; + +/** + * This operator can be used for reading records/tuples from S3 in parallel + * (without ordering guarantees between tuples). Records can be delimited (e.g. + * newline) or fixed width records. Output tuples are byte[]. + * + * Typically, this operator will be connected to output of FileSplitterInput to + * read records in parallel. + */ +@Evolving +public class S3RecordReader extends FSRecordReader +{ + private String endPoint; + @NotNull + private String bucketName; + @NotNull + private String accessKey; + @NotNull + private String secretAccessKey; + private int overflowBufferSize; + + public S3RecordReader() + { + /* + * Set default overflowBufferSize to 1MB + */ + overflowBufferSize = 1024 * 1024; + } + + /** + * S3 reader doesn't make use of any stream, hence returns a null value + * + * @param block + * block metadata + * @return stream (null object) + * @throws IOException + */ + @Override + protected FSDataInputStream setupStream(BlockMetadata.FileBlockMetadata block) throws IOException + { + return null; + } + + /** + * Returns an instance of S3FixedWidthRecordReaderContext after setting + * recordLength and bucketName and initializing s3Client + * + * @return S3DelimitedRecordReaderContext + */ + @Override + protected ReaderContext<FSDataInputStream> createFixedWidthReaderContext() + { + S3FixedWidthRecordReaderContext fixedBytesReaderContext = new S3FixedWidthRecordReaderContext(); + fixedBytesReaderContext.setLength(this.getRecordLength()); + fixedBytesReaderContext.getS3Params().initializeS3Client(accessKey, secretAccessKey, endPoint); + fixedBytesReaderContext.getS3Params().setBucketName(bucketName); + return fixedBytesReaderContext; + } + + /** + * Returns an instance of S3DelimitedRecordReaderContext after setting + * bucketName and overflowBuffersize and initializing the s3Client + * + * @return S3DelimitedRecordReaderContext + */ + @Override + protected ReaderContext<FSDataInputStream> createDelimitedReaderContext() + { + S3DelimitedRecordReaderContext delimitedRecordReaderContext = new S3DelimitedRecordReaderContext(); + 
delimitedRecordReaderContext.getS3Params().initializeS3Client(accessKey, secretAccessKey, endPoint); + delimitedRecordReaderContext.getS3Params().setBucketName(bucketName); + delimitedRecordReaderContext.setOverflowBufferSize(overflowBufferSize); + return delimitedRecordReaderContext; + } + + /** + * S3RecordReaderParams is used to hold the common parameters used by the + * DelimitedRecordReaderContext and FixedWidthReacordReaderContext for S3 + */ + protected static class S3RecordReaderParams + { + /** + * Amazon client used to read bytes from S3 + */ + private AmazonS3 s3Client; + /** + * S3 bucket name + */ + private String bucketName; + /** + * path of file being processed in bucket + */ + private String filePath; + /** + * length of the file being processed + */ + private long fileLength; + + /** + * Initialize the AmazonS3 client using the accessKey, secretAccessKey, sets + * endpoint for the s3Client if provided + * + * @param accessKey + * @param secretAccessKey + * @param endPoint + */ + public void initializeS3Client(@javax.validation.constraints.NotNull String accessKey, + @javax.validation.constraints.NotNull String secretAccessKey, String endPoint) + { + Preconditions.checkNotNull(accessKey); + Preconditions.checkNotNull(secretAccessKey); + s3Client = new AmazonS3Client(new BasicAWSCredentials(accessKey, secretAccessKey)); + if (endPoint != null) { + s3Client.setEndpoint(endPoint); + } + } + + /** + * Set the AmazonS3 service + * + * @param s3Client + * given s3Client + */ + public void setS3Client(@javax.validation.constraints.NotNull AmazonS3 s3Client) + { + Preconditions.checkNotNull(s3Client); + this.s3Client = s3Client; + } + + /** + * Returns the AmazonS3 service + * + * @return s3Client + */ + public AmazonS3 getS3Client() + { + return s3Client; + } + + /** + * Set the bucket name + * + * @param bucketName + * given bucketName + */ + public void setBucketName(@javax.validation.constraints.NotNull String bucketName) + { + Preconditions.checkNotNull(bucketName); + this.bucketName = bucketName; + } + + /** + * Returns the bucket name + * + * @return bucketName + */ + public String getBucketName() + { + return bucketName; + } + + /** + * Returns the file path + * + * @return filePath + */ + public String getFilePath() + { + return filePath; + } + + /** + * Returns the length of the file to which the block belongs + * + * @return fileLength + */ + public long getFileLength() + { + return fileLength; + } + + /** + * This method reads the blockMetadata input parameter and initializes the + * fileBlock and fileLength + * + * @param blockMetadata + */ + public void initialzeFilepathAndFileLength(BlockMetadata blockMetadata) + { + if (blockMetadata instanceof BlockMetadata.FileBlockMetadata) { + BlockMetadata.FileBlockMetadata fileBlockMetadata = (BlockMetadata.FileBlockMetadata)blockMetadata; + fileLength = fileBlockMetadata.getFileLength(); + filePath = fileBlockMetadata.getFilePath(); + // File path would be the path after bucket name. + // Check if the file path starts with "/" + if (filePath.startsWith("/")) { + filePath = filePath.substring(1); + } + } + } + } + + /** + * RecordReaderContext for reading delimited S3 Records. 
+ */ + protected static class S3DelimitedRecordReaderContext + extends ReaderContext.ReadAheadLineReaderContext<FSDataInputStream> + { + /** + * S3 parameters + */ + private transient S3RecordReaderParams s3Params; + + public S3DelimitedRecordReaderContext() + { + s3Params = new S3RecordReaderParams(); + } + + @Override + public void initialize(FSDataInputStream stream, BlockMetadata blockMetadata, boolean consecutiveBlock) + { + super.initialize(stream, blockMetadata, consecutiveBlock); + s3Params.initialzeFilepathAndFileLength(blockMetadata); + /* + * Initialize the bufferSize and overflowBufferSize + */ + int bufferSize = Long.valueOf(blockMetadata.getLength() - blockMetadata.getOffset()).intValue(); + this.setBufferSize(bufferSize); + if (overflowBufferSize > bufferSize) { + this.setOverflowBufferSize(bufferSize); + } else { + this.setOverflowBufferSize(overflowBufferSize); + } + } + + /** + * S3 block read would be achieved through the AmazonS3 client. Following + * are the steps to achieve: (1) Create the objectRequest from bucketName + * and filePath. (2) Set the range to the above created objectRequest. (3) + * Get the object portion through AmazonS3 client API. (4) Get the object + * content from the above object portion. + * + * @param bytesFromCurrentOffset + * bytes read till now from current offset + * @param bytesToFetch + * the number of bytes to be fetched + * @return the number of bytes read, -1 if 0 bytes read + * @throws IOException + */ + + @Override + protected int readData(final long bytesFromCurrentOffset, final int bytesToFetch) throws IOException + { + GetObjectRequest rangeObjectRequest = new GetObjectRequest(s3Params.bucketName, s3Params.filePath); + rangeObjectRequest.setRange(offset + bytesFromCurrentOffset, offset + bytesFromCurrentOffset + bytesToFetch - 1); + S3Object objectPortion = s3Params.s3Client.getObject(rangeObjectRequest); + S3ObjectInputStream wrappedStream = objectPortion.getObjectContent(); + buffer = ByteStreams.toByteArray(wrappedStream); + wrappedStream.close(); + int bufferLength = buffer.length; + if (bufferLength <= 0) { + return -1; + } + return bufferLength; + } + + @Override + protected boolean checkEndOfStream(final long usedBytesFromOffset) + { + if ((offset + usedBytesFromOffset) >= s3Params.fileLength) { + return true; + } + return false; + } + + /** + * Returns the S3RecordReaderParams object + * + * @return s3Params + */ + protected S3RecordReaderParams getS3Params() + { + return s3Params; + } + } + + /** + * RecordReaderContext for reading fixed width S3 Records. 
+ */ + protected static class S3FixedWidthRecordReaderContext + extends ReaderContext.FixedBytesReaderContext<FSDataInputStream> + { + /** + * S3 parameters + */ + private transient S3RecordReaderParams s3Params; + + /** + * used to hold data retrieved from S3 + */ + protected transient byte[] buffer; + + /** + * current offset within the byte[] buffer + */ + private transient int bufferOffset; + + public S3FixedWidthRecordReaderContext() + { + s3Params = new S3RecordReaderParams(); + } + + @Override + public void initialize(FSDataInputStream stream, BlockMetadata blockMetadata, boolean consecutiveBlock) + { + super.initialize(stream, blockMetadata, consecutiveBlock); + s3Params.initialzeFilepathAndFileLength(blockMetadata); + try { + int bytesRead = this.getBlockFromS3(); + if (bytesRead == -1) { + return; + } + } catch (IOException e) { + throw new RuntimeException(e); + } + this.setBufferOffset(0); + } + + /** + * S3 block read would be achieved through the AmazonS3 client. Following + * are the steps to achieve: (1) Create the objectRequest from bucketName + * and filePath. (2) Set the range to the above created objectRequest. Set + * the range so that it gets aligned with the fixed width records. (3) Get + * the object portion through AmazonS3 client API. (4) Get the object + * content from the above object portion. + */ + protected int getBlockFromS3() throws IOException + { + long startOffset = blockMetadata.getOffset() + + (this.length - (blockMetadata.getOffset() % this.length)) % this.length; + long endOffset = blockMetadata.getLength() + + ((this.length - (blockMetadata.getLength() % this.length)) % this.length) - 1; + if (endOffset == (startOffset - 1) || startOffset > s3Params.fileLength) { + /* + * If start and end offset is same, it means no record starts in this block + */ + return -1; + } + if (endOffset >= s3Params.fileLength) { + endOffset = s3Params.fileLength - 1; + } + offset = startOffset; + return readData(startOffset, endOffset); + } + + /** + * Reads data from S3 starting from startOffset till the endOffset and + * returns the number of bytes read + * + * @param startOffset + * offset from where to read + * @param endOffset + * offset till where to read + * @return number of bytes read + * @throws IOException + */ + protected int readData(long startOffset, long endOffset) throws IOException + { + GetObjectRequest rangeObjectRequest = new GetObjectRequest(s3Params.bucketName, s3Params.filePath); + rangeObjectRequest.setRange(startOffset, endOffset); + S3Object objectPortion = s3Params.s3Client.getObject(rangeObjectRequest); + S3ObjectInputStream wrappedStream = objectPortion.getObjectContent(); + buffer = ByteStreams.toByteArray(wrappedStream); + wrappedStream.close(); + return buffer.length; + } + + @Override + protected ReaderContext.Entity readEntity() throws IOException + { + entity.clear(); + /* + * In case file length is not a multiple of record length, the last record may not have length = recordLength. + * The data to be read from buffer array should be less in this case. 
+ */ + long bufferLength = length; + if (offset + length > s3Params.fileLength) { + bufferLength = s3Params.fileLength - offset; + } + byte[] record = Arrays.copyOfRange(buffer, Long.valueOf(bufferOffset).intValue(), + Long.valueOf(bufferOffset + bufferLength).intValue()); + bufferOffset += record.length; + entity.setRecord(record); + entity.setUsedBytes(record.length); + return entity; + } + + /** + * Sets the offset within the current buffer + * + * @param bufferOffset + * offset within the current buffer + */ + protected void setBufferOffset(int bufferOffset) + { + this.bufferOffset = bufferOffset; + } + + /** + * Sets the S3RecordReaderParams object + * + * @param s3Params + * S3RecordReaderParams object + */ + protected void setS3Params(S3RecordReaderParams s3Params) + { + this.s3Params = s3Params; + } + + /** + * Returns the S3RecordReaderParams object + * + * @return s3Params + */ + protected S3RecordReaderParams getS3Params() + { + return s3Params; + } + } + + /** + * Size of bytes to be retrieved when a record overflows + * + * return overflowBufferSize + */ + public int getOverflowBufferSize() + { + return overflowBufferSize; + } + + /** + * Size of bytes to be retrieved when a record overflows + * + * @param overflowBufferSize + */ + public void setOverflowBufferSize(int overflowBufferSize) + { + this.overflowBufferSize = overflowBufferSize; + } + + /** + * Get the S3 bucket name + * + * @return bucket + */ + public String getBucketName() + { + return bucketName; + } + + /** + * Set the bucket name where the file resides + * + * @param bucketName + * bucket name + */ + public void setBucketName(@javax.validation.constraints.NotNull String bucketName) + { + Preconditions.checkNotNull(bucketName); + this.bucketName = bucketName; + } + + /** + * Return the access key + * + * @return the accessKey + */ + public String getAccessKey() + { + return accessKey; + } + + /** + * Set the access key + * + * @param accessKey + * given accessKey + */ + public void setAccessKey(@javax.validation.constraints.NotNull String accessKey) + { + Preconditions.checkNotNull(accessKey); + this.accessKey = accessKey; + } + + /** + * Return the secretAccessKey + * + * @return the secretAccessKey + */ + public String getSecretAccessKey() + { + return secretAccessKey; + } + + /** + * Set the secretAccessKey + * + * @param secretAccessKey + * secretAccessKey + */ + public void setSecretAccessKey(@javax.validation.constraints.NotNull String secretAccessKey) + { + Preconditions.checkNotNull(secretAccessKey); + this.secretAccessKey = secretAccessKey; + } + + /** + * S3 endpoint + * + * @param endPoint + * endpoint to be used for S3 + */ + public void setEndPoint(String endPoint) + { + this.endPoint = endPoint; + } + + /** + * S3 endpoint + * + * @return s3 endpoint + */ + public String getEndPoint() + { + return endPoint; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReaderModule.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReaderModule.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReaderModule.java new file mode 100644 index 0000000..884073c --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReaderModule.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.apex.malhar.lib.fs.s3; + +import javax.validation.constraints.Min; + +import org.apache.apex.malhar.lib.fs.FSRecordReaderModule; + +import com.datatorrent.lib.io.fs.S3BlockReader; + +/** + * This module is used for reading records/tuples from S3. Records can be read + * in parallel using multiple partitions of the record reader operator. (Ordering is + * not guaranteed when records are read in parallel) + * + * The input S3 directory is scanned at the specified interval to poll for new data. + * + * The module reads data in parallel; the following parameters can be configured + * <br/> + * 1. files: List of file(s)/directories to read. files would be in the form of + * SCHEME://AccessKey:SecretKey@BucketName/FileOrDirectory , + * SCHEME://AccessKey:SecretKey@BucketName/FileOrDirectory , .... where SCHEME + * is the protocol scheme for the file system, AccessKey is the AWS access key + * and SecretKey is the AWS secret key<br/> + * 2. filePatternRegularExp: Files with names matching the given regex will be read + * <br/> + * 3. scanIntervalMillis: interval between two scans to discover new files in the + * input directory<br/> + * 4. recursive: if true, scan input directories recursively<br/> + * 5. blockSize: block size used to read input blocks of a file, default value + * 64MB<br/> + * 6. overflowBlockSize: For delimited records, this value represents the + * additional data that needs to be read to find the delimiter character for the + * last record in a block. This should be set to the approximate record size in the + * file, default value is 1MB<br/> + * 7. sequentialFileRead: if true, then each reader partition will read + * a different file <br/> + * instead of reading different offsets of the same file. <br/> + * (File level parallelism instead of block level parallelism)<br/> + * 8. blocksThreshold: number of blocks emitted per window<br/> + * 9. minReaders: Minimum number of block readers for dynamic partitioning<br/> + * 10. maxReaders: Maximum number of block readers for dynamic partitioning<br/> + * 11. repartitionCheckInterval: Interval for re-evaluating dynamic + * partitioning<br/> + * 12.
s3EndPoint: Optional parameter used to specify the S3 endpoint to use + */ +@org.apache.hadoop.classification.InterfaceStability.Evolving +public class S3RecordReaderModule extends FSRecordReaderModule +{ + /** + * Endpoint for S3 + */ + private String s3EndPoint; + @Min(0) + private int overflowBlockSize; + + /** + * Creates an instance of the S3 record reader + * + * @return S3RecordReader instance + */ + @Override + public S3RecordReader createRecordReader() + { + S3RecordReader s3RecordReader = new S3RecordReader(); + s3RecordReader.setBucketName(S3BlockReader.extractBucket(getFiles())); + s3RecordReader.setAccessKey(S3BlockReader.extractAccessKey(getFiles())); + s3RecordReader.setSecretAccessKey(S3BlockReader.extractSecretAccessKey(getFiles())); + s3RecordReader.setEndPoint(s3EndPoint); + s3RecordReader.setMode(this.getMode()); + s3RecordReader.setRecordLength(this.getRecordLength()); + if (overflowBlockSize != 0) { + s3RecordReader.setOverflowBufferSize(overflowBlockSize); + } + return s3RecordReader; + } + + /** + * Set the S3 endpoint to use + * + * @param s3EndPoint + */ + public void setS3EndPoint(String s3EndPoint) + { + this.s3EndPoint = s3EndPoint; + } + + /** + * Returns the S3 endpoint + * + * @return s3EndPoint + */ + public String getS3EndPoint() + { + return s3EndPoint; + } + + /** + * Additional data that needs to be read to find the delimiter character for the + * last record in a block. This should be set to the approximate record size in + * the file; default value is 1MB + * + * @param overflowBlockSize + */ + public void setOverflowBlockSize(int overflowBlockSize) + { + this.overflowBlockSize = overflowBlockSize; + } + + /** + * Returns the overflow block size + * + * @return overflowBlockSize + */ + public int getOverflowBlockSize() + { + return overflowBlockSize; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/test/java/com/datatorrent/lib/io/block/FSLineReaderTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/block/FSLineReaderTest.java b/library/src/test/java/com/datatorrent/lib/io/block/FSLineReaderTest.java index 5ddc8a9..2fbb9e7 100644 --- a/library/src/test/java/com/datatorrent/lib/io/block/FSLineReaderTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/block/FSLineReaderTest.java @@ -24,6 +24,8 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader; import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.regex.Pattern; @@ -166,18 +168,47 @@ public class FSLineReaderTest testMeta.blockReader.beginWindow(1); - for (int i = 0; i < noOfBlocks; i++) { - BlockMetadata.FileBlockMetadata blockMetadata = new BlockMetadata.FileBlockMetadata( - testMeta.dataFile.getAbsolutePath(), i, - i * blockSize, i == noOfBlocks - 1 ? testMeta.dataFile.length() : (i + 1) * blockSize, i == noOfBlocks - 1, - -1); - testMeta.blockReader.blocksMetadataInput.process(blockMetadata); + for (int i = 0; i < 10; i++) { + for (int j = 0; j < Math.ceil(noOfBlocks / 10.0); j++) { + int blockNo = 10 * j + i; + if (blockNo >= noOfBlocks) { + continue; + } + BlockMetadata.FileBlockMetadata blockMetadata = new BlockMetadata.FileBlockMetadata( + testMeta.dataFile.getAbsolutePath(), blockNo, blockNo * blockSize, + blockNo == noOfBlocks - 1 ?
testMeta.dataFile.length() : (blockNo + 1) * blockSize, + blockNo == noOfBlocks - 1, blockNo - 1); + testMeta.blockReader.blocksMetadataInput.process(blockMetadata); + } } testMeta.blockReader.endWindow(); List<Object> messages = testMeta.messageSink.collectedTuples; Assert.assertEquals("No of records", testMeta.messages.size(), messages.size()); + + Collections.sort(testMeta.messages, new Comparator<String[]>() + { + @Override + public int compare(String[] rec1, String[] rec2) + { + return compareStringArrayRecords(rec1, rec2); + } + }); + + Collections.sort(messages, new Comparator<Object>() + { + @Override + public int compare(Object object1, Object object2) + { + @SuppressWarnings("unchecked") + String[] rec1 = ((AbstractBlockReader.ReaderRecord<String>)object1).getRecord().split(","); + @SuppressWarnings("unchecked") + String[] rec2 = ((AbstractBlockReader.ReaderRecord<String>)object2).getRecord().split(","); + return compareStringArrayRecords(rec1, rec2); + } + }); + for (int i = 0; i < messages.size(); i++) { @SuppressWarnings("unchecked") AbstractBlockReader.ReaderRecord<String> msg = (AbstractBlockReader.ReaderRecord<String>)messages.get(i); @@ -198,6 +229,24 @@ public class FSLineReaderTest } } + /** + * Utility function to compare lexicographically 2 records of string arrays + * + * @param rec1 + * @param rec2 + * @return negative if rec1 < rec2, positive if rec1 > rec2, 0 otherwise + */ + private int compareStringArrayRecords(String[] rec1, String[] rec2) + { + for (int i = 0; i < rec1.length && i < rec2.length; i++) { + if (rec1[i].equals(rec2[i])) { + continue; + } + return rec1[i].compareTo(rec2[i]); + } + return 0; + } + @SuppressWarnings("unused") private static final Logger LOG = LoggerFactory.getLogger(FSLineReaderTest.class); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordReaderTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordReaderTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordReaderTest.java index 8560228..642621d 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordReaderTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordReaderTest.java @@ -32,7 +32,6 @@ import org.junit.rules.TestWatcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.apex.malhar.lib.fs.FSRecordReader.RECORD_READER_MODE; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; @@ -118,6 +117,10 @@ public class FSRecordReaderTest } }; + public static Set<String> getRecords() + { + return records; + } } private static class DelimitedApplication implements StreamingApplication @@ -126,7 +129,7 @@ public class FSRecordReaderTest public void populateDAG(DAG dag, Configuration conf) { FSRecordReaderModule recordReader = dag.addModule("HDFSRecordReaderModule", FSRecordReaderModule.class); - recordReader.setMode(RECORD_READER_MODE.DELIMITED_RECORD); + recordReader.setMode("delimited_record"); DelimitedValidator validator = dag.addOperator("Validator", new DelimitedValidator()); dag.addStream("records", recordReader.records, validator.data); } @@ -206,7 +209,7 @@ public class FSRecordReaderTest public void populateDAG(DAG dag, Configuration conf) { FSRecordReaderModule recordReader = dag.addModule("HDFSRecordReaderModule", FSRecordReaderModule.class); - 
recordReader.setMode(RECORD_READER_MODE.FIXED_WIDTH_RECORD); + recordReader.setMode("FIXED_WIDTH_RECORD"); FixedWidthValidator validator = dag.addOperator("Validator", new FixedWidthValidator()); dag.addStream("records", recordReader.records, validator.data); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3DelimitedRecordReaderTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3DelimitedRecordReaderTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3DelimitedRecordReaderTest.java new file mode 100644 index 0000000..a85e2c8 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3DelimitedRecordReaderTest.java @@ -0,0 +1,285 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.apex.malhar.lib.fs.s3; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; + +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.google.common.collect.Lists; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.lib.helper.OperatorContextTestHelper; +import com.datatorrent.lib.io.block.AbstractFSBlockReader; +import com.datatorrent.lib.io.block.BlockMetadata; +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.netlet.util.Slice; + +@Ignore +public class S3DelimitedRecordReaderTest +{ + private final String accessKey = "*************"; + private final String secretKey = "*********************"; + private static final int overflowBufferSize = 123; + private static final String FILE_1 = "file1.txt"; + private static final String s3Directory = "input/"; + + AbstractFSBlockReader<Slice> getBlockReader(String bucketKey) + { + S3RecordReader blockReader = new S3RecordReader(); + + blockReader.setAccessKey(accessKey); + blockReader.setSecretAccessKey(secretKey); + 
blockReader.setBucketName(bucketKey); + blockReader.setOverflowBufferSize(overflowBufferSize); + return blockReader; + } + + class TestMeta extends TestWatcher + { + private Context.OperatorContext readerContext; + private AbstractFSBlockReader<Slice> blockReader; + private CollectorTestSink<Object> blockMetadataSink; + private CollectorTestSink<Object> messageSink; + private List<String[]> messages = Lists.newArrayList(); + private String appId; + private String dataFilePath; + private File dataFile; + private String bucketKey; + private AmazonS3 client; + + @Override + protected void starting(org.junit.runner.Description description) + { + dataFilePath = "src/test/resources/reader_test_data.csv"; + dataFile = new File(dataFilePath); + bucketKey = new String("target-" + description.getMethodName()).toLowerCase(); + + client = new AmazonS3Client(new BasicAWSCredentials(accessKey, secretKey)); + client.createBucket(bucketKey); + + client.putObject(new PutObjectRequest(bucketKey, s3Directory + FILE_1, dataFile)); + + appId = Long.toHexString(System.currentTimeMillis()); + blockReader = getBlockReader(bucketKey); + + Attribute.AttributeMap.DefaultAttributeMap readerAttr = new Attribute.AttributeMap.DefaultAttributeMap(); + readerAttr.put(DAG.APPLICATION_ID, appId); + readerAttr.put(Context.OperatorContext.SPIN_MILLIS, 10); + readerContext = new OperatorContextTestHelper.TestIdOperatorContext(1, readerAttr); + + blockReader.setup(readerContext); + + messageSink = new CollectorTestSink<>(); + ((S3RecordReader)blockReader).records.setSink(messageSink); + + blockMetadataSink = new CollectorTestSink<>(); + blockReader.blocksMetadataOutput.setSink(blockMetadataSink); + + BufferedReader reader; + try { + reader = new BufferedReader(new InputStreamReader(new FileInputStream(dataFile.getAbsolutePath()))); + String line; + while ((line = reader.readLine()) != null) { + messages.add(line.split(",")); + } + reader.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + protected void finished(Description description) + { + deleteBucketAndContent(); + blockReader.teardown(); + } + + public void deleteBucketAndContent() + { + //Get the list of objects + ObjectListing objectListing = client.listObjects(bucketKey); + for (Iterator<?> iterator = objectListing.getObjectSummaries().iterator(); iterator.hasNext();) { + S3ObjectSummary objectSummary = (S3ObjectSummary)iterator.next(); + LOG.info("Deleting an object: {}", objectSummary.getKey()); + client.deleteObject(bucketKey, objectSummary.getKey()); + } + client.deleteBucket(bucketKey); + } + } + + @Rule + public TestMeta testMeta = new TestMeta(); + + /** + * The file is processed as a single block + */ + @Test + public void testSingleBlock() + { + BlockMetadata.FileBlockMetadata block = new BlockMetadata.FileBlockMetadata(s3Directory + FILE_1, 0L, 0L, + testMeta.dataFile.length(), true, -1, testMeta.dataFile.length()); + + testMeta.blockReader.beginWindow(1); + testMeta.blockReader.blocksMetadataInput.process(block); + testMeta.blockReader.endWindow(); + + List<Object> actualMessages = testMeta.messageSink.collectedTuples; + Assert.assertEquals("No of records", testMeta.messages.size(), actualMessages.size()); + + for (int i = 0; i < actualMessages.size(); i++) { + byte[] msg = (byte[])actualMessages.get(i); + Assert.assertTrue("line " + i, Arrays.equals(new String(msg).split(","), testMeta.messages.get(i))); + } + } + + /** + * The file is divided into multiple blocks, blocks are processed + * consecutively + */ + @Test 
+ public void testMultipleBlocks() + { + long blockSize = 1000; + int noOfBlocks = (int)((testMeta.dataFile.length() / blockSize) + + (((testMeta.dataFile.length() % blockSize) == 0) ? 0 : 1)); + + testMeta.blockReader.beginWindow(1); + + for (int i = 0; i < noOfBlocks; i++) { + BlockMetadata.FileBlockMetadata blockMetadata = new BlockMetadata.FileBlockMetadata(s3Directory + FILE_1, i, + i * blockSize, i == noOfBlocks - 1 ? testMeta.dataFile.length() : (i + 1) * blockSize, i == noOfBlocks - 1, + i - 1, testMeta.dataFile.length()); + testMeta.blockReader.blocksMetadataInput.process(blockMetadata); + } + + testMeta.blockReader.endWindow(); + + List<Object> messages = testMeta.messageSink.collectedTuples; + Assert.assertEquals("No of records", testMeta.messages.size(), messages.size()); + for (int i = 0; i < messages.size(); i++) { + + byte[] msg = (byte[])messages.get(i); + Assert.assertTrue("line " + i, Arrays.equals(new String(msg).split(","), testMeta.messages.get(i))); + } + } + + /** + * The file is divided into multiple blocks, blocks are processed + * non-consecutively + */ + @Test + public void testNonConsecutiveBlocks() + { + long blockSize = 1000; + int noOfBlocks = (int)((testMeta.dataFile.length() / blockSize) + + (((testMeta.dataFile.length() % blockSize) == 0) ? 0 : 1)); + + testMeta.blockReader.beginWindow(1); + + for (int i = 0; i < 10; i++) { + for (int j = 0; j < Math.ceil(noOfBlocks / 10.0); j++) { + int blockNo = 10 * j + i; + if (blockNo >= noOfBlocks) { + continue; + } + BlockMetadata.FileBlockMetadata blockMetadata = new BlockMetadata.FileBlockMetadata(s3Directory + FILE_1, + blockNo, blockNo * blockSize, + blockNo == noOfBlocks - 1 ? testMeta.dataFile.length() : (blockNo + 1) * blockSize, + blockNo == noOfBlocks - 1, blockNo - 1, testMeta.dataFile.length()); + testMeta.blockReader.blocksMetadataInput.process(blockMetadata); + } + } + + testMeta.blockReader.endWindow(); + + List<Object> messages = testMeta.messageSink.collectedTuples; + Assert.assertEquals("No of records", testMeta.messages.size(), messages.size()); + + Collections.sort(testMeta.messages, new Comparator<String[]>() + { + @Override + public int compare(String[] rec1, String[] rec2) + { + return compareStringArrayRecords(rec1, rec2); + } + }); + + Collections.sort(messages, new Comparator<Object>() + { + @Override + public int compare(Object object1, Object object2) + { + String[] rec1 = new String((byte[])object1).split(","); + String[] rec2 = new String((byte[])object2).split(","); + return compareStringArrayRecords(rec1, rec2); + } + }); + for (int i = 0; i < messages.size(); i++) { + byte[] msg = (byte[])messages.get(i); + Assert.assertTrue("line " + i, Arrays.equals(new String(msg).split(","), testMeta.messages.get(i))); + } + } + + /** + * Utility function to compare lexicographically 2 records of string arrays + * + * @param rec1 + * @param rec2 + * @return negative if rec1 < rec2, positive if rec1 > rec2, 0 otherwise + */ + private int compareStringArrayRecords(String[] rec1, String[] rec2) + { + for (int i = 0; i < rec1.length && i < rec2.length; i++) { + if (rec1[i].equals(rec2[i])) { + continue; + } + return rec1[i].compareTo(rec2[i]); + } + return 0; + } + + private static final Logger LOG = LoggerFactory.getLogger(S3DelimitedRecordReaderTest.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3FixedWidthRecordReaderTest.java ---------------------------------------------------------------------- diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3FixedWidthRecordReaderTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3FixedWidthRecordReaderTest.java new file mode 100644 index 0000000..0584973 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3FixedWidthRecordReaderTest.java @@ -0,0 +1,292 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.apex.malhar.lib.fs.s3; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; + +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.google.common.collect.Lists; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.lib.helper.OperatorContextTestHelper; +import com.datatorrent.lib.io.block.AbstractFSBlockReader; +import com.datatorrent.lib.io.block.BlockMetadata; +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.netlet.util.Slice; + +@Ignore +public class S3FixedWidthRecordReaderTest +{ + private final String accessKey = "*************"; + private final String secretKey = "*********************"; + private static final int recordLength = 123; + private static final String FILE_1 = "file1.txt"; + private static final String s3Directory = "input/"; + + AbstractFSBlockReader<Slice> getBlockReader(String bucketKey) + { + S3RecordReader blockReader = new S3RecordReader(); + blockReader.setAccessKey(accessKey); + blockReader.setSecretAccessKey(secretKey); + blockReader.setBucketName(bucketKey); + blockReader.setRecordLength(recordLength); + blockReader.setMode("FIXED_WIDTH_RECORD"); + return blockReader; + } + + class TestMeta extends TestWatcher + { + private Context.OperatorContext readerContext; + private AbstractFSBlockReader<Slice> blockReader; + private CollectorTestSink<Object> blockMetadataSink; + private CollectorTestSink<Object> messageSink; + private List<String[]> messages = Lists.newArrayList(); + private String appId; + private String dataFilePath; + private File 
dataFile; + private String bucketKey; + private AmazonS3 client; + + @Override + protected void starting(org.junit.runner.Description description) + { + dataFilePath = "src/test/resources/reader_test_data.csv"; + dataFile = new File(dataFilePath); + bucketKey = new String("target-" + description.getMethodName()).toLowerCase(); + + client = new AmazonS3Client(new BasicAWSCredentials(accessKey, secretKey)); + client.createBucket(bucketKey); + + client.putObject(new PutObjectRequest(bucketKey, s3Directory + FILE_1, dataFile)); + + appId = Long.toHexString(System.currentTimeMillis()); + blockReader = getBlockReader(bucketKey); + + Attribute.AttributeMap.DefaultAttributeMap readerAttr = new Attribute.AttributeMap.DefaultAttributeMap(); + readerAttr.put(DAG.APPLICATION_ID, appId); + readerAttr.put(Context.OperatorContext.SPIN_MILLIS, 10); + readerContext = new OperatorContextTestHelper.TestIdOperatorContext(1, readerAttr); + + blockReader.setup(readerContext); + + messageSink = new CollectorTestSink<>(); + ((S3RecordReader)blockReader).records.setSink(messageSink); + + blockMetadataSink = new CollectorTestSink<>(); + blockReader.blocksMetadataOutput.setSink(blockMetadataSink); + + BufferedReader reader; + try { + reader = new BufferedReader(new InputStreamReader(new FileInputStream(dataFile.getAbsolutePath()))); + String line; + while ((line = reader.readLine()) != null) { + messages.add(line.split(",")); + } + reader.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + protected void finished(Description description) + { + deleteBucketAndContent(); + blockReader.teardown(); + } + + public void deleteBucketAndContent() + { + //Get the list of objects + ObjectListing objectListing = client.listObjects(bucketKey); + for (Iterator<?> iterator = objectListing.getObjectSummaries().iterator(); iterator.hasNext();) { + S3ObjectSummary objectSummary = (S3ObjectSummary)iterator.next(); + LOG.info("Deleting an object: {}", objectSummary.getKey()); + client.deleteObject(bucketKey, objectSummary.getKey()); + } + client.deleteBucket(bucketKey); + } + } + + @Rule + public TestMeta testMeta = new TestMeta(); + + /** + * The file is processed as a single block + */ + @Test + public void testSingleBlock() + { + BlockMetadata.FileBlockMetadata block = new BlockMetadata.FileBlockMetadata(s3Directory + FILE_1, 0L, 0L, + testMeta.dataFile.length(), true, -1, testMeta.dataFile.length()); + + testMeta.blockReader.beginWindow(1); + testMeta.blockReader.blocksMetadataInput.process(block); + testMeta.blockReader.endWindow(); + + List<Object> actualMessages = testMeta.messageSink.collectedTuples; + Assert.assertEquals("No of records", testMeta.messages.size(), actualMessages.size()); + + for (int i = 0; i < actualMessages.size(); i++) { + byte[] msg = (byte[])actualMessages.get(i); + /* + * last character is removed below since the testMeta.messages does not contain '\n' + * present in byte[] msg + */ + Assert.assertTrue("line " + i, + Arrays.equals(new String(Arrays.copyOf(msg, msg.length - 1)).split(","), testMeta.messages.get(i))); + } + } + + /** + * The file is divided into multiple blocks, blocks are processed + * consecutively + */ + @Test + public void testMultipleBlocks() + { + long blockSize = 1000; + int noOfBlocks = (int)((testMeta.dataFile.length() / blockSize) + + (((testMeta.dataFile.length() % blockSize) == 0) ? 
0 : 1)); + + testMeta.blockReader.beginWindow(1); + + for (int i = 0; i < noOfBlocks; i++) { + BlockMetadata.FileBlockMetadata blockMetadata = new BlockMetadata.FileBlockMetadata(s3Directory + FILE_1, i, + i * blockSize, i == noOfBlocks - 1 ? testMeta.dataFile.length() : (i + 1) * blockSize, i == noOfBlocks - 1, + i - 1, testMeta.dataFile.length()); + testMeta.blockReader.blocksMetadataInput.process(blockMetadata); + } + + testMeta.blockReader.endWindow(); + + List<Object> messages = testMeta.messageSink.collectedTuples; + Assert.assertEquals("No of records", testMeta.messages.size(), messages.size()); + for (int i = 0; i < messages.size(); i++) { + + byte[] msg = (byte[])messages.get(i); + Assert.assertTrue("line " + i, + Arrays.equals(new String(Arrays.copyOf(msg, msg.length - 1)).split(","), testMeta.messages.get(i))); + } + } + + /** + * The file is divided into multiple blocks, blocks are processed + * non-consecutively + */ + @Test + public void testNonConsecutiveBlocks() + { + long blockSize = 1000; + int noOfBlocks = (int)((testMeta.dataFile.length() / blockSize) + + (((testMeta.dataFile.length() % blockSize) == 0) ? 0 : 1)); + + testMeta.blockReader.beginWindow(1); + + for (int i = 0; i < 10; i++) { + for (int j = 0; j < Math.ceil(noOfBlocks / 10.0); j++) { + int blockNo = 10 * j + i; + if (blockNo >= noOfBlocks) { + continue; + } + BlockMetadata.FileBlockMetadata blockMetadata = new BlockMetadata.FileBlockMetadata(s3Directory + FILE_1, + blockNo, blockNo * blockSize, + blockNo == noOfBlocks - 1 ? testMeta.dataFile.length() : (blockNo + 1) * blockSize, + blockNo == noOfBlocks - 1, blockNo - 1, testMeta.dataFile.length()); + testMeta.blockReader.blocksMetadataInput.process(blockMetadata); + } + } + + testMeta.blockReader.endWindow(); + + List<Object> messages = testMeta.messageSink.collectedTuples; + Assert.assertEquals("No of records", testMeta.messages.size(), messages.size()); + + Collections.sort(testMeta.messages, new Comparator<String[]>() + { + @Override + public int compare(String[] rec1, String[] rec2) + { + return compareStringArrayRecords(rec1, rec2); + } + }); + + Collections.sort(messages, new Comparator<Object>() + { + @Override + public int compare(Object object1, Object object2) + { + String[] rec1 = new String((byte[])object1).split(","); + String[] rec2 = new String((byte[])object2).split(","); + return compareStringArrayRecords(rec1, rec2); + } + }); + for (int i = 0; i < messages.size(); i++) { + byte[] msg = (byte[])messages.get(i); + Assert.assertTrue("line " + i, + Arrays.equals(new String(Arrays.copyOf(msg, msg.length - 1)).split(","), testMeta.messages.get(i))); + } + } + + /** + * Utility function to compare lexicographically 2 records of string arrays + * + * @param rec1 + * @param rec2 + * @return negative if rec1 < rec2, positive if rec1 > rec2, 0 otherwise + */ + private int compareStringArrayRecords(String[] rec1, String[] rec2) + { + for (int i = 0; i < rec1.length && i < rec2.length; i++) { + if (rec1[i].equals(rec2[i])) { + continue; + } + return rec1[i].compareTo(rec2[i]); + } + return 0; + } + + private static final Logger LOG = LoggerFactory.getLogger(S3FixedWidthRecordReaderTest.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReaderMockTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReaderMockTest.java 
b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReaderMockTest.java new file mode 100644 index 0000000..bca3f33 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReaderMockTest.java @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.apex.malhar.lib.fs.s3; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.fs.FSRecordReaderModule; +import org.apache.apex.malhar.lib.fs.FSRecordReaderTest.DelimitedValidator; +import org.apache.apex.malhar.lib.fs.FSRecordReaderTest.FixedWidthValidator; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.Path; + +import com.amazonaws.services.s3.AmazonS3; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata; +import com.datatorrent.lib.io.block.ReaderContext; +import com.datatorrent.lib.io.fs.S3BlockReader; + +import static org.mockito.Mockito.mock; + +public class S3RecordReaderMockTest +{ + private String inputDir; + static String outputDir; + private static final String FILE_1 = "file1.txt"; + private static final String FILE_2 = "file2.txt"; + private static final String FILE_1_DATA = "1234\n567890\nabcde\nfgh\ni\njklmop"; + private static final String FILE_2_DATA = "qr\nstuvw\nxyz\n"; + + public static class TestMeta extends TestWatcher + { + public String baseDirectory; + + @Override + protected void starting(org.junit.runner.Description description) + { + this.baseDirectory = "target/" + description.getClassName() + "/" + description.getMethodName(); + } + + } + + @Rule + public TestMeta testMeta = new TestMeta(); + + @Before + public void setup() throws Exception + { + inputDir = testMeta.baseDirectory + File.separator + "input"; + + FileUtils.writeStringToFile(new File(inputDir + File.separator + FILE_1), FILE_1_DATA); + FileUtils.writeStringToFile(new File(inputDir + File.separator + FILE_2), FILE_2_DATA); + } + + @Test + public void testDelimitedRecords() throws Exception + { + + DelimitedApplication app = new DelimitedApplication(); + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.set("dt.operator.S3RecordReaderModuleMock.prop.files", inputDir); + 
conf.set("dt.operator.S3RecordReaderModuleMock.prop.blockSize", "3"); + conf.set("dt.operator.S3RecordReaderModuleMock.prop.blocksThreshold", "1"); + conf.set("dt.operator.S3RecordReaderModuleMock.prop.scanIntervalMillis", "10000"); + + lma.prepareDAG(app, conf); + LocalMode.Controller lc = lma.getController(); + lc.setHeartbeatMonitoringEnabled(true); + lc.runAsync(); + + Set<String> expectedRecords = new HashSet<String>(Arrays.asList(FILE_1_DATA.split("\n"))); + expectedRecords.addAll(Arrays.asList(FILE_2_DATA.split("\n"))); + + while (DelimitedValidator.getRecords().size() != expectedRecords.size()) { + LOG.debug("Waiting for app to finish"); + Thread.sleep(1000); + } + lc.shutdown(); + Assert.assertEquals(expectedRecords, DelimitedValidator.getRecords()); + + } + + private static class S3RecordReaderMock extends S3RecordReader + { + AmazonS3 s3ClientObject; + + @Override + protected FSDataInputStream setupStream(FileBlockMetadata block) throws IOException + { + super.setupStream(block); + return fs.open(new Path(block.getFilePath())); + } + + @Override + public void setup(OperatorContext context) + { + s3ClientObject = mock(AmazonS3.class); + super.setup(context); + } + + @Override + protected ReaderContext<FSDataInputStream> createDelimitedReaderContext() + { + S3DelimitedRecordReaderContextMock s3DelimitedRecordReaderContextMock = new S3DelimitedRecordReaderContextMock(); + s3DelimitedRecordReaderContextMock.getS3Params().setBucketName("S3RecordReaderMock"); + s3DelimitedRecordReaderContextMock.getS3Params().setS3Client(s3ClientObject); + return s3DelimitedRecordReaderContextMock; + } + + @Override + protected ReaderContext<FSDataInputStream> createFixedWidthReaderContext() + { + S3FixedWidthRecordReaderContextMock s3FixedWidthRecordReaderContextMock = new S3FixedWidthRecordReaderContextMock(); + s3FixedWidthRecordReaderContextMock.getS3Params().setBucketName("S3RecordReaderMock"); + s3FixedWidthRecordReaderContextMock.getS3Params().setS3Client(s3ClientObject); + s3FixedWidthRecordReaderContextMock.setLength(this.getRecordLength()); + return s3FixedWidthRecordReaderContextMock; + } + + private class S3DelimitedRecordReaderContextMock extends S3DelimitedRecordReaderContext + { + @Override + protected int readData(long bytesFromCurrentOffset, int bytesToFetch) throws IOException + { + if (buffer == null) { + buffer = new byte[bytesToFetch]; + } + return stream.read(offset + bytesFromCurrentOffset, buffer, 0, bytesToFetch); + } + } + + private static class S3FixedWidthRecordReaderContextMock extends S3FixedWidthRecordReaderContext + { + @Override + protected int readData(long startOffset, long endOffset) throws IOException + { + int bufferSize = Long.valueOf(endOffset - startOffset + 1).intValue(); + if (buffer == null) { + buffer = new byte[bufferSize]; + } + return stream.read(startOffset, buffer, 0, bufferSize); + } + } + } + + private static class S3RecordReaderModuleMock extends S3RecordReaderModule + { + @Override + public S3RecordReader createRecordReader() + { + S3RecordReader s3RecordReader = new S3RecordReaderMock(); + s3RecordReader.setBucketName(S3BlockReader.extractBucket(getFiles())); + s3RecordReader.setAccessKey("****"); + s3RecordReader.setSecretAccessKey("*****"); + s3RecordReader.setMode(this.getMode().toString()); + s3RecordReader.setRecordLength(this.getRecordLength()); + return s3RecordReader; + } + } + + private static class DelimitedApplication implements StreamingApplication + { + + public void populateDAG(DAG dag, Configuration conf) + { + + 
S3RecordReaderModuleMock recordReader = dag.addModule("S3RecordReaderModuleMock", new S3RecordReaderModuleMock()); + recordReader.setMode("delimited_record"); + DelimitedValidator validator = dag.addOperator("Validator", new DelimitedValidator()); + dag.addStream("records", recordReader.records, validator.data); + } + + } + + @Test + public void testFixedWidthRecords() throws Exception + { + + FixedWidthApplication app = new FixedWidthApplication(); + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.set("dt.operator.S3RecordReaderModuleMock.prop.files", inputDir); + conf.set("dt.operator.S3RecordReaderModuleMock.prop.recordLength", "8"); + conf.set("dt.operator.S3RecordReaderModuleMock.prop.blockSize", "3"); + conf.set("dt.operator.S3RecordReaderModuleMock.prop.blocksThreshold", "1"); + conf.set("dt.operator.S3RecordReaderModuleMock.prop.scanIntervalMillis", "10000"); + + lma.prepareDAG(app, conf); + LocalMode.Controller lc = lma.getController(); + lc.setHeartbeatMonitoringEnabled(true); + lc.runAsync(); + LOG.debug("Waiting for app to finish"); + Thread.sleep(1000 * 1); + lc.shutdown(); + } + + @Test(expected = IllegalArgumentException.class) + public void testMissingRecordLength() throws Exception + { + FixedWidthApplication app = new FixedWidthApplication(); + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.set("dt.operator.S3RecordReaderModuleMock.prop.files", inputDir); + //Should give IllegalArgumentException since recordLength is not set + //conf.set("dt.operator.HDFSRecordReaderModule.prop.recordLength", "8"); + conf.set("dt.operator.S3RecordReaderModuleMock.prop.blocksThreshold", "1"); + conf.set("dt.operator.S3RecordReaderModuleMock.prop.blockSize", "3"); + conf.set("dt.operator.S3RecordReaderModuleMock.prop.scanIntervalMillis", "10000"); + + lma.prepareDAG(app, conf); + LocalMode.Controller lc = lma.getController(); + lc.setHeartbeatMonitoringEnabled(true); + lc.runAsync(); + LOG.debug("Waiting for app to finish"); + Thread.sleep(1000 * 1); + lc.shutdown(); + } + + private static class FixedWidthApplication implements StreamingApplication + { + + public void populateDAG(DAG dag, Configuration conf) + { + FSRecordReaderModule recordReader = dag.addModule("S3RecordReaderModuleMock", FSRecordReaderModule.class); + recordReader.setMode("FIXED_WIDTH_RECORD"); + FixedWidthValidator validator = dag.addOperator("Validator", new FixedWidthValidator()); + dag.addStream("records", recordReader.records, validator.data); + } + + } + + private static Logger LOG = LoggerFactory.getLogger(S3RecordReaderMockTest.class); + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReaderModuleAppTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReaderModuleAppTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReaderModuleAppTest.java new file mode 100644 index 0000000..7d69934 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReaderModuleAppTest.java @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.apex.malhar.lib.fs.s3; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.fs.FSRecordReaderTest.DelimitedValidator; +import org.apache.apex.malhar.lib.fs.FSRecordReaderTest.FixedWidthValidator; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; + +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.S3ObjectSummary; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; + +@Ignore +public class S3RecordReaderModuleAppTest +{ + private String inputDir; + private static final String FILE_1 = "file1.txt"; + private static final String FILE_2 = "file2.txt"; + private static final String FILE_1_DATA = "1234\n567890\nabcde\nfgh\ni\njklmop"; + private static final String FILE_2_DATA = "qr\nstuvw\nxyz\n"; + + private final String accessKey = "*************"; + private final String secretKey = "*********************"; + private AmazonS3 client; + private String files; + private static final String SCHEME = "s3n"; + + public static class TestMeta extends TestWatcher + { + public String baseDirectory; + public String bucketKey; + + @Override + protected void starting(org.junit.runner.Description description) + { + this.baseDirectory = "target/" + description.getClassName() + "/" + description.getMethodName(); + this.bucketKey = new String("target-" + description.getMethodName()).toLowerCase(); + } + } + + @Rule + public S3RecordReaderModuleAppTest.TestMeta testMeta = new S3RecordReaderModuleAppTest.TestMeta(); + + @Before + public void setup() throws Exception + { + client = new AmazonS3Client(new BasicAWSCredentials(accessKey, secretKey)); + client.createBucket(testMeta.bucketKey); + inputDir = testMeta.baseDirectory + File.separator + "input"; + + File file1 = new File(inputDir + File.separator + FILE_1); + File file2 = new File(inputDir + File.separator + FILE_2); + + FileUtils.writeStringToFile(file1, FILE_1_DATA); + FileUtils.writeStringToFile(file2, FILE_2_DATA); + + client.putObject(new PutObjectRequest(testMeta.bucketKey, "input/" + FILE_1, file1)); + client.putObject(new PutObjectRequest(testMeta.bucketKey, "input/" + FILE_2, file2)); + files = SCHEME + "://" + accessKey + ":" + secretKey + "@" + testMeta.bucketKey + "/input"; + } + + @Test + public void 
testS3DelimitedRecords() throws Exception + { + + S3DelimitedApplication app = new S3DelimitedApplication(); + LocalMode lma = LocalMode.newInstance(); + + Configuration conf = new Configuration(false); + conf.set("dt.operator.s3RecordReaderModule.prop.files", files); + conf.set("dt.operator.s3RecordReaderModule.prop.blockSize", "10"); + conf.set("dt.operator.s3RecordReaderModule.prop.overflowBlockSize", "4"); + conf.set("dt.operator.s3RecordReaderModule.prop.scanIntervalMillis", "10000"); + + lma.prepareDAG(app, conf); + LocalMode.Controller lc = lma.getController(); + lc.setHeartbeatMonitoringEnabled(true); + lc.runAsync(); + + Set<String> expectedRecords = new HashSet<String>(Arrays.asList(FILE_1_DATA.split("\n"))); + expectedRecords.addAll(Arrays.asList(FILE_2_DATA.split("\n"))); + + while (DelimitedValidator.getRecords().size() != expectedRecords.size()) { + LOG.debug("Waiting for app to finish"); + Thread.sleep(1000); + } + lc.shutdown(); + Assert.assertEquals(expectedRecords, DelimitedValidator.getRecords()); + + } + + private static class S3DelimitedApplication implements StreamingApplication + { + + public void populateDAG(DAG dag, Configuration conf) + { + S3RecordReaderModule recordReader = dag.addModule("s3RecordReaderModule", S3RecordReaderModule.class); + DelimitedValidator validator = dag.addOperator("Validator", new DelimitedValidator()); + dag.addStream("records", recordReader.records, validator.data); + } + + } + + @Test + public void testS3FixedWidthRecords() throws Exception + { + + S3FixedWidthApplication app = new S3FixedWidthApplication(); + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.set("dt.operator.S3RecordReaderModule.prop.files", files); + conf.set("dt.operator.S3RecordReaderModule.prop.recordLength", "8"); + conf.set("dt.operator.S3RecordReaderModule.prop.blocksThreshold", "1"); + conf.set("dt.operator.S3RecordReaderModule.prop.scanIntervalMillis", "10000"); + + lma.prepareDAG(app, conf); + LocalMode.Controller lc = lma.getController(); + lc.setHeartbeatMonitoringEnabled(true); + lc.runAsync(); + LOG.debug("Waiting for app to finish"); + Thread.sleep(1000 * 1); + lc.shutdown(); + } + + @Test(expected = IllegalArgumentException.class) + public void testS3MissingRecordLength() throws Exception + { + S3FixedWidthApplication app = new S3FixedWidthApplication(); + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.set("dt.operator.S3RecordReaderModule.prop.files", files); + //Should give IllegalArgumentException since recordLength is not set + //conf.set("dt.operator.S3RecordReaderModule.prop.recordLength", "8"); + conf.set("dt.operator.S3RecordReaderModule.prop.blocksThreshold", "1"); + conf.set("dt.operator.S3RecordReaderModule.prop.scanIntervalMillis", "10000"); + + lma.prepareDAG(app, conf); + LocalMode.Controller lc = lma.getController(); + lc.setHeartbeatMonitoringEnabled(true); + lc.runAsync(); + LOG.debug("Waiting for app to finish"); + Thread.sleep(1000 * 1); + lc.shutdown(); + } + + private static class S3FixedWidthApplication implements StreamingApplication + { + + public void populateDAG(DAG dag, Configuration conf) + { + S3RecordReaderModule recordReader = dag.addModule("S3RecordReaderModule", S3RecordReaderModule.class); + recordReader.setMode("fixed_width_record"); + FixedWidthValidator validator = dag.addOperator("Validator", new FixedWidthValidator()); + dag.addStream("records", recordReader.records, validator.data); + } + + } + + @After + public void 
tearDown() throws IOException + { + FileUtils.deleteDirectory(new File(inputDir)); + deleteBucketAndContent(); + } + + public void deleteBucketAndContent() + { + //Get the list of objects + ObjectListing objectListing = client.listObjects(testMeta.bucketKey); + for (Iterator<?> iterator = objectListing.getObjectSummaries().iterator(); iterator.hasNext();) { + S3ObjectSummary objectSummary = (S3ObjectSummary)iterator.next(); + LOG.info("Deleting an object: {}", objectSummary.getKey()); + client.deleteObject(testMeta.bucketKey, objectSummary.getKey()); + } + client.deleteBucket(testMeta.bucketKey); + } + + private static Logger LOG = LoggerFactory.getLogger(S3RecordReaderModuleAppTest.class); +}
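For readers wiring this module into their own application, the app tests above double as usage documentation: the S3 location and credentials travel through the module's "files" property in the form s3n://<accessKey>:<secretKey>@<bucket>/<path>, and each record is emitted as a byte[] tuple on the module's "records" port. Below is a minimal sketch of that wiring; it is not part of this commit, and it assumes Malhar's ConsoleOutputOperator as a stand-in sink and placeholder property values.

import org.apache.apex.malhar.lib.fs.s3.S3RecordReaderModule;
import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.lib.io.ConsoleOutputOperator;

public class S3DelimitedReaderApp implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    // Same wiring as S3DelimitedApplication in the test above, with a console sink instead of a validator
    S3RecordReaderModule reader = dag.addModule("s3RecordReader", S3RecordReaderModule.class);
    reader.setMode("delimited_record"); // or "FIXED_WIDTH_RECORD" together with recordLength, as in the fixed-width tests
    ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
    dag.addStream("records", reader.records, console.input);
  }
}

The corresponding properties mirror the keys set in the tests, for example (placeholder values):
dt.operator.s3RecordReader.prop.files=s3n://<accessKey>:<secretKey>@<bucket>/input
dt.operator.s3RecordReader.prop.scanIntervalMillis=10000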
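The multi-block tests above also show how a file is carved into FileBlockMetadata entries that carry the total file length (the seventh constructor argument used in those tests), which the S3 reader relies on to avoid reading past end-of-file when a record overflows its block. The hypothetical helper below restates that loop outside the test harness; the class and method names are illustrative only.

import java.util.ArrayList;
import java.util.List;

import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;

public class BlockSplitSketch
{
  /**
   * Splits a file of the given length into blocks of at most blockSize bytes,
   * following the same loop as testMultipleBlocks() above.
   */
  public static List<FileBlockMetadata> split(String filePath, long fileLength, long blockSize)
  {
    int noOfBlocks = (int)((fileLength / blockSize) + ((fileLength % blockSize == 0) ? 0 : 1));
    List<FileBlockMetadata> blocks = new ArrayList<>();
    for (int i = 0; i < noOfBlocks; i++) {
      long offset = i * blockSize;
      // the last block ends at the file length rather than at a full blockSize boundary
      long end = (i == noOfBlocks - 1) ? fileLength : (i + 1) * blockSize;
      blocks.add(new FileBlockMetadata(filePath, i, offset, end, i == noOfBlocks - 1, i - 1, fileLength));
    }
    return blocks;
  }
}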
