Repository: apex-malhar
Updated Branches:
  refs/heads/master c92ca15e8 -> 37991576d
APEXMALHAR-2237 Changes in FSInputModule to support Dynamic partitioning.

Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/37991576
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/37991576
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/37991576

Branch: refs/heads/master
Commit: 37991576d826c2ac30a077b0774a3fbada3c1f9c
Parents: c92ca15
Author: yogidevendra <[email protected]>
Authored: Wed Sep 21 12:29:06 2016 +0530
Committer: yogidevendra <[email protected]>
Committed: Mon Oct 24 14:09:41 2016 +0530

----------------------------------------------------------------------
 apps/filecopy/src/site/conf/app-conf.xml      | 22 +----
 .../lib/io/block/AbstractBlockReader.java     |  4 +-
 .../datatorrent/lib/io/fs/FSInputModule.java  | 98 +++++++++++++++-----
 3 files changed, 80 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/37991576/apps/filecopy/src/site/conf/app-conf.xml
----------------------------------------------------------------------
diff --git a/apps/filecopy/src/site/conf/app-conf.xml b/apps/filecopy/src/site/conf/app-conf.xml
index 964ad7d..6f995bf 100644
--- a/apps/filecopy/src/site/conf/app-conf.xml
+++ b/apps/filecopy/src/site/conf/app-conf.xml
@@ -23,26 +23,14 @@
 <configuration>
   <property>
     <name>dt.operator.HDFSInputModule.prop.files</name>
-    <value>hdfs://localhost:54310/user/dtadmin/input</value>
-  </property>
-  <property>
-    <name>dt.operator.HDFSInputModule.prop.blockSize</name>
-    <value>128000000</value>
-  </property>
-  <property>
-    <name>dt.operator.HDFSInputModule.prop.scanIntervalMillis</name>
-    <value>10000</value>
-  </property>
-  <property>
-    <name>dt.operator.HDFSInputModule.prop.dedup</name>
-    <value>true</value>
+    <value>hdfs://localhost:54310/user/appuser/input</value>
   </property>
   <property>
     <name>dt.operator.HDFSFileCopyModule.prop.outputDirectoryPath</name>
-    <value>hdfs://localhost:54310/user/dtadmin/output</value>
+    <value>hdfs://localhost:54310/user/appuser/output</value>
   </property>
   <property>
-    <name>dt.loggers.level</name>
-    <value>com.datatorrent.*:DEBUG,org.apache.*:INFO</value>
-</property>
+    <name>dt.operator.HDFSInputModule.prop.blocksThreshold</name>
+    <value>1</value>
+  </property>
 </configuration>
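The trimmed configuration keeps only the input/output paths and the per-window block cap, relying on module defaults for the rest. For illustration, the same dt.operator.<name>.prop.<property> entries can be built programmatically, e.g. in a test harness. A minimal sketch, assuming the HDFSInputModule/HDFSFileCopyModule operator names from this app; the class name and values are illustrative only:

    import org.apache.hadoop.conf.Configuration;

    public class FileCopyConfSketch
    {
      public static Configuration buildConf()
      {
        // Programmatic equivalent of the app-conf.xml entries above; the
        // platform resolves dt.operator.<name>.prop.<property> keys against
        // the corresponding module setters.
        Configuration conf = new Configuration(false);
        conf.set("dt.operator.HDFSInputModule.prop.files",
            "hdfs://localhost:54310/user/appuser/input");
        conf.set("dt.operator.HDFSFileCopyModule.prop.outputDirectoryPath",
            "hdfs://localhost:54310/user/appuser/output");
        // One file block per streaming window keeps per-window memory bounded.
        conf.set("dt.operator.HDFSInputModule.prop.blocksThreshold", "1");
        return conf;
      }
    }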
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/37991576/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java b/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java
index 268a17b..734df38 100644
--- a/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java
+++ b/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java
@@ -115,7 +115,7 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext
    */
   protected int minReaders;
   /**
-   * Interval at which stats are processed. Default : 1 minute
+   * Interval at which stats are processed. Default : 2 minutes
    */
   protected long intervalMillis;

@@ -147,7 +147,7 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext
   {
     maxReaders = 16;
     minReaders = 1;
-    intervalMillis = 60 * 1000L;
+    intervalMillis = 2 * 60 * 1000L;
     response = new StatsListener.Response();
     backlogPerOperator = Maps.newHashMap();
     partitionCount = 1;
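The default matters because intervalMillis is the cadence at which the reader's stats are processed and, via FSInputModule below, the cadence at which dynamic partitioning is re-evaluated. A simplified sketch of that interval-gated StatsListener pattern; this is not the actual AbstractBlockReader code, and shouldRepartition is a hypothetical hook:

    import com.datatorrent.api.StatsListener;

    abstract class ThrottledRepartitioner implements StatsListener
    {
      protected long intervalMillis = 2 * 60 * 1000L; // new default: 2 minutes
      private long nextMillis;

      @Override
      public Response processStats(BatchedOperatorStats stats)
      {
        Response response = new Response();
        long now = System.currentTimeMillis();
        if (now < nextMillis) {
          return response;                 // too soon; keep current partitioning
        }
        nextMillis = now + intervalMillis; // re-evaluate at most once per interval
        response.repartitionRequired = shouldRepartition(stats);
        return response;
      }

      protected abstract boolean shouldRepartition(BatchedOperatorStats stats);
    }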
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/37991576/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java b/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java
index d71111c..6d54756 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.conf.Configuration;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.Module;
-import com.datatorrent.common.partitioner.StatelessPartitioner;
 import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
 import com.datatorrent.lib.io.block.AbstractBlockReader;
 import com.datatorrent.lib.io.block.BlockMetadata;
@@ -41,9 +40,11 @@ import com.datatorrent.netlet.util.Slice;
 * 3. scanIntervalMillis: interval between two scans to discover new files in input directory<br/>
 * 4. recursive: if scan recursively input directories<br/>
 * 5. blockSize: block size used to read input blocks of file<br/>
- * 6. readersCount: count of readers to read input file<br/>
- * 7. sequentialFileRead: If emit file blocks in sequence?<br/>
- * 8. blocksThreshold: number of blocks emitted per window
+ * 6. sequentialFileRead: If emit file blocks in sequence?<br/>
+ * 7. blocksThreshold: number of blocks emitted per window
+ * 8. minReaders: Minimum number of block readers for dynamic partitioning
+ * 9. maxReaders: Maximum number of block readers for dynamic partitioning
+ * 10. repartitionCheckInterval: Interval for re-evaluating dynamic partitioning
 *
 * @since 3.5.0
 */
@@ -59,9 +60,11 @@ public class FSInputModule implements Module
   private boolean recursive = true;
   private long blockSize;
   private boolean sequentialFileRead = false;
-  private int readersCount;
   @Min(1)
   protected int blocksThreshold;
+  protected int minReaders;
+  protected int maxReaders;
+  protected long repartitionCheckInterval;

   public final transient ProxyOutputPort<AbstractFileSplitter.FileMetadata> filesMetadataOutput = new ProxyOutputPort<>();
   public final transient ProxyOutputPort<BlockMetadata.FileBlockMetadata> blocksMetadataOutput = new ProxyOutputPort<>();
@@ -108,8 +111,17 @@ public class FSInputModule implements Module
     }
     blockReader.setBasePath(files);
-    if (readersCount != 0) {
-      dag.setAttribute(blockReader, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<FSSliceReader>(readersCount));
+
+    if (minReaders != 0) {
+      blockReader.setMinReaders(minReaders);
+    }
+
+    if (maxReaders != 0) {
+      blockReader.setMaxReaders(maxReaders);
+    }
+
+    if (repartitionCheckInterval != 0) {
+      blockReader.setIntervalMillis(repartitionCheckInterval);
     }
     fileSplitter.setBlocksThreshold(blocksThreshold);
   }

@@ -218,24 +230,6 @@ public class FSInputModule implements Module
   }
   /**
-   * Gets readers count
-   * @return readersCount
-   */
-  public int getReadersCount()
-  {
-    return readersCount;
-  }
-
-  /**
-   * Static count of readers to read input file
-   * @param readersCount
-   */
-  public void setReadersCount(int readersCount)
-  {
-    this.readersCount = readersCount;
-  }
-
-  /**
    * Gets is sequential file read
    *
    * @return sequentialFileRead
    */
@@ -277,6 +271,60 @@
     return blocksThreshold;
   }

+  /**
+   * Gets minimum number of block readers for dynamic partitioning.
+   * @return minimum instances of block reader.
+   */
+  public int getMinReaders()
+  {
+    return minReaders;
+  }
+
+  /**
+   * Sets minimum number of block readers for dynamic partitioning.
+   * @param minReaders minimum number of readers.
+   */
+  public void setMinReaders(int minReaders)
+  {
+    this.minReaders = minReaders;
+  }
+
+  /**
+   * Gets maximum number of block readers for dynamic partitioning.
+   * @return maximum instances of block reader.
+   */
+  public int getMaxReaders()
+  {
+    return maxReaders;
+  }
+
+  /**
+   * Sets maximum number of block readers for dynamic partitioning.
+   * @param maxReaders maximum number of readers.
+   */
+  public void setMaxReaders(int maxReaders)
+  {
+    this.maxReaders = maxReaders;
+  }
+
+  /**
+   * Gets Interval for re-evaluating dynamic partitioning
+   * @return interval for re-evaluating dynamic partitioning
+   */
+  public long getRepartitionCheckInterval()
+  {
+    return repartitionCheckInterval;
+  }
+
+  /**
+   * Sets Interval for re-evaluating dynamic partitioning
+   * @param repartitionCheckInterval interval for re-evaluating dynamic partitioning
+   */
+  public void setRepartitionCheckInterval(long repartitionCheckInterval)
+  {
+    this.repartitionCheckInterval = repartitionCheckInterval;
+  }
+
   public static class SequentialFileBlockMetadataCodec extends KryoSerializableStreamCodec<BlockMetadata.FileBlockMetadata>
   {
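Taken together, the module replaces the old static readersCount/StatelessPartitioner wiring with the block reader's own dynamic partitioning. A minimal application sketch using the new knobs; the app class, operator name, and values are illustrative, only setMinReaders, setMaxReaders and setRepartitionCheckInterval come from this commit, and setFiles/setBlocksThreshold are assumed from the existing module API:

    import org.apache.hadoop.conf.Configuration;

    import com.datatorrent.api.DAG;
    import com.datatorrent.api.StreamingApplication;
    import com.datatorrent.lib.io.fs.FSInputModule;

    public class DynamicFSReadApp implements StreamingApplication
    {
      @Override
      public void populateDAG(DAG dag, Configuration conf)
      {
        FSInputModule input = dag.addModule("HDFSInputModule", new FSInputModule());
        input.setFiles("hdfs://localhost:54310/user/appuser/input");
        input.setBlocksThreshold(1);
        // Scale block readers between 2 and 8 partitions; a zero value would
        // leave the AbstractBlockReader defaults in place (see the checks in
        // populateDAG above).
        input.setMinReaders(2);
        input.setMaxReaders(8);
        // Re-evaluate the reader backlog every 2 minutes (value in milliseconds).
        input.setRepartitionCheckInterval(2 * 60 * 1000L);
      }
    }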
