Repository: apex-malhar
Updated Branches:
  refs/heads/master 16edf3067 -> b43818bce


APEXMALHAR-2302 Expose a few properties of the FileSplitter and BlockReader 
operators through FSRecordReaderModule for application tuning.

  1) Expose the blockSize property of the FileSplitter operator.
  2) Expose minReaders and maxReaders for dynamic partitioning of the 
BlockReader operator.
  3) Deprecate readersCount (the static PARTITIONER attribute) in 
FSRecordReaderModule. (A configuration sketch follows below.)
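
For context, a minimal application sketch (not part of the commit) showing how 
the newly exposed properties might be set in populateDAG(). The class name, 
directory path, and tuning values are hypothetical; setFiles() is assumed from 
the module's documented files property, and addModule() from the standard Apex 
DAG API:

    import org.apache.apex.malhar.lib.fs.FSRecordReaderModule;
    import org.apache.hadoop.conf.Configuration;

    import com.datatorrent.api.DAG;
    import com.datatorrent.api.StreamingApplication;

    public class RecordReaderTuningApp implements StreamingApplication
    {
      @Override
      public void populateDAG(DAG dag, Configuration conf)
      {
        FSRecordReaderModule reader = dag.addModule("recordReader", new FSRecordReaderModule());
        reader.setFiles("/user/appuser/input");      // hypothetical input directory
        reader.setBlockSize(8 * 1024 * 1024L);       // 8 MB splits instead of the filesystem default
        reader.setMinReaders(2);                     // lower bound for dynamic partitioning
        reader.setMaxReaders(16);                    // upper bound for dynamic partitioning
        reader.setRepartitionCheckInterval(60000L);  // re-evaluate partitioning every 60 seconds
        reader.setBlocksThreshold(4);                // blocks emitted per window
      }
    }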


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/b43818bc
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/b43818bc
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/b43818bc

Branch: refs/heads/master
Commit: b43818bce0da0ee94458b983061c726724c3bf14
Parents: 16edf30
Author: deepak-narkhede <[email protected]>
Authored: Fri Oct 21 17:57:01 2016 +0530
Committer: deepak-narkhede <[email protected]>
Committed: Wed Nov 2 14:46:12 2016 +0530

----------------------------------------------------------------------
 .../malhar/lib/fs/FSRecordReaderModule.java     | 147 ++++++++++++++-----
 1 file changed, 111 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b43818bc/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java
index d508320..2e29cb7 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java
@@ -30,10 +30,8 @@ import org.apache.hadoop.conf.Configuration;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.Module;
-import com.datatorrent.common.partitioner.StatelessPartitioner;
 import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
 import com.datatorrent.lib.io.block.BlockMetadata;
-import com.datatorrent.lib.io.block.FSSliceReader;
 import com.datatorrent.lib.io.fs.FileSplitterInput;
 
 /**
@@ -52,13 +50,14 @@ import com.datatorrent.lib.io.fs.FileSplitterInput;
  * input directory<br/>
  * 4. recursive: if true, scan input directories recursively<br/>
  * 5. blockSize: block size used to read input blocks of file<br/>
- * 6. readersCount: count of readers to read input file<br/>
- * 7. sequentialFileRead: if true, then each reader partition will read
+ * 6. sequentialFileRead: if true, then each reader partition will read
  * different file. <br/>
  * instead of reading different offsets of the same file. <br/>
  * (File level parallelism instead of block level parallelism)<br/>
- * 8. blocksThreshold: number of blocks emitted per window
- *
+ * 7. blocksThreshold: number of blocks emitted per window<br/>
+ * 8. minReaders: Minimum number of block readers for dynamic partitioning<br/>
+ * 9. maxReaders: Maximum number of block readers for dynamic partitioning<br/>
+ * 10. repartitionCheckInterval: Interval for re-evaluating dynamic partitioning<br/>
  * @since 3.5.0
  */
 @org.apache.hadoop.classification.InterfaceStability.Evolving
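
With readersCount going away, a hedged migration sketch: pinning minReaders and 
maxReaders to the same value should approximate the old static partition count. 
This equivalence is an assumption about the dynamic partitioner, not something 
the commit states; the helper name and the count of 4 are illustrative:

    // Continuing the hypothetical sketch above; 'reader' is an FSRecordReaderModule.
    static void migrateFromReadersCount(FSRecordReaderModule reader)
    {
      // Previously a fixed partition count was set via readersCount (now deprecated):
      //   reader.setReadersCount(4);

      // Assumed equivalent: constrain dynamic partitioning to a fixed size.
      reader.setMinReaders(4);  // never fewer than 4 block readers
      reader.setMaxReaders(4);  // never more than 4 block readers
    }
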
@@ -72,10 +71,13 @@ public class FSRecordReaderModule implements Module
   private long scanIntervalMillis = 5000;
   private boolean recursive = true;
   private boolean sequentialFileRead = false;
-  @Min(1)
-  private int readersCount = 1;
+  @Min(0)
+  private long blockSize;
   @Min(1)
   protected int blocksThreshold = 1;
+  protected int minReaders;
+  protected int maxReaders;
+  protected long repartitionCheckInterval;
 
  public final transient ProxyOutputPort<byte[]> records = new ProxyOutputPort<byte[]>();
 
@@ -135,13 +137,26 @@ public class FSRecordReaderModule implements Module
     if (filePatternRegularExp != null) {
       fileSplitter.getScanner().setFilePatternRegularExp(filePatternRegularExp);
     }
-
     recordReader.setBasePath(files);
-    if (readersCount != 0) {
-      dag.setAttribute(recordReader, Context.OperatorContext.PARTITIONER,
-          new StatelessPartitioner<FSSliceReader>(readersCount));
-    }
     fileSplitter.setBlocksThreshold(blocksThreshold);
+
+    if (minReaders != 0) {
+      recordReader.setMinReaders(minReaders);
+    }
+    if (maxReaders != 0) {
+      recordReader.setMaxReaders(maxReaders);
+    }
+    if (repartitionCheckInterval != 0) {
+      recordReader.setIntervalMillis(repartitionCheckInterval);
+    }
+
+    /*
+     * Override the size of the input blocks (splits) of a file. If not
+     * specified, the filesystem's default block size is used.
+     */
+    if (blockSize != 0) {
+      fileSplitter.setBlockSize(blockSize);
+    }
     records.set(recordReader.records);
   }
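
Note that each guard above treats 0 (the field default) as "not configured", 
leaving the underlying operators' own defaults intact. A short illustrative 
sketch (values hypothetical):

    FSRecordReaderModule reader = new FSRecordReaderModule();
    // blockSize left at 0: FileSplitter keeps the filesystem's default block size.
    // minReaders/maxReaders left at 0: the block reader keeps its default partitioning.
    reader.setRepartitionCheckInterval(30000L);  // the only override that takes effect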
 
@@ -232,26 +247,6 @@ public class FSRecordReaderModule implements Module
   }
 
   /**
-   * Gets readers count
-   *
-   * @return readersCount
-   */
-  public int getReadersCount()
-  {
-    return readersCount;
-  }
-
-  /**
-   * Static count of readers to read input file
-   *
-   * @param readersCount
-   */
-  public void setReadersCount(int readersCount)
-  {
-    this.readersCount = readersCount;
-  }
-
-  /**
    * Gets is sequential file read
    *
    * @return sequentialFileRead
@@ -275,7 +270,7 @@ public class FSRecordReaderModule implements Module
   /**
    * Sets number of blocks to be emitted per window.<br/>
    * A lot of blocks emitted per window can overwhelm the downstream operators.
-   * Set this value considering blockSize and readersCount.
+   * Set this value considering blockSize, minReaders and maxReaders.
    *
    * @param threshold
    */
@@ -287,9 +282,9 @@ public class FSRecordReaderModule implements Module
   /**
    * Gets number of blocks to be emitted per window.<br/>
    * A lot of blocks emitted per window can overwhelm the downstream operators.
-   * Set this value considering blockSize and readersCount.
+   * Set this value considering blockSize, minReaders and maxReaders.
    *
-   * @return
+   * @return blocksThreshold
    */
   public int getBlocksThreshold()
   {
@@ -297,6 +292,86 @@ public class FSRecordReaderModule implements Module
   }
 
   /**
+   * Gets the block size used to read input blocks of a file
+   *
+   * @return blockSize
+   */
+  public long getBlockSize()
+  {
+    return blockSize;
+  }
+
+  /**
+   * Sets the block size used to read input blocks of a file
+   *
+   * @param blockSize
+   */
+  public void setBlockSize(long blockSize)
+  {
+    this.blockSize = blockSize;
+  }
+
+  /**
+   * Gets minimum number of block readers for dynamic partitioning.
+   *
+   * @return minimum instances of block reader.
+   */
+  public int getMinReaders()
+  {
+    return minReaders;
+  }
+
+  /**
+   * Sets minimum number of block readers for dynamic partitioning.
+   *
+   * @param minReaders minimum number of readers.
+   */
+  public void setMinReaders(int minReaders)
+  {
+    this.minReaders = minReaders;
+  }
+
+  /**
+   * Gets maximum number of block readers for dynamic partitioning.
+   *
+   * @return maximum instances of block reader.
+   */
+  public int getMaxReaders()
+  {
+    return maxReaders;
+  }
+
+  /**
+   * Sets maximum number of block readers for dynamic partitioning.
+   *
+   * @param maxReaders maximum number of readers.
+   */
+  public void setMaxReaders(int maxReaders)
+  {
+    this.maxReaders = maxReaders;
+  }
+
+  /**
+   * Gets the interval for re-evaluating dynamic partitioning.
+   *
+   * @return interval for re-evaluating dynamic partitioning
+   */
+  public long getRepartitionCheckInterval()
+  {
+    return repartitionCheckInterval;
+  }
+
+  /**
+   * Sets the interval for re-evaluating dynamic partitioning.
+   *
+   * @param repartitionCheckInterval interval for re-evaluating dynamic partitioning
+   */
+  public void setRepartitionCheckInterval(long repartitionCheckInterval)
+  {
+    this.repartitionCheckInterval = repartitionCheckInterval;
+  }
+
+  /**
    * Criteria for record split
    *
    * @return mode
