Repository: incubator-apex-malhar Updated Branches: refs/heads/master 4e0217b43 -> 31cf63770
APEXMALHAR-2018 Moved generic HDFS Input Module code to abstract class. Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/31cf6377 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/31cf6377 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/31cf6377 Branch: refs/heads/master Commit: 31cf63770e8cb78380f942254e41a006bcba7cd6 Parents: 4e0217b Author: Chaitanya <chaita...@datatorrent.com> Authored: Mon Apr 25 17:38:55 2016 +0530 Committer: Chaitanya <chaita...@datatorrent.com> Committed: Mon Apr 25 17:38:55 2016 +0530 ---------------------------------------------------------------------- .../datatorrent/lib/io/fs/FSFileSplitter.java | 130 ++++++++++ .../datatorrent/lib/io/fs/FSInputModule.java | 242 +++++++++++++++++++ .../datatorrent/lib/io/fs/HDFSFileSplitter.java | 91 +------ .../datatorrent/lib/io/fs/HDFSInputModule.java | 207 +--------------- 4 files changed, 382 insertions(+), 288 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/31cf6377/library/src/main/java/com/datatorrent/lib/io/fs/FSFileSplitter.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FSFileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/FSFileSplitter.java new file mode 100644 index 0000000..4318994 --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/io/fs/FSFileSplitter.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.io.fs; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.hadoop.fs.Path; +import com.datatorrent.api.Context; +import com.datatorrent.lib.io.block.BlockMetadata; + +/** + * FSFileSplitter extends {@link FileSplitterInput} to, + * 1. Ignore files with extension "ignoreFilePatternRegularExp" + * 2. Set sequencial read option on readers. + */ +public class FSFileSplitter extends FileSplitterInput +{ + private boolean sequencialFileRead; + + public FSFileSplitter() + { + super(); + super.setScanner(new FSScanner()); + } + + @Override + protected BlockMetadata.FileBlockMetadata createBlockMetadata(FileMetadata fileMetadata) + { + BlockMetadata.FileBlockMetadata blockMetadta = new BlockMetadata.FileBlockMetadata(fileMetadata.getFilePath()); + blockMetadta.setReadBlockInSequence(sequencialFileRead); + return blockMetadta; + } + + public boolean isSequencialFileRead() + { + return sequencialFileRead; + } + + public void setSequencialFileRead(boolean sequencialFileRead) + { + this.sequencialFileRead = sequencialFileRead; + } + + /** + * FSScanner extends {@link TimeBasedDirectoryScanner} to ignore temporary files + * and files containing unsupported characters. 
+ */ + public static class FSScanner extends TimeBasedDirectoryScanner + { + private String unsupportedCharacter; + private String ignoreFilePatternRegularExp; + private transient Pattern ignoreRegex; + + @Override + public void setup(Context.OperatorContext context) + { + super.setup(context); + if (ignoreFilePatternRegularExp != null) { + ignoreRegex = Pattern.compile(this.ignoreFilePatternRegularExp); + } + } + + @Override + protected boolean acceptFile(String filePathStr) + { + boolean accepted = super.acceptFile(filePathStr); + if (containsUnsupportedCharacters(filePathStr) || isIgnoredFile(filePathStr)) { + return false; + } + return accepted; + } + + private boolean isIgnoredFile(String filePathStr) + { + String fileName = new Path(filePathStr).getName(); + if (ignoreRegex != null) { + Matcher matcher = ignoreRegex.matcher(fileName); + if (matcher.matches()) { + return true; + } + } + return false; + } + + private boolean containsUnsupportedCharacters(String filePathStr) + { + if (unsupportedCharacter != null) { + return new Path(filePathStr).toUri().getPath().contains(unsupportedCharacter); + } + return false; + } + + public String getIgnoreFilePatternRegularExp() + { + return ignoreFilePatternRegularExp; + } + + public void setIgnoreFilePatternRegularExp(String ignoreFilePatternRegularExp) + { + this.ignoreFilePatternRegularExp = ignoreFilePatternRegularExp; + this.ignoreRegex = Pattern.compile(ignoreFilePatternRegularExp); + } + + public String getUnsupportedCharacter() + { + return unsupportedCharacter; + } + + public void setUnsupportedCharacter(String unsupportedCharacter) + { + this.unsupportedCharacter = unsupportedCharacter; + } + } +} + http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/31cf6377/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java 
b/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java new file mode 100644 index 0000000..e5221a0 --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.io.fs; + +import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; +import javax.validation.constraints.Size; +import org.apache.hadoop.conf.Configuration; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.Module; +import com.datatorrent.common.partitioner.StatelessPartitioner; +import com.datatorrent.lib.io.block.AbstractBlockReader; +import com.datatorrent.lib.io.block.BlockMetadata; +import com.datatorrent.lib.io.block.BlockReader; +import com.datatorrent.netlet.util.Slice; + +/** + * FSInputModule is an abstract class used to read files from file systems like HDFS, NFS, S3, etc. <br/> + * FSInputModule emits FileMetadata, BlockMetadata, BlockBytes. <br/> + * The module reads data in parallel, following parameters can be configured<br/> + * 1. files: list of file(s)/directories to read<br/> + * 2. 
filePatternRegularExp: Files names matching given regex will be read<br/> + * 3. scanIntervalMillis: interval between two scans to discover new files in input directory<br/> + * 4. recursive: if scan recursively input directories<br/> + * 5. blockSize: block size used to read input blocks of file<br/> + * 6. readersCount: count of readers to read input file<br/> + * 7. sequencialFileRead: If emit file blocks in sequence? + */ + +public abstract class FSInputModule implements Module +{ + @NotNull + @Size(min = 1) + private String files; + private String filePatternRegularExp; + @Min(0) + private long scanIntervalMillis; + private boolean recursive = true; + private long blockSize; + private boolean sequencialFileRead = false; + private int readersCount; + + public final transient ProxyOutputPort<AbstractFileSplitter.FileMetadata> filesMetadataOutput = new ProxyOutputPort<>(); + public final transient ProxyOutputPort<BlockMetadata.FileBlockMetadata> blocksMetadataOutput = new ProxyOutputPort<>(); + public final transient ProxyOutputPort<AbstractBlockReader.ReaderRecord<Slice>> messages = new ProxyOutputPort<>(); + + public abstract FSFileSplitter createFileSplitter(); + + public abstract BlockReader createBlockReader(); + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + FSFileSplitter fileSplitter = dag.addOperator("FileSplitter", createFileSplitter()); + BlockReader blockReader = dag.addOperator("BlockReader", createBlockReader()); + + dag.addStream("BlockMetadata", fileSplitter.blocksMetadataOutput, blockReader.blocksMetadataInput); + + filesMetadataOutput.set(fileSplitter.filesMetadataOutput); + blocksMetadataOutput.set(blockReader.blocksMetadataOutput); + messages.set(blockReader.messages); + + fileSplitter.setSequencialFileRead(sequencialFileRead); + if (blockSize != 0) { + fileSplitter.setBlockSize(blockSize); + } + + FSFileSplitter.FSScanner fileScanner = (FSFileSplitter.FSScanner)fileSplitter.getScanner(); + 
fileScanner.setFiles(files); + if (scanIntervalMillis != 0) { + fileScanner.setScanIntervalMillis(scanIntervalMillis); + } + fileScanner.setRecursive(recursive); + if (filePatternRegularExp != null) { + fileSplitter.getScanner().setFilePatternRegularExp(filePatternRegularExp); + } + + blockReader.setUri(files); + if (readersCount != 0) { + dag.setAttribute(blockReader, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<BlockReader>(readersCount)); + fileSplitter.setBlocksThreshold(readersCount); + } + } + + /** + * A comma separated list of directories to scan. If the path is not fully qualified the default file system is used. + * A fully qualified path can be provided to scan directories in other filesystems. + * + * @param files + * files + */ + public void setFiles(String files) + { + this.files = files; + } + + /** + * Gets the files to be scanned. + * + * @return files to be scanned. + */ + public String getFiles() + { + return files; + } + + /** + * Gets the regular expression for file names to split + * + * @return regular expression + */ + public String getFilePatternRegularExp() + { + return filePatternRegularExp; + } + + /** + * Only files with names matching the given java regular expression are split + * + * @param filePatternRegexp + * regular expression + */ + public void setFilePatternRegularExp(String filePatternRegexp) + { + this.filePatternRegularExp = filePatternRegexp; + } + + /** + * Gets scan interval in milliseconds, interval between two scans to discover new files in input directory + * + * @return scanInterval milliseconds + */ + public long getScanIntervalMillis() + { + return scanIntervalMillis; + } + + /** + * Sets scan interval in milliseconds, interval between two scans to discover new files in input directory + * + * @param scanIntervalMillis + */ + public void setScanIntervalMillis(long scanIntervalMillis) + { + this.scanIntervalMillis = scanIntervalMillis; + } + + /** + * Get is scan recursive + * + * @return isRecursive 
+ */ + public boolean isRecursive() + { + return recursive; + } + + /** + * set is scan recursive + * + * @param recursive + */ + public void setRecursive(boolean recursive) + { + this.recursive = recursive; + } + + /** + * Get block size used to read input blocks of file + * + * @return blockSize + */ + public long getBlockSize() + { + return blockSize; + } + + /** + * Sets block size used to read input blocks of file + * + * @param blockSize + */ + public void setBlockSize(long blockSize) + { + this.blockSize = blockSize; + } + + /** + * Gets readers count + * @return readersCount + */ + public int getReadersCount() + { + return readersCount; + } + + /** + * Static count of readers to read input file + * @param readersCount + */ + public void setReadersCount(int readersCount) + { + this.readersCount = readersCount; + } + + /** + * Gets is sequencial file read + * + * @return sequencialFileRead + */ + public boolean isSequencialFileRead() + { + return sequencialFileRead; + } + + /** + * Sets is sequencial file read + * + * @param sequencialFileRead + */ + public void setSequencialFileRead(boolean sequencialFileRead) + { + this.sequencialFileRead = sequencialFileRead; + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/31cf6377/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java index 24466d5..26f2b6d 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java @@ -18,103 +18,20 @@ */ package com.datatorrent.lib.io.fs; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.hadoop.fs.Path; - -import com.datatorrent.api.Context.OperatorContext; -import 
com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata; - /** * HDFSFileSplitter extends {@link FileSplitterInput} to, * 1. Add relative path to file metadata. * 2. Ignore HDFS temp files (files with extensions _COPYING_). * 3. Set sequencial read option on readers. */ -public class HDFSFileSplitter extends FileSplitterInput +public class HDFSFileSplitter extends FSFileSplitter { - private boolean sequencialFileRead; - public HDFSFileSplitter() { super(); - super.setScanner(new HDFSScanner()); - } - - - @Override - protected FileBlockMetadata createBlockMetadata(FileMetadata fileMetadata) - { - FileBlockMetadata blockMetadta = new FileBlockMetadata(fileMetadata.getFilePath()); - blockMetadta.setReadBlockInSequence(sequencialFileRead); - return blockMetadta; - } - - public boolean isSequencialFileRead() - { - return sequencialFileRead; - } - - public void setSequencialFileRead(boolean sequencialFileRead) - { - this.sequencialFileRead = sequencialFileRead; - } - - /** - * HDFSScanner extends {@link TimeBasedDirectoryScanner} to ignore HDFS temporary files - * and files containing unsupported characters. 
- */ - public static class HDFSScanner extends TimeBasedDirectoryScanner - { - protected static final String UNSUPPORTED_CHARACTOR = ":"; - private String ignoreFilePatternRegularExp = ".*._COPYING_"; - private transient Pattern ignoreRegex; - - @Override - public void setup(OperatorContext context) - { - super.setup(context); - ignoreRegex = Pattern.compile(this.ignoreFilePatternRegularExp); - } - - @Override - protected boolean acceptFile(String filePathStr) - { - boolean accepted = super.acceptFile(filePathStr); - if (containsUnsupportedCharacters(filePathStr) || isIgnoredFile(filePathStr)) { - return false; - } - return accepted; - } - - private boolean isIgnoredFile(String filePathStr) - { - String fileName = new Path(filePathStr).getName(); - if (ignoreRegex != null) { - Matcher matcher = ignoreRegex.matcher(fileName); - if (matcher.matches()) { - return true; - } - } - return false; - } - - private boolean containsUnsupportedCharacters(String filePathStr) - { - return new Path(filePathStr).toUri().getPath().contains(UNSUPPORTED_CHARACTOR); - } - - public String getIgnoreFilePatternRegularExp() - { - return ignoreFilePatternRegularExp; - } - - public void setIgnoreFilePatternRegularExp(String ignoreFilePatternRegularExp) - { - this.ignoreFilePatternRegularExp = ignoreFilePatternRegularExp; - this.ignoreRegex = Pattern.compile(ignoreFilePatternRegularExp); - } + FSFileSplitter.FSScanner scanner = (FSFileSplitter.FSScanner)getScanner(); + scanner.setIgnoreFilePatternRegularExp(".*._COPYING_"); + scanner.setUnsupportedCharacter(":"); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/31cf6377/library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java b/library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java index 4456352..de99fd3 100644 --- 
a/library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java @@ -18,22 +18,7 @@ */ package com.datatorrent.lib.io.fs; -import javax.validation.constraints.Min; -import javax.validation.constraints.NotNull; -import javax.validation.constraints.Size; - -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.Context; -import com.datatorrent.api.DAG; -import com.datatorrent.api.Module; -import com.datatorrent.common.partitioner.StatelessPartitioner; -import com.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord; -import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata; import com.datatorrent.lib.io.block.BlockReader; -import com.datatorrent.lib.io.fs.AbstractFileSplitter.FileMetadata; -import com.datatorrent.lib.io.fs.HDFSFileSplitter.HDFSScanner; -import com.datatorrent.netlet.util.Slice; /** * HDFSInputModule is used to read files/list of files (or directory) from HDFS. <br/> @@ -48,197 +33,17 @@ import com.datatorrent.netlet.util.Slice; * 6. readersCount: count of readers to read input file<br/> * 7. sequencialFileRead: If emit file blocks in sequence? 
*/ -public class HDFSInputModule implements Module +public class HDFSInputModule extends FSInputModule { - - @NotNull - @Size(min = 1) - private String files; - private String filePatternRegularExp; - @Min(0) - private long scanIntervalMillis; - private boolean recursive = true; - private long blockSize; - private boolean sequencialFileRead = false; - private int readersCount; - - public final transient ProxyOutputPort<FileMetadata> filesMetadataOutput = new ProxyOutputPort<>(); - public final transient ProxyOutputPort<FileBlockMetadata> blocksMetadataOutput = new ProxyOutputPort<>(); - public final transient ProxyOutputPort<ReaderRecord<Slice>> messages = new ProxyOutputPort<>(); - @Override - public void populateDAG(DAG dag, Configuration conf) - { - HDFSFileSplitter fileSplitter = dag.addOperator("FileSplitter", new HDFSFileSplitter()); - BlockReader blockReader = dag.addOperator("BlockReader", new BlockReader()); - - dag.addStream("BlockMetadata", fileSplitter.blocksMetadataOutput, blockReader.blocksMetadataInput); - - filesMetadataOutput.set(fileSplitter.filesMetadataOutput); - blocksMetadataOutput.set(blockReader.blocksMetadataOutput); - messages.set(blockReader.messages); - - fileSplitter.setSequencialFileRead(sequencialFileRead); - if (blockSize != 0) { - fileSplitter.setBlockSize(blockSize); - } - - HDFSScanner fileScanner = (HDFSScanner)fileSplitter.getScanner(); - fileScanner.setFiles(files); - if (scanIntervalMillis != 0) { - fileScanner.setScanIntervalMillis(scanIntervalMillis); - } - fileScanner.setRecursive(recursive); - if (filePatternRegularExp != null) { - fileSplitter.getScanner().setFilePatternRegularExp(filePatternRegularExp); - } - - blockReader.setUri(files); - if (readersCount != 0) { - dag.setAttribute(blockReader, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<BlockReader>(readersCount)); - fileSplitter.setBlocksThreshold(readersCount); - } - } - - /** - * A comma separated list of directories to scan. 
If the path is not fully qualified the default file system is used. - * A fully qualified path can be provided to scan directories in other filesystems. - * - * @param files - * files - */ - public void setFiles(String files) + public FSFileSplitter createFileSplitter() { - this.files = files; + return new HDFSFileSplitter(); } - /** - * Gets the files to be scanned. - * - * @return files to be scanned. - */ - public String getFiles() - { - return files; - } - - /** - * Gets the regular expression for file names to split - * - * @return regular expression - */ - public String getFilePatternRegularExp() - { - return filePatternRegularExp; - } - - /** - * Only files with names matching the given java regular expression are split - * - * @param filePatternRegexp - * regular expression - */ - public void setFilePatternRegularExp(String filePatternRegexp) - { - this.filePatternRegularExp = filePatternRegexp; - } - - /** - * Gets scan interval in milliseconds, interval between two scans to discover new files in input directory - * - * @return scanInterval milliseconds - */ - public long getScanIntervalMillis() - { - return scanIntervalMillis; - } - - /** - * Sets scan interval in milliseconds, interval between two scans to discover new files in input directory - * - * @param scanIntervalMillis - */ - public void setScanIntervalMillis(long scanIntervalMillis) - { - this.scanIntervalMillis = scanIntervalMillis; - } - - /** - * Get is scan recursive - * - * @return isRecursive - */ - public boolean isRecursive() - { - return recursive; - } - - /** - * set is scan recursive - * - * @param recursive - */ - public void setRecursive(boolean recursive) - { - this.recursive = recursive; - } - - /** - * Get block size used to read input blocks of file - * - * @return blockSize - */ - public long getBlockSize() - { - return blockSize; - } - - /** - * Sets block size used to read input blocks of file - * - * @param blockSize - */ - public void setBlockSize(long blockSize) - { - 
this.blockSize = blockSize; - } - - /** - * Gets readers count - * @return readersCount - */ - public int getReadersCount() - { - return readersCount; - } - - /** - * Static count of readers to read input file - * @param readersCount - */ - public void setReadersCount(int readersCount) - { - this.readersCount = readersCount; - } - - /** - * Gets is sequencial file read - * - * @return sequencialFileRead - */ - public boolean isSequencialFileRead() - { - return sequencialFileRead; - } - - /** - * Sets is sequencial file read - * - * @param sequencialFileRead - */ - public void setSequencialFileRead(boolean sequencialFileRead) + @Override + public BlockReader createBlockReader() { - this.sequencialFileRead = sequencialFileRead; + return new BlockReader(); } - }