Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/master 4e0217b43 -> 31cf63770


APEXMALHAR-2018 Moved generic HDFS Input Module code to abstract class.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/31cf6377
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/31cf6377
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/31cf6377

Branch: refs/heads/master
Commit: 31cf63770e8cb78380f942254e41a006bcba7cd6
Parents: 4e0217b
Author: Chaitanya <chaita...@datatorrent.com>
Authored: Mon Apr 25 17:38:55 2016 +0530
Committer: Chaitanya <chaita...@datatorrent.com>
Committed: Mon Apr 25 17:38:55 2016 +0530

----------------------------------------------------------------------
 .../datatorrent/lib/io/fs/FSFileSplitter.java   | 130 ++++++++++
 .../datatorrent/lib/io/fs/FSInputModule.java    | 242 +++++++++++++++++++
 .../datatorrent/lib/io/fs/HDFSFileSplitter.java |  91 +------
 .../datatorrent/lib/io/fs/HDFSInputModule.java  | 207 +---------------
 4 files changed, 382 insertions(+), 288 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/31cf6377/library/src/main/java/com/datatorrent/lib/io/fs/FSFileSplitter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FSFileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/FSFileSplitter.java
new file mode 100644
index 0000000..4318994
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FSFileSplitter.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.io.fs;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.io.block.BlockMetadata;
+
+/**
+ * FSFileSplitter extends {@link FileSplitterInput} to:
+ * 1. Ignore files matching the "ignoreFilePatternRegularExp" regular expression.
+ * 2. Set the sequential read option on readers.
+ */
+public class FSFileSplitter extends FileSplitterInput
+{
+  private boolean sequencialFileRead;
+
+  public FSFileSplitter()
+  {
+    super();
+    super.setScanner(new FSScanner());
+  }
+
+  @Override
+  protected BlockMetadata.FileBlockMetadata createBlockMetadata(FileMetadata fileMetadata)
+  {
+    BlockMetadata.FileBlockMetadata blockMetadata = new BlockMetadata.FileBlockMetadata(fileMetadata.getFilePath());
+    blockMetadata.setReadBlockInSequence(sequencialFileRead);
+    return blockMetadata;
+  }
+
+  public boolean isSequencialFileRead()
+  {
+    return sequencialFileRead;
+  }
+
+  public void setSequencialFileRead(boolean sequencialFileRead)
+  {
+    this.sequencialFileRead = sequencialFileRead;
+  }
+
+  /**
+   * FSScanner extends {@link TimeBasedDirectoryScanner} to ignore temporary files
+   * and files containing unsupported characters.
+   */
+  public static class FSScanner extends TimeBasedDirectoryScanner
+  {
+    private String unsupportedCharacter;
+    private String ignoreFilePatternRegularExp;
+    private transient Pattern ignoreRegex;
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      super.setup(context);
+      if (ignoreFilePatternRegularExp != null) {
+        ignoreRegex = Pattern.compile(this.ignoreFilePatternRegularExp);
+      }
+    }
+
+    @Override
+    protected boolean acceptFile(String filePathStr)
+    {
+      boolean accepted = super.acceptFile(filePathStr);
+      if (containsUnsupportedCharacters(filePathStr) || isIgnoredFile(filePathStr)) {
+        return false;
+      }
+      return accepted;
+    }
+
+    private boolean isIgnoredFile(String filePathStr)
+    {
+      String fileName = new Path(filePathStr).getName();
+      if (ignoreRegex != null) {
+        Matcher matcher = ignoreRegex.matcher(fileName);
+        if (matcher.matches()) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    private boolean containsUnsupportedCharacters(String filePathStr)
+    {
+      if (unsupportedCharacter != null) {
+        return new Path(filePathStr).toUri().getPath().contains(unsupportedCharacter);
+      }
+      return false;
+    }
+
+    public String getIgnoreFilePatternRegularExp()
+    {
+      return ignoreFilePatternRegularExp;
+    }
+
+    public void setIgnoreFilePatternRegularExp(String ignoreFilePatternRegularExp)
+    {
+      this.ignoreFilePatternRegularExp = ignoreFilePatternRegularExp;
+      this.ignoreRegex = Pattern.compile(ignoreFilePatternRegularExp);
+    }
+
+    public String getUnsupportedCharacter()
+    {
+      return unsupportedCharacter;
+    }
+
+    public void setUnsupportedCharacter(String unsupportedCharacter)
+    {
+      this.unsupportedCharacter = unsupportedCharacter;
+    }
+  }
+}
+
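
For readers following the refactoring: a minimal sketch of how a concrete
splitter is expected to specialize FSFileSplitter, mirroring what
HDFSFileSplitter does later in this commit. The class name, regex and
character below are illustrative values, not part of the commit:

  // Illustrative subclass: configure the generic FSScanner the way a
  // file-system-specific splitter would (HDFSFileSplitter does the same
  // for HDFS with ".*._COPYING_" and ":").
  public class BackupAwareFileSplitter extends FSFileSplitter
  {
    public BackupAwareFileSplitter()
    {
      super();  // the FSFileSplitter constructor installs an FSScanner
      FSFileSplitter.FSScanner scanner = (FSFileSplitter.FSScanner)getScanner();
      scanner.setIgnoreFilePatternRegularExp(".*\\.bak");  // skip backup files (example regex)
      scanner.setUnsupportedCharacter("#");                // reject paths containing '#' (example)
    }
  }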

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/31cf6377/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java b/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java
new file mode 100644
index 0000000..e5221a0
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.io.fs;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+import javax.validation.constraints.Size;
+import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Module;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.lib.io.block.AbstractBlockReader;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.io.block.BlockReader;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * FSInputModule is an abstract module for reading files from file systems such as HDFS, NFS and S3. <br/>
+ * FSInputModule emits FileMetadata, BlockMetadata and block bytes. <br/>
+ * The module reads data in parallel; the following parameters can be configured:<br/>
+ * 1. files: list of file(s)/directories to read<br/>
+ * 2. filePatternRegularExp: only files with names matching the given regex are read<br/>
+ * 3. scanIntervalMillis: interval between two scans to discover new files in the input directory<br/>
+ * 4. recursive: whether to scan the input directories recursively<br/>
+ * 5. blockSize: block size used to read input blocks of a file<br/>
+ * 6. readersCount: number of readers used to read the input files<br/>
+ * 7. sequencialFileRead: whether to emit file blocks in sequence
+ */
+public abstract class FSInputModule implements Module
+{
+  @NotNull
+  @Size(min = 1)
+  private String files;
+  private String filePatternRegularExp;
+  @Min(0)
+  private long scanIntervalMillis;
+  private boolean recursive = true;
+  private long blockSize;
+  private boolean sequencialFileRead = false;
+  private int readersCount;
+
+  public final transient ProxyOutputPort<AbstractFileSplitter.FileMetadata> filesMetadataOutput = new ProxyOutputPort<>();
+  public final transient ProxyOutputPort<BlockMetadata.FileBlockMetadata> blocksMetadataOutput = new ProxyOutputPort<>();
+  public final transient ProxyOutputPort<AbstractBlockReader.ReaderRecord<Slice>> messages = new ProxyOutputPort<>();
+
+  public abstract FSFileSplitter createFileSplitter();
+
+  public abstract BlockReader createBlockReader();
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    FSFileSplitter fileSplitter = dag.addOperator("FileSplitter", createFileSplitter());
+    BlockReader blockReader = dag.addOperator("BlockReader", createBlockReader());
+
+    dag.addStream("BlockMetadata", fileSplitter.blocksMetadataOutput, 
blockReader.blocksMetadataInput);
+
+    filesMetadataOutput.set(fileSplitter.filesMetadataOutput);
+    blocksMetadataOutput.set(blockReader.blocksMetadataOutput);
+    messages.set(blockReader.messages);
+
+    fileSplitter.setSequencialFileRead(sequencialFileRead);
+    if (blockSize != 0) {
+      fileSplitter.setBlockSize(blockSize);
+    }
+
+    FSFileSplitter.FSScanner fileScanner = (FSFileSplitter.FSScanner)fileSplitter.getScanner();
+    fileScanner.setFiles(files);
+    if (scanIntervalMillis != 0) {
+      fileScanner.setScanIntervalMillis(scanIntervalMillis);
+    }
+    fileScanner.setRecursive(recursive);
+    if (filePatternRegularExp != null) {
+      fileSplitter.getScanner().setFilePatternRegularExp(filePatternRegularExp);
+    }
+
+    blockReader.setUri(files);
+    if (readersCount != 0) {
+      dag.setAttribute(blockReader, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<BlockReader>(readersCount));
+      fileSplitter.setBlocksThreshold(readersCount);
+    }
+  }
+
+  /**
+   * A comma separated list of directories to scan. If the path is not fully qualified, the default file system is used.
+   * A fully qualified path can be provided to scan directories in other filesystems.
+   *
+   * @param files
+   *          files
+   */
+  public void setFiles(String files)
+  {
+    this.files = files;
+  }
+
+  /**
+   * Gets the files to be scanned.
+   *
+   * @return files to be scanned.
+   */
+  public String getFiles()
+  {
+    return files;
+  }
+
+  /**
+   * Gets the regular expression for file names to split
+   *
+   * @return regular expression
+   */
+  public String getFilePatternRegularExp()
+  {
+    return filePatternRegularExp;
+  }
+
+  /**
+   * Only files with names matching the given Java regular expression are split
+   *
+   * @param filePatternRegexp
+   *          regular expression
+   */
+  public void setFilePatternRegularExp(String filePatternRegexp)
+  {
+    this.filePatternRegularExp = filePatternRegexp;
+  }
+
+  /**
+   * Gets the scan interval in milliseconds, the interval between two scans to discover new files in the input directory
+   *
+   * @return scanInterval milliseconds
+   */
+  public long getScanIntervalMillis()
+  {
+    return scanIntervalMillis;
+  }
+
+  /**
+   * Sets the scan interval in milliseconds, the interval between two scans to discover new files in the input directory
+   *
+   * @param scanIntervalMillis
+   */
+  public void setScanIntervalMillis(long scanIntervalMillis)
+  {
+    this.scanIntervalMillis = scanIntervalMillis;
+  }
+
+  /**
+   * Gets whether the directory scan is recursive
+   *
+   * @return isRecursive
+   */
+  public boolean isRecursive()
+  {
+    return recursive;
+  }
+
+  /**
+   * Sets whether the directory scan is recursive
+   *
+   * @param recursive
+   */
+  public void setRecursive(boolean recursive)
+  {
+    this.recursive = recursive;
+  }
+
+  /**
+   * Gets the block size used to read input blocks of a file
+   *
+   * @return blockSize
+   */
+  public long getBlockSize()
+  {
+    return blockSize;
+  }
+
+  /**
+   * Sets the block size used to read input blocks of a file
+   *
+   * @param blockSize
+   */
+  public void setBlockSize(long blockSize)
+  {
+    this.blockSize = blockSize;
+  }
+
+  /**
+   * Gets readers count
+   * @return readersCount
+   */
+  public int getReadersCount()
+  {
+    return readersCount;
+  }
+
+  /**
+   * Sets the static count of readers to read the input files
+   * @param readersCount
+   */
+  public void setReadersCount(int readersCount)
+  {
+    this.readersCount = readersCount;
+  }
+
+  /**
+   * Gets whether file blocks are read in sequence
+   *
+   * @return sequencialFileRead
+   */
+  public boolean isSequencialFileRead()
+  {
+    return sequencialFileRead;
+  }
+
+  /**
+   * Sets whether file blocks are read in sequence
+   *
+   * @param sequencialFileRead
+   */
+  public void setSequencialFileRead(boolean sequencialFileRead)
+  {
+    this.sequencialFileRead = sequencialFileRead;
+  }
+}
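
A minimal sketch of a concrete module against this abstract class; the
HDFSInputModule rewrite below follows exactly this shape. The class name is
illustrative:

  // Supply the two factories; FSInputModule.populateDAG() wires the
  // splitter, the block reader and the proxy output ports.
  public class MyFSInputModule extends FSInputModule
  {
    @Override
    public FSFileSplitter createFileSplitter()
    {
      return new FSFileSplitter();  // or a file-system-specific subclass
    }

    @Override
    public BlockReader createBlockReader()
    {
      return new BlockReader();  // or a file-system-specific subclass
    }
  }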

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/31cf6377/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java
index 24466d5..26f2b6d 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java
@@ -18,103 +18,20 @@
  */
 package com.datatorrent.lib.io.fs;
 
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.fs.Path;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
-
 /**
  * HDFSFileSplitter extends {@link FileSplitterInput} to,
  * 1. Add relative path to file metadata.
  * 2. Ignore HDFS temp files (files with extensions _COPYING_).
  * 3. Set sequencial read option on readers.
  */
-public class HDFSFileSplitter extends FileSplitterInput
+public class HDFSFileSplitter extends FSFileSplitter
 {
-  private boolean sequencialFileRead;
-
   public HDFSFileSplitter()
   {
     super();
-    super.setScanner(new HDFSScanner());
-  }
-
-
-  @Override
-  protected FileBlockMetadata createBlockMetadata(FileMetadata fileMetadata)
-  {
-    FileBlockMetadata blockMetadta = new FileBlockMetadata(fileMetadata.getFilePath());
-    blockMetadta.setReadBlockInSequence(sequencialFileRead);
-    return blockMetadta;
-  }
-
-  public boolean isSequencialFileRead()
-  {
-    return sequencialFileRead;
-  }
-
-  public void setSequencialFileRead(boolean sequencialFileRead)
-  {
-    this.sequencialFileRead = sequencialFileRead;
-  }
-
-  /**
-   * HDFSScanner extends {@link TimeBasedDirectoryScanner} to ignore HDFS temporary files
-   * and files containing unsupported characters. 
-   */
-  public static class HDFSScanner extends TimeBasedDirectoryScanner
-  {
-    protected static final String UNSUPPORTED_CHARACTOR = ":";
-    private String ignoreFilePatternRegularExp = ".*._COPYING_";
-    private transient Pattern ignoreRegex;
-
-    @Override
-    public void setup(OperatorContext context)
-    {
-      super.setup(context);
-      ignoreRegex = Pattern.compile(this.ignoreFilePatternRegularExp);
-    }
-
-    @Override
-    protected boolean acceptFile(String filePathStr)
-    {
-      boolean accepted = super.acceptFile(filePathStr);
-      if (containsUnsupportedCharacters(filePathStr) || isIgnoredFile(filePathStr)) {
-        return false;
-      }
-      return accepted;
-    }
-
-    private boolean isIgnoredFile(String filePathStr)
-    {
-      String fileName = new Path(filePathStr).getName();
-      if (ignoreRegex != null) {
-        Matcher matcher = ignoreRegex.matcher(fileName);
-        if (matcher.matches()) {
-          return true;
-        }
-      }
-      return false;
-    }
-
-    private boolean containsUnsupportedCharacters(String filePathStr)
-    {
-      return new Path(filePathStr).toUri().getPath().contains(UNSUPPORTED_CHARACTOR);
-    }
-
-    public String getIgnoreFilePatternRegularExp()
-    {
-      return ignoreFilePatternRegularExp;
-    }
-
-    public void setIgnoreFilePatternRegularExp(String ignoreFilePatternRegularExp)
-    {
-      this.ignoreFilePatternRegularExp = ignoreFilePatternRegularExp;
-      this.ignoreRegex = Pattern.compile(ignoreFilePatternRegularExp);
-    }
+    FSFileSplitter.FSScanner scanner = (FSFileSplitter.FSScanner)getScanner();
+    scanner.setIgnoreFilePatternRegularExp(".*._COPYING_");
+    scanner.setUnsupportedCharacter(":");
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/31cf6377/library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java b/library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java
index 4456352..de99fd3 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java
@@ -18,22 +18,7 @@
  */
 package com.datatorrent.lib.io.fs;
 
-import javax.validation.constraints.Min;
-import javax.validation.constraints.NotNull;
-import javax.validation.constraints.Size;
-
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.Module;
-import com.datatorrent.common.partitioner.StatelessPartitioner;
-import com.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord;
-import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
 import com.datatorrent.lib.io.block.BlockReader;
-import com.datatorrent.lib.io.fs.AbstractFileSplitter.FileMetadata;
-import com.datatorrent.lib.io.fs.HDFSFileSplitter.HDFSScanner;
-import com.datatorrent.netlet.util.Slice;
 
 /**
 * HDFSInputModule is used to read files/list of files (or directory) from HDFS. <br/>
@@ -48,197 +33,17 @@ import com.datatorrent.netlet.util.Slice;
  * 6. readersCount: count of readers to read input file<br/>
  * 7. sequencialFileRead: If emit file blocks in sequence?
  */
-public class HDFSInputModule implements Module
+public class HDFSInputModule extends FSInputModule
 {
-
-  @NotNull
-  @Size(min = 1)
-  private String files;
-  private String filePatternRegularExp;
-  @Min(0)
-  private long scanIntervalMillis;
-  private boolean recursive = true;
-  private long blockSize;
-  private boolean sequencialFileRead = false;
-  private int readersCount;
-
-  public final transient ProxyOutputPort<FileMetadata> filesMetadataOutput = new ProxyOutputPort<>();
-  public final transient ProxyOutputPort<FileBlockMetadata> blocksMetadataOutput = new ProxyOutputPort<>();
-  public final transient ProxyOutputPort<ReaderRecord<Slice>> messages = new ProxyOutputPort<>();
-
   @Override
-  public void populateDAG(DAG dag, Configuration conf)
-  {
-    HDFSFileSplitter fileSplitter = dag.addOperator("FileSplitter", new HDFSFileSplitter());
-    BlockReader blockReader = dag.addOperator("BlockReader", new BlockReader());
-
-    dag.addStream("BlockMetadata", fileSplitter.blocksMetadataOutput, 
blockReader.blocksMetadataInput);
-
-    filesMetadataOutput.set(fileSplitter.filesMetadataOutput);
-    blocksMetadataOutput.set(blockReader.blocksMetadataOutput);
-    messages.set(blockReader.messages);
-
-    fileSplitter.setSequencialFileRead(sequencialFileRead);
-    if (blockSize != 0) {
-      fileSplitter.setBlockSize(blockSize);
-    }
-
-    HDFSScanner fileScanner = (HDFSScanner)fileSplitter.getScanner();
-    fileScanner.setFiles(files);
-    if (scanIntervalMillis != 0) {
-      fileScanner.setScanIntervalMillis(scanIntervalMillis);
-    }
-    fileScanner.setRecursive(recursive);
-    if (filePatternRegularExp != null) {
-      fileSplitter.getScanner().setFilePatternRegularExp(filePatternRegularExp);
-    }
-
-    blockReader.setUri(files);
-    if (readersCount != 0) {
-      dag.setAttribute(blockReader, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<BlockReader>(readersCount));
-      fileSplitter.setBlocksThreshold(readersCount);
-    }
-  }
-
-  /**
-   * A comma separated list of directories to scan. If the path is not fully qualified the default file system is used.
-   * A fully qualified path can be provided to scan directories in other filesystems.
-   *
-   * @param files
-   *          files
-   */
-  public void setFiles(String files)
+  public FSFileSplitter createFileSplitter()
   {
-    this.files = files;
+    return new HDFSFileSplitter();
   }
 
-  /**
-   * Gets the files to be scanned.
-   *
-   * @return files to be scanned.
-   */
-  public String getFiles()
-  {
-    return files;
-  }
-
-  /**
-   * Gets the regular expression for file names to split
-   *
-   * @return regular expression
-   */
-  public String getFilePatternRegularExp()
-  {
-    return filePatternRegularExp;
-  }
-
-  /**
-   * Only files with names matching the given java regular expression are split
-   *
-   * @param filePatternRegexp
-   *          regular expression
-   */
-  public void setFilePatternRegularExp(String filePatternRegexp)
-  {
-    this.filePatternRegularExp = filePatternRegexp;
-  }
-
-  /**
-   * Gets scan interval in milliseconds, interval between two scans to discover new files in input directory
-   *
-   * @return scanInterval milliseconds
-   */
-  public long getScanIntervalMillis()
-  {
-    return scanIntervalMillis;
-  }
-
-  /**
-   * Sets scan interval in milliseconds, interval between two scans to discover new files in input directory
-   *
-   * @param scanIntervalMillis
-   */
-  public void setScanIntervalMillis(long scanIntervalMillis)
-  {
-    this.scanIntervalMillis = scanIntervalMillis;
-  }
-
-  /**
-   * Get is scan recursive
-   *
-   * @return isRecursive
-   */
-  public boolean isRecursive()
-  {
-    return recursive;
-  }
-
-  /**
-   * set is scan recursive
-   *
-   * @param recursive
-   */
-  public void setRecursive(boolean recursive)
-  {
-    this.recursive = recursive;
-  }
-
-  /**
-   * Get block size used to read input blocks of file
-   *
-   * @return blockSize
-   */
-  public long getBlockSize()
-  {
-    return blockSize;
-  }
-
-  /**
-   * Sets block size used to read input blocks of file
-   *
-   * @param blockSize
-   */
-  public void setBlockSize(long blockSize)
-  {
-    this.blockSize = blockSize;
-  }
-
-  /**
-   * Gets readers count
-   * @return readersCount
-   */
-  public int getReadersCount()
-  {
-    return readersCount;
-  }
-
-  /**
-   * Static count of readers to read input file
-   * @param readersCount
-   */
-  public void setReadersCount(int readersCount)
-  {
-    this.readersCount = readersCount;
-  }
-
-  /**
-   * Gets is sequencial file read
-   * 
-   * @return sequencialFileRead
-   */
-  public boolean isSequencialFileRead()
-  {
-    return sequencialFileRead;
-  }
-
-  /**
-   * Sets is sequencial file read
-   *
-   * @param sequencialFileRead
-   */
-  public void setSequencialFileRead(boolean sequencialFileRead)
+  @Override
+  public BlockReader createBlockReader()
   {
-    this.sequencialFileRead = sequencialFileRead;
+    return new BlockReader();
   }
-
 }
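
For context, a sketch of how an application could use the module after this
change. StreamingApplication, DAG.addModule and ConsoleOutputOperator are
existing Apex/Malhar APIs, but this assumes an Apex version with module
support, and the application class and property values are illustrative:

  import org.apache.hadoop.conf.Configuration;

  import com.datatorrent.api.DAG;
  import com.datatorrent.api.StreamingApplication;
  import com.datatorrent.lib.io.ConsoleOutputOperator;
  import com.datatorrent.lib.io.fs.HDFSInputModule;

  // Reads file blocks from HDFS and prints each block record to stdout.
  public class FileReaderApp implements StreamingApplication
  {
    @Override
    public void populateDAG(DAG dag, Configuration conf)
    {
      HDFSInputModule input = dag.addModule("HDFSInput", new HDFSInputModule());
      input.setFiles("/user/input/dir");  // example input directory
      input.setReadersCount(2);           // partition BlockReader two ways
      input.setSequencialFileRead(true);  // emit a file's blocks in order

      ConsoleOutputOperator console = dag.addOperator("Console", new ConsoleOutputOperator());
      dag.addStream("data", input.messages, console.input);
    }
  }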
