Repository: incubator-apex-malhar Updated Branches: refs/heads/master a14b48716 -> 127ffe752
APEXMALHAR-2096: Add property blocksThreshold to limit input rate Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/703ff065 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/703ff065 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/703ff065 Branch: refs/heads/master Commit: 703ff065a1cafc2de0120bb12818e4920ce86f27 Parents: a14b487 Author: Priyanka Gugale <[email protected]> Authored: Wed May 18 17:52:08 2016 -0700 Committer: Priyanka Gugale <[email protected]> Committed: Tue May 31 17:35:01 2016 -0700 ---------------------------------------------------------------------- .../lib/io/fs/AbstractFileSplitter.java | 17 +++++++---- .../datatorrent/lib/io/fs/FSInputModule.java | 31 +++++++++++++++++--- .../lib/io/fs/FSInputModuleAppTest.java | 1 + .../lib/io/fs/FileSplitterBaseTest.java | 5 +++- .../lib/io/fs/FileSplitterInputTest.java | 1 + 5 files changed, 45 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/703ff065/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java index 2ea961e..7e6bd2f 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java @@ -71,11 +71,6 @@ public abstract class AbstractFileSplitter extends BaseOperator public final transient DefaultOutputPort<BlockMetadata.FileBlockMetadata> blocksMetadataOutput = new DefaultOutputPort<>(); - public AbstractFileSplitter() - { - blocksThreshold = Integer.MAX_VALUE; - } - @Override public void setup(Context.OperatorContext context) { @@ -280,11 +275,23 @@ public abstract class AbstractFileSplitter extends BaseOperator return blockSize; } + /** + * Sets number of blocks to be emitted per window.<br/> + * A lot of blocks emitted per window can overwhelm the downstream operators. Set this value considering blockSize and + * readersCount. + * @param threshold + */ public void setBlocksThreshold(int threshold) { this.blocksThreshold = threshold; } + /** + * Gets number of blocks to be emitted per window.<br/> + * A lot of blocks emitted per window can overwhelm the downstream operators. Set this value considering blockSize and + * readersCount. + * @return + */ public int getBlocksThreshold() { return blocksThreshold; http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/703ff065/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java b/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java index 713e745..e8af9aa 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java @@ -42,9 +42,8 @@ import com.datatorrent.netlet.util.Slice; * 4. recursive: if scan recursively input directories<br/> * 5. blockSize: block size used to read input blocks of file<br/> * 6. readersCount: count of readers to read input file<br/> - * 7. sequencialFileRead: If emit file blocks in sequence? - * - * @since 3.4.0 + * 7. sequencialFileRead: If emit file blocks in sequence?<br/> + * 8. blocksThreshold: number of blocks emitted per window */ public class FSInputModule implements Module @@ -59,6 +58,8 @@ public class FSInputModule implements Module private long blockSize; private boolean sequencialFileRead = false; private int readersCount; + @Min(1) + protected int blocksThreshold; public final transient ProxyOutputPort<AbstractFileSplitter.FileMetadata> filesMetadataOutput = new ProxyOutputPort<>(); public final transient ProxyOutputPort<BlockMetadata.FileBlockMetadata> blocksMetadataOutput = new ProxyOutputPort<>(); @@ -107,8 +108,8 @@ public class FSInputModule implements Module blockReader.setBasePath(files); if (readersCount != 0) { dag.setAttribute(blockReader, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<FSSliceReader>(readersCount)); - fileSplitter.setBlocksThreshold(readersCount); } + fileSplitter.setBlocksThreshold(blocksThreshold); } /** @@ -252,6 +253,28 @@ public class FSInputModule implements Module this.sequencialFileRead = sequencialFileRead; } + /** + * Sets number of blocks to be emitted per window.<br/> + * A lot of blocks emitted per window can overwhelm the downstream operators. Set this value considering blockSize and + * readersCount. + * @param threshold + */ + public void setBlocksThreshold(int threshold) + { + this.blocksThreshold = threshold; + } + + /** + * Gets number of blocks to be emitted per window.<br/> + * A lot of blocks emitted per window can overwhelm the downstream operators. Set this value considering blockSize and + * readersCount. + * @return + */ + public int getBlocksThreshold() + { + return blocksThreshold; + } + public static class SequentialFileBlockMetadataCodec extends KryoSerializableStreamCodec<BlockMetadata.FileBlockMetadata> { http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/703ff065/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleAppTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleAppTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleAppTest.java index 19ab84f..4213a00 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleAppTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleAppTest.java @@ -97,6 +97,7 @@ public class FSInputModuleAppTest Configuration conf = new Configuration(false); conf.set("dt.operator.hdfsInputModule.prop.files", inputDir); conf.set("dt.operator.hdfsInputModule.prop.blockSize", "10"); + conf.set("dt.operator.hdfsInputModule.prop.blocksThreshold", "4"); conf.set("dt.operator.hdfsInputModule.prop.scanIntervalMillis", "10000"); LocalMode lma = LocalMode.newInstance(); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/703ff065/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java index ac8c7e2..862e589 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java @@ -80,6 +80,7 @@ public class FileSplitterBaseTest } fileSplitter = new FileSplitterBase(); + fileSplitter.setBlocksThreshold(100); fileSplitter.setFile(this.dataDirectory); Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap(); @@ -199,7 +200,9 @@ public class FileSplitterBaseTest { LocalMode lma = LocalMode.newInstance(); SplitterApp app = new SplitterApp(); - lma.prepareDAG(app, new Configuration()); + Configuration appConf = new Configuration(); + appConf.set("dt.operator.Splitter.prop.blocksThreshold", "4"); + lma.prepareDAG(app, appConf); lma.cloneDAG(); // check serialization LocalMode.Controller lc = lma.getController(); lc.runAsync(); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/703ff065/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java index faa1d45..cea5109 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java @@ -104,6 +104,7 @@ public class FileSplitterInputTest } fileSplitterInput = new FileSplitterInput(); + fileSplitterInput.setBlocksThreshold(100); scanner = new MockScanner(); fileSplitterInput.setScanner(scanner); fileSplitterInput.getScanner().setScanIntervalMillis(500);
