Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/master 51a19e1be -> becee7f82
APEXMALHAR-2008: HDFS File Input module Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/f9fe3d5e Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/f9fe3d5e Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/f9fe3d5e Branch: refs/heads/master Commit: f9fe3d5e9e5ef06ebba313a9a09fa268ec9ead7c Parents: d3a7063 Author: Priyanka Gugale <[email protected]> Authored: Fri Mar 11 13:22:10 2016 +0530 Committer: Priyanka Gugale <[email protected]> Committed: Wed Mar 16 15:07:08 2016 +0530 ---------------------------------------------------------------------- .../datatorrent/lib/io/block/BlockMetadata.java | 31 ++- .../datatorrent/lib/io/block/BlockReader.java | 66 +++++ .../lib/io/fs/AbstractFileSplitter.java | 45 +++- .../lib/io/fs/FileSplitterInput.java | 81 +++++-- .../datatorrent/lib/io/fs/HDFSFileSplitter.java | 120 +++++++++ .../datatorrent/lib/io/fs/HDFSInputModule.java | 243 +++++++++++++++++++ .../lib/io/fs/FileSplitterInputTest.java | 2 +- .../lib/io/fs/HDFSInputModuleAppTest.java | 221 +++++++++++++++++ 8 files changed, 786 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f9fe3d5e/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java b/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java index 534024d..6e38e45 100644 --- a/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java +++ b/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java @@ -196,11 +196,13 @@ public interface BlockMetadata } /** - * A block of file which contains file path adn other block properties. + * A block of file which contains file path and other block properties. 
+ * It also controls if blocks should be read in sequence */ class FileBlockMetadata extends AbstractBlockMetadata { private final String filePath; + private boolean readBlockInSequence = false; protected FileBlockMetadata() { @@ -225,10 +227,37 @@ public interface BlockMetadata return filePath; } + /** + * Get if blocks should be read in sequence + * @return readBlockInSequence + */ + public boolean isReadBlockInSequence() + { + return readBlockInSequence; + } + + /** + * Set if blokcs should be read in sequence + * @param readBlockInSequence + */ + public void setReadBlockInSequence(boolean readBlockInSequence) + { + this.readBlockInSequence = readBlockInSequence; + } + public FileBlockMetadata newInstance(@NotNull String filePath) { Preconditions.checkNotNull(filePath); return new FileBlockMetadata(filePath); } + + @Override + public int hashCode() + { + if (isReadBlockInSequence()) { + return getFilePath().hashCode(); + } + return super.hashCode(); + } } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f9fe3d5e/library/src/main/java/com/datatorrent/lib/io/block/BlockReader.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/block/BlockReader.java b/library/src/main/java/com/datatorrent/lib/io/block/BlockReader.java new file mode 100644 index 0000000..f4f7d76 --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/io/block/BlockReader.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.io.block; + +import java.io.IOException; +import java.net.URI; + +import org.apache.hadoop.fs.FileSystem; + +import com.datatorrent.api.AutoMetric; + +/** + * BlockReader extends {@link FSSliceReader} to accept case insensitive uri + */ +public class BlockReader extends FSSliceReader +{ + @AutoMetric + private long bytesRead; + + protected String uri; + + @Override + public void beginWindow(long windowId) + { + super.beginWindow(windowId); + bytesRead = 0; + } + + @Override + protected FileSystem getFSInstance() throws IOException + { + return FileSystem.newInstance(URI.create(uri), configuration); + } + + /** + * Sets the uri + * + * @param uri of form hdfs://hostname:port/path/to/input + */ + public void setUri(String uri) + { + this.uri = uri; + } + + public String getUri() + { + return uri; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f9fe3d5e/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java index cd47d48..b39168c 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java @@ -18,6 +18,7 @@ */ package com.datatorrent.lib.io.fs; +import java.io.File; import java.io.IOException; import java.util.Iterator; @@ -27,12 +28,10 @@ import javax.validation.constraints.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import com.google.common.base.Preconditions; - import com.datatorrent.api.AutoMetric; import com.datatorrent.api.Context; import com.datatorrent.api.DefaultOutputPort; @@ -206,6 +205,13 @@ public abstract class AbstractFileSplitter extends BaseOperator fileMetadata.setDirectory(status.isDirectory()); fileMetadata.setFileLength(status.getLen()); + if (fileInfo.getDirectoryPath() == null) { // Direct filename is given as input. + fileMetadata.setRelativePath(status.getPath().getName()); + } else { + String relativePath = getRelativePathWithFolderName(fileInfo); + fileMetadata.setRelativePath(relativePath); + } + if (!status.isDirectory()) { int noOfBlocks = (int)((status.getLen() / blockSize) + (((status.getLen() % blockSize) == 0) ? 0 : 1)); if (fileMetadata.getDataOffset() >= status.getLen()) { @@ -217,6 +223,15 @@ public abstract class AbstractFileSplitter extends BaseOperator return fileMetadata; } + /* + * As folder name was given to input for copy, prefix folder name to the sub items to copy. 
+ */ + private String getRelativePathWithFolderName(FileInfo fileInfo) + { + String parentDir = new Path(fileInfo.getDirectoryPath()).getName(); + return parentDir + File.separator + fileInfo.getRelativeFilePath(); + } + /** * This can be over-ridden to create file metadata of type that extends {@link FileSplitterInput.FileMetadata} * @@ -346,6 +361,7 @@ public abstract class AbstractFileSplitter extends BaseOperator private long discoverTime; private long[] blockIds; private boolean isDirectory; + private String relativePath; @SuppressWarnings("unused") protected FileMetadata() @@ -493,6 +509,31 @@ public abstract class AbstractFileSplitter extends BaseOperator { return isDirectory; } + + /** + * Sets relative file path + * @return relativePath + */ + public String getRelativePath() + { + return relativePath; + } + + /** + * Gets relative file path + * @param relativePath + */ + public void setRelativePath(String relativePath) + { + this.relativePath = relativePath; + } + + @Override + public String toString() + { + return "FileMetadata [fileName=" + fileName + ", numberOfBlocks=" + numberOfBlocks + ", isDirectory=" + isDirectory + ", relativePath=" + relativePath + "]"; + } + } /** http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f9fe3d5e/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java index ab70047..1d8248f 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java @@ -292,6 +292,9 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper private transient ScannedFileInfo lastScannedInfo; private transient int numDiscoveredPerIteration; + @NotNull + protected final Map<String, Map<String, Long>> inputDirTolastModifiedTimes; + public TimeBasedDirectoryScanner() { recursive = true; @@ -301,6 +304,7 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper discoveredFiles = new LinkedBlockingDeque<>(); atomicThrowable = new AtomicReference<>(); ignoredFiles = Sets.newHashSet(); + inputDirTolastModifiedTimes = Maps.newHashMap(); } @Override @@ -360,7 +364,9 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper lastScannedInfo = null; numDiscoveredPerIteration = 0; for (String afile : files) { - scan(new Path(afile), null); + Map<String, Long> lastModifiedTimesForInputDir; + lastModifiedTimesForInputDir = getLastModifiedTimeMap(afile); + scan(new Path(afile), null, lastModifiedTimesForInputDir); } scanIterationComplete(); } else { @@ -375,6 +381,15 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper } } + private Map<String, Long> getLastModifiedTimeMap(String key) + { + if (inputDirTolastModifiedTimes.get(key) == null) { + Map<String, Long> modifiedTimeMap = Maps.newHashMap(); + inputDirTolastModifiedTimes.put(key, modifiedTimeMap); + } + return inputDirTolastModifiedTimes.get(key); + } + /** * Operations that need to be done once a scan is complete. 
*/ @@ -386,6 +401,13 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper protected void scan(@NotNull Path filePath, Path rootPath) { + Map<String, Long> lastModifiedTimesForInputDir; + lastModifiedTimesForInputDir = getLastModifiedTimeMap(filePath.toUri().getPath()); + scan(filePath, rootPath, lastModifiedTimesForInputDir); + } + + private void scan(Path filePath, Path rootPath, Map<String, Long> lastModifiedTimesForInputDir) + { try { FileStatus parentStatus = fs.getFileStatus(filePath); String parentPathStr = filePath.toUri().getPath(); @@ -393,27 +415,22 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper LOG.debug("scan {}", parentPathStr); FileStatus[] childStatuses = fs.listStatus(filePath); - for (FileStatus status : childStatuses) { - Path childPath = status.getPath(); - ScannedFileInfo info = createScannedFileInfo(filePath, parentStatus, childPath, status, rootPath); - - if (skipFile(childPath, status.getModificationTime(), referenceTimes.get(info.getFilePath()))) { - continue; - } - if (status.isDirectory()) { - if (recursive) { - scan(childPath, rootPath == null ? parentStatus.getPath() : rootPath); - } - } + if (childStatuses.length == 0 && rootPath == null && lastModifiedTimesForInputDir.get(parentPathStr) == null) { // empty input directory copy as is + ScannedFileInfo info = new ScannedFileInfo(null, filePath.toString(), parentStatus.getModificationTime()); + processDiscoveredFile(info); + lastModifiedTimesForInputDir.put(parentPathStr, parentStatus.getModificationTime()); + } + for (FileStatus childStatus : childStatuses) { + Path childPath = childStatus.getPath(); String childPathStr = childPath.toUri().getPath(); - if (ignoredFiles.contains(childPathStr)) { - continue; - } - if (acceptFile(childPathStr)) { - LOG.debug("found {}", childPathStr); - processDiscoveredFile(info); + + if (childStatus.isDirectory() && isRecursive()) { + addToDiscoveredFiles(rootPath, parentStatus, childStatus, lastModifiedTimesForInputDir); + scan(childPath, rootPath == null ? parentStatus.getPath() : rootPath, lastModifiedTimesForInputDir); + } else if (acceptFile(childPathStr)) { + addToDiscoveredFiles(rootPath, parentStatus, childStatus, lastModifiedTimesForInputDir); } else { // don't look at it again ignoredFiles.add(childPathStr); @@ -426,6 +443,31 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper } } + private void addToDiscoveredFiles(Path rootPath, FileStatus parentStatus, FileStatus childStatus, + Map<String, Long> lastModifiedTimesForInputDir) throws IOException + { + Path childPath = childStatus.getPath(); + String childPathStr = childPath.toUri().getPath(); + // Directory by now is scanned forcibly. Now check for whether file/directory needs to be added to discoveredFiles. + Long oldModificationTime = lastModifiedTimesForInputDir.get(childPathStr); + lastModifiedTimesForInputDir.put(childPathStr, childStatus.getModificationTime()); + + if (skipFile(childPath, childStatus.getModificationTime(), oldModificationTime) || // Skip dir or file if no timestamp modification + (childStatus.isDirectory() && (oldModificationTime != null))) { // If timestamp modified but if its a directory and already present in map, then skip. 
+ return; + } + + if (ignoredFiles.contains(childPathStr)) { + return; + } + + ScannedFileInfo info = createScannedFileInfo(parentStatus.getPath(), parentStatus, childPath, childStatus, + rootPath); + + LOG.debug("Processing file: " + info.getFilePath()); + processDiscoveredFile(info); + } + protected void processDiscoveredFile(ScannedFileInfo info) { numDiscoveredPerIteration++; @@ -619,4 +661,5 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper } private static final Logger LOG = LoggerFactory.getLogger(FileSplitterInput.class); + } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f9fe3d5e/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java new file mode 100644 index 0000000..24466d5 --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.io.fs; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.hadoop.fs.Path; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata; + +/** + * HDFSFileSplitter extends {@link FileSplitterInput} to, + * 1. Add relative path to file metadata. + * 2. Ignore HDFS temp files (files with extensions _COPYING_). + * 3. Set sequencial read option on readers. + */ +public class HDFSFileSplitter extends FileSplitterInput +{ + private boolean sequencialFileRead; + + public HDFSFileSplitter() + { + super(); + super.setScanner(new HDFSScanner()); + } + + + @Override + protected FileBlockMetadata createBlockMetadata(FileMetadata fileMetadata) + { + FileBlockMetadata blockMetadta = new FileBlockMetadata(fileMetadata.getFilePath()); + blockMetadta.setReadBlockInSequence(sequencialFileRead); + return blockMetadta; + } + + public boolean isSequencialFileRead() + { + return sequencialFileRead; + } + + public void setSequencialFileRead(boolean sequencialFileRead) + { + this.sequencialFileRead = sequencialFileRead; + } + + /** + * HDFSScanner extends {@link TimeBasedDirectoryScanner} to ignore HDFS temporary files + * and files containing unsupported characters. 
+ */ + public static class HDFSScanner extends TimeBasedDirectoryScanner + { + protected static final String UNSUPPORTED_CHARACTOR = ":"; + private String ignoreFilePatternRegularExp = ".*._COPYING_"; + private transient Pattern ignoreRegex; + + @Override + public void setup(OperatorContext context) + { + super.setup(context); + ignoreRegex = Pattern.compile(this.ignoreFilePatternRegularExp); + } + + @Override + protected boolean acceptFile(String filePathStr) + { + boolean accepted = super.acceptFile(filePathStr); + if (containsUnsupportedCharacters(filePathStr) || isIgnoredFile(filePathStr)) { + return false; + } + return accepted; + } + + private boolean isIgnoredFile(String filePathStr) + { + String fileName = new Path(filePathStr).getName(); + if (ignoreRegex != null) { + Matcher matcher = ignoreRegex.matcher(fileName); + if (matcher.matches()) { + return true; + } + } + return false; + } + + private boolean containsUnsupportedCharacters(String filePathStr) + { + return new Path(filePathStr).toUri().getPath().contains(UNSUPPORTED_CHARACTOR); + } + + public String getIgnoreFilePatternRegularExp() + { + return ignoreFilePatternRegularExp; + } + + public void setIgnoreFilePatternRegularExp(String ignoreFilePatternRegularExp) + { + this.ignoreFilePatternRegularExp = ignoreFilePatternRegularExp; + this.ignoreRegex = Pattern.compile(ignoreFilePatternRegularExp); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f9fe3d5e/library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java b/library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java new file mode 100644 index 0000000..2b914f1 --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java @@ -0,0 +1,243 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.io.fs; + +import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; +import javax.validation.constraints.Size; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.Module; +import com.datatorrent.common.partitioner.StatelessPartitioner; +import com.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord; +import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata; +import com.datatorrent.lib.io.block.BlockReader; +import com.datatorrent.lib.io.fs.AbstractFileSplitter.FileMetadata; +import com.datatorrent.lib.io.fs.HDFSFileSplitter.HDFSScanner; +import com.datatorrent.netlet.util.Slice; + +/** + * HDFSInputModule is used to read files/list of files (or directory) from HDFS. <br/> + * Module emits, <br/> + * 1. FileMetadata 2. BlockMetadata 3. Block Bytes.<br/><br/> + * The module reads data in parallel, following parameters can be configured<br/> + * 1. files: list of file(s)/directories to read<br/> + * 2. filePatternRegularExp: Files names matching given regex will be read<br/> + * 3. scanIntervalMillis: interval between two scans to discover new files in input directory<br/> + * 4. recursive: if scan recursively input directories<br/> + * 5. blockSize: block size used to read input blocks of file<br/> + * 6. readersCount: count of readers to read input file<br/> + * 7. sequencialFileRead: If emit file blocks in sequence? + */ +public class HDFSInputModule implements Module +{ + + @NotNull + @Size(min = 1) + private String files; + private String filePatternRegularExp; + @Min(0) + private long scanIntervalMillis; + private boolean recursive = true; + private long blockSize; + private boolean sequencialFileRead = false; + private int readersCount; + + public final transient ProxyOutputPort<FileMetadata> filesMetadataOutput = new ProxyOutputPort<>(); + public final transient ProxyOutputPort<FileBlockMetadata> blocksMetadataOutput = new ProxyOutputPort<>(); + public final transient ProxyOutputPort<ReaderRecord<Slice>> messages = new ProxyOutputPort<>(); + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + HDFSFileSplitter fileSplitter = dag.addOperator("FileSplitter", new HDFSFileSplitter()); + BlockReader blockReader = dag.addOperator("BlockReader", new BlockReader()); + + dag.addStream("BlockMetadata", fileSplitter.blocksMetadataOutput, blockReader.blocksMetadataInput); + + filesMetadataOutput.set(fileSplitter.filesMetadataOutput); + blocksMetadataOutput.set(blockReader.blocksMetadataOutput); + messages.set(blockReader.messages); + + fileSplitter.setSequencialFileRead(sequencialFileRead); + if (blockSize != 0) { + fileSplitter.setBlockSize(blockSize); + } + + HDFSScanner fileScanner = (HDFSScanner)fileSplitter.getScanner(); + fileScanner.setFiles(files); + if (scanIntervalMillis != 0) { + fileScanner.setScanIntervalMillis(scanIntervalMillis); + } + fileScanner.setRecursive(recursive); + if (filePatternRegularExp != null) { + fileSplitter.getScanner().setFilePatternRegularExp(filePatternRegularExp); + } + + blockReader.setUri(files); + if (readersCount != 0) { + dag.setAttribute(blockReader, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<BlockReader>(readersCount)); + } + } + + /** + * A comma separated list of directories to scan. If the path is not fully qualified the default file system is used. + * A fully qualified path can be provided to scan directories in other filesystems. 
+ * + * @param files + * files + */ + public void setFiles(String files) + { + this.files = files; + } + + /** + * Gets the files to be scanned. + * + * @return files to be scanned. + */ + public String getFiles() + { + return files; + } + + /** + * Gets the regular expression for file names to split + * + * @return regular expression + */ + public String getFilePatternRegularExp() + { + return filePatternRegularExp; + } + + /** + * Only files with names matching the given java regular expression are split + * + * @param filePatternRegexp + * regular expression + */ + public void setFilePatternRegularExp(String filePatternRegexp) + { + this.filePatternRegularExp = filePatternRegexp; + } + + /** + * Gets scan interval in milliseconds, interval between two scans to discover new files in input directory + * + * @return scanInterval milliseconds + */ + public long getScanIntervalMillis() + { + return scanIntervalMillis; + } + + /** + * Sets scan interval in milliseconds, interval between two scans to discover new files in input directory + * + * @param scanIntervalMillis + */ + public void setScanIntervalMillis(long scanIntervalMillis) + { + this.scanIntervalMillis = scanIntervalMillis; + } + + /** + * Get is scan recursive + * + * @return isRecursive + */ + public boolean isRecursive() + { + return recursive; + } + + /** + * set is scan recursive + * + * @param recursive + */ + public void setRecursive(boolean recursive) + { + this.recursive = recursive; + } + + /** + * Get block size used to read input blocks of file + * + * @return blockSize + */ + public long getBlockSize() + { + return blockSize; + } + + /** + * Sets block size used to read input blocks of file + * + * @param blockSize + */ + public void setBlockSize(long blockSize) + { + this.blockSize = blockSize; + } + + /** + * Gets readers count + * @return readersCount + */ + public int getReadersCount() + { + return readersCount; + } + + /** + * Static count of readers to read input file + * @param readersCount + */ + public void setReadersCount(int readersCount) + { + this.readersCount = readersCount; + } + + /** + * Gets is sequencial file read + * + * @return sequencialFileRead + */ + public boolean isSequencialFileRead() + { + return sequencialFileRead; + } + + /** + * Sets is sequencial file read + * + * @param sequencialFileRead + */ + public void setSequencialFileRead(boolean sequencialFileRead) + { + this.sequencialFileRead = sequencialFileRead; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f9fe3d5e/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java index cd0de2d..1d6cf03 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java @@ -71,7 +71,7 @@ public class FileSplitterInputTest } allLines.addAll(lines); File created = new File(dataDirectory, "file" + file + ".txt"); - filePaths.add(new Path(dataDirectory, created.getName()).toUri().toString()); + filePaths.add(created.getAbsolutePath()); FileUtils.write(created, StringUtils.join(lines, '\n')); } return filePaths; http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f9fe3d5e/library/src/test/java/com/datatorrent/lib/io/fs/HDFSInputModuleAppTest.java 
---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/HDFSInputModuleAppTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/HDFSInputModuleAppTest.java new file mode 100644 index 0000000..8bb1e26 --- /dev/null +++ b/library/src/test/java/com/datatorrent/lib/io/fs/HDFSInputModuleAppTest.java @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.io.fs; + +import java.io.File; +import java.io.FileFilter; +import java.io.IOException; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.filefilter.WildcardFileFilter; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord; +import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata; +import com.datatorrent.lib.io.fs.AbstractFileSplitter.FileMetadata; +import com.datatorrent.lib.stream.DevNull; +import com.datatorrent.netlet.util.Slice; + +public class HDFSInputModuleAppTest +{ + private String inputDir; + static String outputDir; + private StreamingApplication app; + private static final String FILE_1 = "file1.txt"; + private static final String FILE_2 = "file2.txt"; + private static final String FILE_1_DATA = "File one data"; + private static final String FILE_2_DATA = "File two data. 
This has more data hence more blocks."; + static final String OUT_DATA_FILE = "fileData.txt"; + static final String OUT_METADATA_FILE = "fileMetaData.txt"; + + public static class TestMeta extends TestWatcher + { + public String baseDirectory; + + @Override + protected void starting(org.junit.runner.Description description) + { + this.baseDirectory = "target/" + description.getClassName() + "/" + description.getMethodName(); + } + + } + + @Rule + public TestMeta testMeta = new TestMeta(); + + @Before + public void setup() throws Exception + { + inputDir = testMeta.baseDirectory + File.separator + "input"; + outputDir = testMeta.baseDirectory + File.separator + "output"; + + FileUtils.writeStringToFile(new File(inputDir + File.separator + FILE_1), FILE_1_DATA); + FileUtils.writeStringToFile(new File(inputDir + File.separator + FILE_2), FILE_2_DATA); + FileUtils.forceMkdir(new File(inputDir + File.separator + "dir")); + FileUtils.writeStringToFile(new File(inputDir + File.separator + "dir/inner.txt"), FILE_1_DATA); + } + + @After + public void tearDown() throws IOException + { + FileUtils.deleteDirectory(new File(inputDir)); + } + + @Test + public void testApplication() throws Exception + { + app = new Application(); + Configuration conf = new Configuration(false); + conf.set("dt.operator.hdfsInputModule.prop.files", inputDir); + conf.set("dt.operator.hdfsInputModule.prop.blockSize", "10"); + conf.set("dt.operator.hdfsInputModule.prop.scanIntervalMillis", "10000"); + + LocalMode lma = LocalMode.newInstance(); + lma.prepareDAG(app, conf); + LocalMode.Controller lc = lma.getController(); + lc.setHeartbeatMonitoringEnabled(true); + lc.runAsync(); + + long now = System.currentTimeMillis(); + Path outDir = new Path("file://" + new File(outputDir).getAbsolutePath()); + FileSystem fs = FileSystem.newInstance(outDir.toUri(), new Configuration()); + while (!fs.exists(outDir) && System.currentTimeMillis() - now < 20000) { + Thread.sleep(500); + LOG.debug("Waiting for {}", outDir); + } + + Thread.sleep(10000); + lc.shutdown(); + + Assert.assertTrue("output dir does not exist", fs.exists(outDir)); + + File dir = new File(outputDir); + FileFilter fileFilter = new WildcardFileFilter(OUT_METADATA_FILE + "*"); + verifyFileContents(dir.listFiles(fileFilter), "[fileName=file1.txt, numberOfBlocks=2, isDirectory=false, relativePath=input/file1.txt]"); + verifyFileContents(dir.listFiles(fileFilter), "[fileName=file2.txt, numberOfBlocks=6, isDirectory=false, relativePath=input/file2.txt]"); + verifyFileContents(dir.listFiles(fileFilter), "[fileName=dir, numberOfBlocks=0, isDirectory=true, relativePath=input/dir]"); + verifyFileContents(dir.listFiles(fileFilter), "[fileName=inner.txt, numberOfBlocks=2, isDirectory=false, relativePath=input/dir/inner.txt]"); + + fileFilter = new WildcardFileFilter(OUT_DATA_FILE + "*"); + verifyFileContents(dir.listFiles(fileFilter), FILE_1_DATA); + verifyFileContents(dir.listFiles(fileFilter), FILE_2_DATA); + } + + private void verifyFileContents(File[] files, String expectedData) throws IOException + { + StringBuilder filesData = new StringBuilder(); + for (File file : files) { + filesData.append(FileUtils.readFileToString(file)); + } + Assert.assertTrue("File data doesn't contain expected text" , filesData.indexOf(expectedData) > -1); + } + + private static Logger LOG = LoggerFactory.getLogger(HDFSInputModuleAppTest.class); + + private static class Application implements StreamingApplication + { + public void populateDAG(DAG dag, Configuration conf) + { + HDFSInputModule module = 
dag.addModule("hdfsInputModule", HDFSInputModule.class); + + AbstractFileOutputOperator<FileMetadata> metadataWriter = new MetadataWriter(HDFSInputModuleAppTest.OUT_METADATA_FILE); + metadataWriter.setFilePath(HDFSInputModuleAppTest.outputDir); + dag.addOperator("FileMetadataWriter", metadataWriter); + + AbstractFileOutputOperator<ReaderRecord<Slice>> dataWriter = new HDFSFileWriter(HDFSInputModuleAppTest.OUT_DATA_FILE); + dataWriter.setFilePath(HDFSInputModuleAppTest.outputDir); + dag.addOperator("FileDataWriter", dataWriter); + + DevNull<FileBlockMetadata> devNull = dag.addOperator("devNull", DevNull.class); + + dag.addStream("FileMetaData", module.filesMetadataOutput, metadataWriter.input); + dag.addStream("data", module.messages, dataWriter.input); + dag.addStream("blockMetadata", module.blocksMetadataOutput, devNull.data); + } + } + + private static class MetadataWriter extends AbstractFileOutputOperator<FileMetadata> + { + String fileName; + + @SuppressWarnings("unused") + private MetadataWriter() + { + + } + + public MetadataWriter(String fileName) + { + this.fileName = fileName; + } + + @Override + protected String getFileName(FileMetadata tuple) + { + return fileName; + } + + @Override + protected byte[] getBytesForTuple(FileMetadata tuple) + { + return (tuple).toString().getBytes(); + } + } + + private static class HDFSFileWriter extends AbstractFileOutputOperator<ReaderRecord<Slice>> + { + String fileName; + + @SuppressWarnings("unused") + private HDFSFileWriter() + { + } + + public HDFSFileWriter(String fileName) + { + this.fileName = fileName; + } + + @Override + protected String getFileName(ReaderRecord<Slice> tuple) + { + return fileName; + } + + @Override + protected byte[] getBytesForTuple(ReaderRecord<Slice> tuple) + { + return tuple.getRecord().buffer; + } + } + +}
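For reference, below is a minimal sketch of how the new HDFSInputModule might be wired into an application DAG. It only uses classes and ports that appear in this commit (HDFSInputModule, its ProxyOutputPorts, and DevNull as a placeholder sink); the application/operator/stream names, the HDFS URI, and the numeric property values are illustrative assumptions. It also assumes that properties set on the instance returned by dag.addModule are applied before the module is expanded; the bundled HDFSInputModuleAppTest instead configures the same properties through the Configuration (dt.operator.<moduleName>.prop.*).

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord;
import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
import com.datatorrent.lib.io.fs.AbstractFileSplitter.FileMetadata;
import com.datatorrent.lib.io.fs.HDFSInputModule;
import com.datatorrent.lib.stream.DevNull;
import com.datatorrent.netlet.util.Slice;

public class HDFSInputExample implements StreamingApplication  // hypothetical example application
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    // Add the module; the returned instance can be configured directly.
    HDFSInputModule input = dag.addModule("HDFSInput", HDFSInputModule.class);
    input.setFiles("hdfs://namenode:8020/user/data/input");  // hypothetical input URI
    input.setScanIntervalMillis(10000);        // rescan the input every 10 seconds
    input.setRecursive(true);                  // descend into sub-directories
    input.setSequencialFileRead(false);        // blocks of a file need not be read in sequence
    input.setReadersCount(4);                  // statically partitions the BlockReader
    // blockSize is left unset here, so the splitter's default block size is used

    // Placeholder sinks; DevNull simply discards tuples. Real applications would
    // attach writers or processing operators to these three output streams.
    DevNull<FileMetadata> fileMetaSink = dag.addOperator("FileMetaSink", new DevNull<FileMetadata>());
    DevNull<FileBlockMetadata> blockMetaSink = dag.addOperator("BlockMetaSink", new DevNull<FileBlockMetadata>());
    DevNull<ReaderRecord<Slice>> dataSink = dag.addOperator("DataSink", new DevNull<ReaderRecord<Slice>>());

    dag.addStream("FileMetadata", input.filesMetadataOutput, fileMetaSink.data);
    dag.addStream("BlockMetadata", input.blocksMetadataOutput, blockMetaSink.data);
    dag.addStream("BlockBytes", input.messages, dataSink.data);
  }
}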
