Repository: apex-malhar
Updated Branches:
  refs/heads/master 0b66f19d1 -> 23970382c
APEXMALHAR-2019 Implemented S3 Input Module


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/5de26e4a
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/5de26e4a
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/5de26e4a

Branch: refs/heads/master
Commit: 5de26e4a0d65652e81007445991f88075089fb0c
Parents: 67b84dd
Author: Chaitanya <[email protected]>
Authored: Wed Jul 13 15:46:13 2016 +0530
Committer: Chaitanya <[email protected]>
Committed: Wed Jul 13 15:46:13 2016 +0530

----------------------------------------------------------------------
 library/pom.xml                                 |  16 ++
 .../datatorrent/lib/io/fs/S3BlockReader.java    | 126 +++++++++
 .../datatorrent/lib/io/fs/S3InputModule.java    |  68 +++++
 .../lib/io/fs/S3InputModuleAppTest.java         | 257 +++++++++++++++++++
 4 files changed, 467 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5de26e4a/library/pom.xml
----------------------------------------------------------------------
diff --git a/library/pom.xml b/library/pom.xml
index 7242027..e9f64c8 100644
--- a/library/pom.xml
+++ b/library/pom.xml
@@ -186,6 +186,16 @@
           <suppressionsLocation>library-checkstyle-suppressions.xml</suppressionsLocation>
         </configuration>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>2.19.1</version>
+        <configuration>
+          <excludes>
+            <exclude>**/S3InputModuleAppTest.java</exclude>
+          </excludes>
+        </configuration>
+      </plugin>
     </plugins>
   </build>
 
@@ -335,6 +345,12 @@
       <version>1.8.5</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk-s3</artifactId>
+      <version>1.10.73</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5de26e4a/library/src/main/java/com/datatorrent/lib/io/fs/S3BlockReader.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/S3BlockReader.java b/library/src/main/java/com/datatorrent/lib/io/fs/S3BlockReader.java
new file mode 100644
index 0000000..34f64ed
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/S3BlockReader.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.io.fs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.io.block.FSSliceReader;
+import com.datatorrent.lib.io.block.ReaderContext;
+
+/**
+ * S3BlockReader extends FSSliceReader and adds support for reading S3 objects and
+ * parsing their block metadata.
+ */
[email protected]
+public class S3BlockReader extends FSSliceReader
+{
+  protected transient String s3bucketUri;
+  private String bucketName;
+
+  public S3BlockReader()
+  {
+    this.readerContext = new S3BlockReaderContext();
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+    s3bucketUri = fs.getScheme() + "://" + bucketName;
+  }
+
+  /**
+   * Extracts the bucket name from the given URI.
+   * @param s3uri s3 uri
+   * @return name of the bucket
+   */
+  @VisibleForTesting
+  protected static String extractBucket(String s3uri)
+  {
+    return s3uri.substring(s3uri.indexOf('@') + 1, s3uri.indexOf("/", s3uri.indexOf('@')));
+  }
+
+  /**
+   * Creates the stream from the bucket URI and block path.
+   * @param block block metadata
+   * @return stream
+   * @throws IOException
+   */
+  @Override
+  protected FSDataInputStream setupStream(BlockMetadata.FileBlockMetadata block) throws IOException
+  {
+    FSDataInputStream ins = fs.open(new Path(s3bucketUri + block.getFilePath()));
+    ins.seek(block.getOffset());
+    return ins;
+  }
+
+  /**
+   * BlockReaderContext for reading S3 blocks. A single read on the stream may not
+   * return the complete block, so this context keeps reading until the block is complete.
+   */
+  private static class S3BlockReaderContext extends ReaderContext.FixedBytesReaderContext<FSDataInputStream>
+  {
+    /**
+     * S3 file systems don't read the specified block completely when using the readFully API.
+     * This reads small chunks repeatedly until the specified block size is reached.
+     * @return the block entity
+     * @throws IOException
+     */
+    @Override
+    protected Entity readEntity() throws IOException
+    {
+      entity.clear();
+      int bytesToRead = length;
+      if (offset + length >= blockMetadata.getLength()) {
+        bytesToRead = (int)(blockMetadata.getLength() - offset);
+      }
+      byte[] record = new byte[bytesToRead];
+      int bytesRead = 0;
+      while (bytesRead < bytesToRead) {
+        bytesRead += stream.read(record, bytesRead, bytesToRead - bytesRead);
+      }
+      entity.setUsedBytes(bytesRead);
+      entity.setRecord(record);
+      return entity;
+    }
+  }
+
+  /**
+   * Get the S3 bucket name.
+   * @return bucket
+   */
+  public String getBucketName()
+  {
+    return bucketName;
+  }
+
+  /**
+   * Set the bucket name.
+   * @param bucketName bucket name
+   */
+  public void setBucketName(String bucketName)
+  {
+    this.bucketName = bucketName;
+  }
+}
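For illustration (outside the patch): the bucket extraction above simply takes everything between the '@' and the next '/' of a URI of the form SCHEME://AccessKey:SecretKey@BucketName/path. A minimal standalone sketch of the same substring logic, with made-up credentials and bucket name:

// Illustration only; mirrors S3BlockReader.extractBucket(), credentials/bucket are placeholders.
public class ExtractBucketExample
{
  public static void main(String[] args)
  {
    String s3uri = "s3n://ACCESS_KEY:SECRET_KEY@my-bucket/input/file1.txt";
    // Bucket name sits between the '@' and the first '/' that follows it.
    String bucket = s3uri.substring(s3uri.indexOf('@') + 1, s3uri.indexOf("/", s3uri.indexOf('@')));
    System.out.println(bucket);  // prints "my-bucket"
  }
}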
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5de26e4a/library/src/main/java/com/datatorrent/lib/io/fs/S3InputModule.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/S3InputModule.java b/library/src/main/java/com/datatorrent/lib/io/fs/S3InputModule.java
new file mode 100644
index 0000000..50c40ec
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/S3InputModule.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.io.fs;
+
+import com.datatorrent.lib.io.block.FSSliceReader;
+
+/**
+ * S3InputModule is used to read a file, a list of files, or a directory from an S3 bucket. <br/>
+ * The module emits: <br/>
+ * 1. FileMetadata 2. BlockMetadata 3. Block bytes.<br/><br/>
+ * Parallel read works only if the scheme is "s3a" and the Hadoop version is 2.7+.
+ * Parallel read does not work when the scheme is "s3n" or "s3"; in that case, this operator explicitly
+ * disables the parallel read functionality.
+ * For more info about the S3 scheme protocols, please have a look at
+ * <a href="https://wiki.apache.org/hadoop/AmazonS3">https://wiki.apache.org/hadoop/AmazonS3.</a>
+ *
+ * The module reads data in parallel; the following parameters can be configured:<br/>
+ * 1. files: list of file(s)/directories to read. files takes the form
+ * SCHEME://AccessKey:SecretKey@BucketName/FileOrDirectory ,
+ * SCHEME://AccessKey:SecretKey@BucketName/FileOrDirectory , ....
+ * where SCHEME is the protocol scheme for the file system,
+ * AccessKey is the AWS access key and SecretKey is the AWS secret key<br/>
+ * 2. filePatternRegularExp: file names matching the given regex will be read<br/>
+ * 3. scanIntervalMillis: interval between two scans to discover new files in the input directory<br/>
+ * 4. recursive: whether to scan the input directories recursively<br/>
+ * 5. blockSize: block size used to read input blocks of a file<br/>
+ * 6. readersCount: number of readers used to read the input file<br/>
+ * 7. sequencialFileRead: whether to emit file blocks in sequence<br/>
+ */
[email protected]
+public class S3InputModule extends FSInputModule
+{
+  /**
+   * Creates the block reader for reading S3 blocks.
+   * @return S3BlockReader
+   */
+  @Override
+  public FSSliceReader createBlockReader()
+  {
+    // Extract the scheme from the files
+    String s3input = getFiles();
+    String scheme = s3input.substring(0, s3input.indexOf("://"));
+    // Parallel read is not supported if the scheme is s3 or s3n.
+    if (scheme.equals("s3") || scheme.equals("s3n")) {
+      setSequencialFileRead(true);
+    }
+    // Set the s3 bucket name on the block reader
+    S3BlockReader reader = new S3BlockReader();
+    reader.setBucketName(S3BlockReader.extractBucket(getFiles()));
+    return reader;
+  }
+}
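For illustration (outside the patch): the module is typically configured through DAG properties. The sketch below mirrors the property names used by S3InputModuleAppTest further down; the operator name "s3InputModule", the credentials, the bucket, and the block size value are assumptions, not part of the commit.

import org.apache.hadoop.conf.Configuration;

// Illustration only: property-based configuration for a module added to the DAG as "s3InputModule".
public class S3InputModuleConfigExample
{
  public static Configuration buildConfig()
  {
    Configuration conf = new Configuration(false);
    // files follows SCHEME://AccessKey:SecretKey@BucketName/FileOrDirectory
    conf.set("dt.operator.s3InputModule.prop.files", "s3n://ACCESS_KEY:SECRET_KEY@my-bucket/input");
    conf.set("dt.operator.s3InputModule.prop.blockSize", "1048576");         // example: 1 MB blocks
    conf.set("dt.operator.s3InputModule.prop.scanIntervalMillis", "10000");  // rescan every 10 s
    return conf;
  }
}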
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5de26e4a/library/src/test/java/com/datatorrent/lib/io/fs/S3InputModuleAppTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/S3InputModuleAppTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/S3InputModuleAppTest.java
new file mode 100644
index 0000000..3d7aef0
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/S3InputModuleAppTest.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.io.fs;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.lib.io.block.AbstractBlockReader;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.stream.DevNull;
+import com.datatorrent.netlet.util.Slice;
+
[email protected]
+public class S3InputModuleAppTest
+{
+  private String inputDir;
+  static String outputDir;
+  private StreamingApplication app;
+  private String accessKey = "*************";
+  private String secretKey = "**************";
+  private AmazonS3 client;
+  private String files;
+  private static final String SCHEME = "s3n";
+  private static final String FILE_1 = "file1.txt";
+  private static final String FILE_2 = "file2.txt";
+  private static final String FILE_1_DATA = "File one data";
+  private static final String FILE_2_DATA = "File two data. This has more data hence more blocks.";
+  static final String OUT_DATA_FILE = "fileData.txt";
+  static final String OUT_METADATA_FILE = "fileMetaData.txt";
+
+  public static class TestMeta extends TestWatcher
+  {
+    public String baseDirectory;
+    public String bucketKey;
+
+    @Override
+    protected void starting(org.junit.runner.Description description)
+    {
+      this.baseDirectory = "target/" + description.getClassName() + "/" + description.getMethodName();
+      this.bucketKey = new String("target-" + description.getMethodName()).toLowerCase();
+    }
+
+  }
+
+  @Rule
+  public S3InputModuleAppTest.TestMeta testMeta = new S3InputModuleAppTest.TestMeta();
+
+  @Before
+  public void setup() throws Exception
+  {
+    client = new AmazonS3Client(new BasicAWSCredentials(accessKey, secretKey));
+    client.createBucket(testMeta.bucketKey);
+
+    inputDir = testMeta.baseDirectory + File.separator + "input";
+    outputDir = testMeta.baseDirectory + File.separator + "output";
+
+    File file1 = new File(inputDir + File.separator + FILE_1);
+    File file2 = new File(inputDir + File.separator + FILE_2);
+
+    FileUtils.writeStringToFile(file1, FILE_1_DATA);
+    FileUtils.writeStringToFile(file2, FILE_2_DATA);
+    client.putObject(new PutObjectRequest(testMeta.bucketKey, "input/" + FILE_1, file1));
+    client.putObject(new PutObjectRequest(testMeta.bucketKey, "input/" + FILE_2, file2));
+    files = SCHEME + "://" + accessKey + ":" + secretKey + "@" + testMeta.bucketKey + "/input";
+  }
+
+  @After
+  public void tearDown() throws IOException
+  {
+    FileUtils.deleteDirectory(new File(inputDir));
+    FileUtils.deleteDirectory(new File(outputDir));
+    deleteBucketAndContent();
+    //client.deleteBucket(testMeta.bucketKey);
+  }
+
+  public void deleteBucketAndContent()
+  {
+    //Get the list of objects
+    ObjectListing objectListing = client.listObjects(testMeta.bucketKey);
+    for ( Iterator<?> iterator = objectListing.getObjectSummaries().iterator(); iterator.hasNext(); ) {
+      S3ObjectSummary objectSummary = (S3ObjectSummary)iterator.next();
+      LOG.info("Deleting an object: {}",objectSummary.getKey());
+      client.deleteObject(testMeta.bucketKey, objectSummary.getKey());
+    }
+    client.deleteBucket(testMeta.bucketKey);
+  }
+
+  @Test
+  public void testS3Application() throws Exception
+  {
+    app = new S3InputModuleAppTest.Application();
+    Configuration conf = new Configuration(false);
+    conf.set("dt.operator.s3InputModule.prop.files", files);
+    conf.set("dt.operator.s3InputModule.prop.blockSize", "10");
+    conf.set("dt.operator.s3InputModule.prop.scanIntervalMillis", "10000");
+
+    LocalMode lma = LocalMode.newInstance();
+    lma.prepareDAG(app, conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.setHeartbeatMonitoringEnabled(true);
+    lc.runAsync();
+
+    long now = System.currentTimeMillis();
+    Path outDir = new Path("file://" + new File(outputDir).getAbsolutePath());
+    FileSystem fs = FileSystem.newInstance(outDir.toUri(), new Configuration());
+    while (!fs.exists(outDir) && System.currentTimeMillis() - now < 20000) {
+      Thread.sleep(500);
+      LOG.debug("Waiting for {}", outDir);
+    }
+
+    Thread.sleep(10000);
+    lc.shutdown();
+
+    Assert.assertTrue("output dir does not exist", fs.exists(outDir));
+
+    File dir = new File(outputDir);
+    FileFilter fileFilter = new WildcardFileFilter(OUT_METADATA_FILE + "*");
+    verifyFileContents(dir.listFiles(fileFilter), "[fileName=file1.txt, numberOfBlocks=2, isDirectory=false, relativePath=input/file1.txt]");
+    verifyFileContents(dir.listFiles(fileFilter), "[fileName=file2.txt, numberOfBlocks=6, isDirectory=false, relativePath=input/file2.txt]");
+
+    fileFilter = new WildcardFileFilter(OUT_DATA_FILE + "*");
+    verifyFileContents(dir.listFiles(fileFilter), FILE_1_DATA);
+    verifyFileContents(dir.listFiles(fileFilter), FILE_2_DATA);
+  }
+
+  private void verifyFileContents(File[] files, String expectedData) throws IOException
+  {
+    StringBuilder filesData = new StringBuilder();
+    for (File file : files) {
+      filesData.append(FileUtils.readFileToString(file));
+    }
+    Assert.assertTrue("File data doesn't contain expected text", filesData.indexOf(expectedData) > -1);
+  }
+
+  private static Logger LOG = LoggerFactory.getLogger(S3InputModuleAppTest.class);
+
+  private static class Application implements StreamingApplication
+  {
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      S3InputModule module = dag.addModule("s3InputModule", S3InputModule.class);
+
+      AbstractFileOutputOperator<AbstractFileSplitter.FileMetadata> metadataWriter = new S3InputModuleAppTest.MetadataWriter(S3InputModuleAppTest.OUT_METADATA_FILE);
+      metadataWriter.setFilePath(S3InputModuleAppTest.outputDir);
+      dag.addOperator("FileMetadataWriter", metadataWriter);
+
+      AbstractFileOutputOperator<AbstractBlockReader.ReaderRecord<Slice>> dataWriter = new S3InputModuleAppTest.HDFSFileWriter(S3InputModuleAppTest.OUT_DATA_FILE);
+      dataWriter.setFilePath(S3InputModuleAppTest.outputDir);
+      dag.addOperator("FileDataWriter", dataWriter);
+
+      DevNull<BlockMetadata.FileBlockMetadata> devNull = dag.addOperator("devNull", DevNull.class);
+
+      dag.addStream("FileMetaData", module.filesMetadataOutput, metadataWriter.input);
+      dag.addStream("data", module.messages, dataWriter.input);
+      dag.addStream("blockMetadata", module.blocksMetadataOutput, devNull.data);
+    }
+  }
+
+  private static class MetadataWriter extends AbstractFileOutputOperator<AbstractFileSplitter.FileMetadata>
+  {
+    String fileName;
+
+    @SuppressWarnings("unused")
+    private MetadataWriter()
+    {
+
+    }
+
+    public MetadataWriter(String fileName)
+    {
+      this.fileName = fileName;
+    }
+
+    @Override
+    protected String getFileName(AbstractFileSplitter.FileMetadata tuple)
+    {
+      return fileName;
+    }
+
+    @Override
+    protected byte[] getBytesForTuple(AbstractFileSplitter.FileMetadata tuple)
+    {
+      return (tuple).toString().getBytes();
+    }
+  }
+
+  private static class HDFSFileWriter extends AbstractFileOutputOperator<AbstractBlockReader.ReaderRecord<Slice>>
+  {
+    String fileName;
+
+    @SuppressWarnings("unused")
+    private HDFSFileWriter()
+    {
+    }
+
+    public HDFSFileWriter(String fileName)
+    {
+      this.fileName = fileName;
+    }
+
+    @Override
+    protected String getFileName(AbstractBlockReader.ReaderRecord<Slice> tuple)
+    {
+      return fileName;
+    }
+
+    @Override
+    protected byte[] getBytesForTuple(AbstractBlockReader.ReaderRecord<Slice> tuple)
+    {
+      return tuple.getRecord().buffer;
+    }
+  }
+
+}
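For illustration (outside the patch): the expected numberOfBlocks values in the test follow from the configured blockSize of 10 and the lengths of the two input strings, assuming plain ASCII so byte length equals character count. A minimal sketch of that arithmetic:

// Illustration only: reproduces the block counts asserted by testS3Application().
public class BlockCountExample
{
  public static void main(String[] args)
  {
    int blockSize = 10;  // same value set via dt.operator.s3InputModule.prop.blockSize
    String file1Data = "File one data";                                        // 13 bytes
    String file2Data = "File two data. This has more data hence more blocks."; // 52 bytes
    System.out.println((file1Data.length() + blockSize - 1) / blockSize);      // 2 blocks
    System.out.println((file2Data.length() + blockSize - 1) / blockSize);      // 6 blocks
  }
}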
