Repository: apex-malhar
Updated Branches:
  refs/heads/master 8bfaf7896 -> 2e2dfc5c0


APEXMALHAR-2116 : Added FS record reader operator, module, test

2. Javadoc improvements.

3. Added default values.

4. Incorporated review comments.


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/02d657c2
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/02d657c2
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/02d657c2

Branch: refs/heads/master
Commit: 02d657c215bb5f1c285e9bf2a1e67fdd285eb662
Parents: aaa4464
Author: yogidevendra <[email protected]>
Authored: Mon Jun 20 11:17:08 2016 +0530
Committer: yogidevendra <[email protected]>
Committed: Mon Jul 25 11:52:45 2016 +0530

----------------------------------------------------------------------
 .../apex/malhar/lib/fs/FSRecordReader.java      | 149 ++++++++
 .../malhar/lib/fs/FSRecordReaderModule.java     | 347 +++++++++++++++++++
 .../apex/malhar/lib/fs/FSRecordReaderTest.java  | 217 ++++++++++++
 3 files changed, 713 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/02d657c2/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java
new file mode 100644
index 0000000..44168c8
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.io.block.FSSliceReader;
+import com.datatorrent.lib.io.block.ReaderContext;
+
+/**
+ * This operator can be used for reading records/tuples from Filesystem in
+ * parallel (without ordering guarantees between tuples). Records can be
+ * delimited (e.g. newline) or fixed width records. Output tuples are byte[].
+ * 
+ * Typically, this operator will be connected to output of FileSplitterInput to
+ * read records in parallel.
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class FSRecordReader extends FSSliceReader
+{
+  /**
+   * Record reader mode decides how to split the records.
+   */
+  public enum RECORD_READER_MODE
+  {
+    DELIMITED_RECORD, FIXED_WIDTH_RECORD;
+  }
+
+  /**
+   * Criteria for record split
+   */
+  private RECORD_READER_MODE mode = RECORD_READER_MODE.DELIMITED_RECORD;
+
+  /**
+   * Length for fixed width record
+   */
+  private int recordLength;
+
+  /**
+   * Port to emit individual records/tuples as byte[]
+   */
+  public final transient DefaultOutputPort<byte[]> records = new DefaultOutputPort<byte[]>();
+
+  /**
+   * Initialize appropriate reader context based on mode selection
+   */
+  @Override
+  public void setup(OperatorContext context)
+  {
+    super.setup(context);
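+    // FIXED_WIDTH_RECORD emits fixed recordLength-byte slices; DELIMITED_RECORD
+    // splits records on line boundaries using a read-ahead line reader.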
+    if (mode == RECORD_READER_MODE.FIXED_WIDTH_RECORD) {
+      ReaderContext.FixedBytesReaderContext<FSDataInputStream> fixedBytesReaderContext = new ReaderContext.FixedBytesReaderContext<FSDataInputStream>();
+      fixedBytesReaderContext.setLength(recordLength);
+      readerContext = fixedBytesReaderContext;
+    } else {
+      readerContext = new ReaderContext.ReadAheadLineReaderContext<FSDataInputStream>();
+    }
+  }
+
+  /**
+   * Read the block data and emit records based on reader context
+   *
+   * @param blockMetadata
+   *          block
+   * @throws IOException
+   */
+  @Override
+  protected void readBlock(BlockMetadata blockMetadata) throws IOException
+  {
+    readerContext.initialize(stream, blockMetadata, consecutiveBlock);
+    ReaderContext.Entity entity;
+    while ((entity = readerContext.next()) != null) {
+
+      counters.getCounter(ReaderCounterKeys.BYTES).add(entity.getUsedBytes());
+
+      byte[] record = entity.getRecord();
+
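+      // record may be null when the reader context consumed bytes without
+      // producing a complete record; such entities only update the byte counter.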
+      if (record != null) {
+        counters.getCounter(ReaderCounterKeys.RECORDS).increment();
+        records.emit(record);
+      }
+    }
+  }
+
+  /**
+   * Criteria for record split
+   * 
+   * @param mode
+   *          Mode
+   */
+  public void setMode(RECORD_READER_MODE mode)
+  {
+    this.mode = mode;
+  }
+
+  /**
+   * Criteria for record split
+   * 
+   * @return mode
+   */
+  public RECORD_READER_MODE getMode()
+  {
+    return mode;
+  }
+
+  /**
+   * Length for fixed width record
+   * 
+   * @param recordLength record length in bytes
+   */
+  public void setRecordLength(int recordLength)
+  {
+    if (mode == RECORD_READER_MODE.FIXED_WIDTH_RECORD && recordLength <= 0) {
+      throw new IllegalArgumentException("recordLength should be greater than 0.");
+    }
+    this.recordLength = recordLength;
+  }
+
+  /**
+   * Length for fixed width record
+   * 
+   * @return record length
+   */
+  public int getRecordLength()
+  {
+    return recordLength;
+  }
+
+}
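
For context, here is a minimal application sketch showing how the new operator
could be wired together with FileSplitterInput (illustrative only, not part of
this commit; the class name, operator names, stream names, input path, and
console sink are hypothetical):

    package com.example;  // hypothetical package

    import org.apache.apex.malhar.lib.fs.FSRecordReader;
    import org.apache.hadoop.conf.Configuration;

    import com.datatorrent.api.DAG;
    import com.datatorrent.api.StreamingApplication;
    import com.datatorrent.lib.io.ConsoleOutputOperator;
    import com.datatorrent.lib.io.fs.FileSplitterInput;

    public class RecordReaderApp implements StreamingApplication
    {
      @Override
      public void populateDAG(DAG dag, Configuration conf)
      {
        // Scans the input directory and emits block metadata for discovered files.
        FileSplitterInput splitter = dag.addOperator("FileSplitter", new FileSplitterInput());
        splitter.getScanner().setFiles("/path/to/input");  // hypothetical path

        // Reads blocks in parallel and emits one byte[] per delimited record.
        FSRecordReader reader = dag.addOperator("RecordReader", new FSRecordReader());
        reader.setMode(FSRecordReader.RECORD_READER_MODE.DELIMITED_RECORD);
        reader.setBasePath("/path/to/input");

        // Prints each record; stands in for any downstream operator.
        ConsoleOutputOperator console = dag.addOperator("Console", new ConsoleOutputOperator());

        dag.addStream("blocks", splitter.blocksMetadataOutput, reader.blocksMetadataInput);
        dag.addStream("records", reader.records, console.input);
      }
    }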

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/02d657c2/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java
new file mode 100644
index 0000000..b1df744
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java
@@ -0,0 +1,347 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+import javax.validation.constraints.Size;
+
+import org.apache.apex.malhar.lib.fs.FSRecordReader.RECORD_READER_MODE;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Module;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.io.block.FSSliceReader;
+import com.datatorrent.lib.io.fs.FileSplitterInput;
+
+/**
+ * This module is used for reading records/tuples from FileSystem. Records can
+ * be read in parallel using multiple partitions of record reader operator.
+ * (Ordering is not guaranteed when records are read in parallel)
+ *
+ * Input directory is scanned at the specified interval to poll for new data.
+ *
+ * The module reads data in parallel; the following parameters can be
+ * configured:<br/>
+ * 1. files: list of file(s)/directories to read<br/>
+ * 2. filePatternRegularExp: Files with names matching given regex will be read
+ * <br/>
+ * 3. scanIntervalMillis: interval between two scans to discover new files in
+ * input directory<br/>
+ * 4. recursive: if true, scan input directories recursively<br/>
+ * 5. blockSize: block size used to read input blocks of file<br/>
+ * 6. readersCount: count of readers to read input file<br/>
+ * 7. sequentialFileRead: if true, each reader partition reads a different
+ * file instead of reading different offsets of the same file<br/>
+ * (file-level parallelism instead of block-level parallelism)<br/>
+ * 8. blocksThreshold: number of blocks emitted per window
+ */
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class FSRecordReaderModule implements Module
+{
+  @NotNull
+  @Size(min = 1)
+  private String files;
+  private String filePatternRegularExp;
+  @Min(1)
+  private long scanIntervalMillis = 5000;
+  private boolean recursive = true;
+  private boolean sequentialFileRead = false;
+  @Min(1)
+  private int readersCount = 1;
+  @Min(1)
+  protected int blocksThreshold = 1;
+
+  public final transient ProxyOutputPort<byte[]> records = new ProxyOutputPort<byte[]>();
+
+  /**
+   * Criteria for record split
+   */
+  private RECORD_READER_MODE mode = RECORD_READER_MODE.DELIMITED_RECORD;
+
+  /**
+   * Length for fixed width record
+   */
+  private int recordLength;
+
+  /**
+   * Creates an instance of FileSplitterInput
+   *
+   * @return FileSplitterInput instance
+   */
+  public FileSplitterInput createFileSplitter()
+  {
+    return new FileSplitterInput();
+  }
+
+  /**
+   * Creates an instance of Record Reader
+   * 
+   * @return FSRecordReader instance
+   */
+  public FSRecordReader createRecordReader()
+  {
+    FSRecordReader recordReader = new FSRecordReader();
+    recordReader.setMode(mode);
+    recordReader.setRecordLength(recordLength);
+
+    return recordReader;
+  }
+
+  @Override
+  public void populateDAG(DAG dag, Configuration configuration)
+  {
+    FileSplitterInput fileSplitter = dag.addOperator("FileSplitter", createFileSplitter());
+    FSRecordReader recordReader = dag.addOperator("BlockReader", createRecordReader());
+
+    dag.addStream("BlockMetadata", fileSplitter.blocksMetadataOutput, recordReader.blocksMetadataInput);
+
+    if (sequentialFileRead) {
+      dag.setInputPortAttribute(recordReader.blocksMetadataInput, Context.PortContext.STREAM_CODEC,
+          new SequentialFileBlockMetadataCodec());
+    }
+
+    FileSplitterInput.TimeBasedDirectoryScanner fileScanner = fileSplitter.getScanner();
+    fileScanner.setFiles(files);
+    if (scanIntervalMillis != 0) {
+      fileScanner.setScanIntervalMillis(scanIntervalMillis);
+    }
+    fileScanner.setRecursive(recursive);
+    if (filePatternRegularExp != null) {
+      fileSplitter.getScanner().setFilePatternRegularExp(filePatternRegularExp);
+    }
+
+    recordReader.setBasePath(files);
+    if (readersCount != 0) {
+      dag.setAttribute(recordReader, Context.OperatorContext.PARTITIONER,
+          new StatelessPartitioner<FSSliceReader>(readersCount));
+    }
+    fileSplitter.setBlocksThreshold(blocksThreshold);
+    records.set(recordReader.records);
+  }
+
+  /**
+   * A comma separated list of directories to scan. If the path is not fully
+   * qualified the default file system is used. A fully qualified path can be
+   * provided to scan directories in other filesystems.
+   *
+   * @param files
+   *          files
+   */
+  public void setFiles(String files)
+  {
+    this.files = files;
+  }
+
+  /**
+   * Gets the files to be scanned.
+   *
+   * @return files to be scanned.
+   */
+  public String getFiles()
+  {
+    return files;
+  }
+
+  /**
+   * Gets the regular expression for file names to split
+   *
+   * @return regular expression
+   */
+  public String getFilePatternRegularExp()
+  {
+    return filePatternRegularExp;
+  }
+
+  /**
+   * Only files with names matching the given Java regular expression are split
+   *
+   * @param filePatternRegexp
+   *          regular expression
+   */
+  public void setFilePatternRegularExp(String filePatternRegexp)
+  {
+    this.filePatternRegularExp = filePatternRegexp;
+  }
+
+  /**
+   * Gets scan interval in milliseconds, interval between two scans to discover
+   * new files in input directory
+   *
+   * @return scanInterval milliseconds
+   */
+  public long getScanIntervalMillis()
+  {
+    return scanIntervalMillis;
+  }
+
+  /**
+   * Sets scan interval in milliseconds, interval between two scans to discover
+   * new files in input directory
+   *
+   * @param scanIntervalMillis scan interval in milliseconds
+   */
+  public void setScanIntervalMillis(long scanIntervalMillis)
+  {
+    this.scanIntervalMillis = scanIntervalMillis;
+  }
+
+  /**
+   * Returns whether the directory scan is recursive
+   *
+   * @return true if input directories are scanned recursively
+   */
+  public boolean isRecursive()
+  {
+    return recursive;
+  }
+
+  /**
+   * Sets whether the directory scan is recursive
+   *
+   * @param recursive true to scan input directories recursively
+   */
+  public void setRecursive(boolean recursive)
+  {
+    this.recursive = recursive;
+  }
+
+  /**
+   * Gets readers count
+   * 
+   * @return readersCount
+   */
+  public int getReadersCount()
+  {
+    return readersCount;
+  }
+
+  /**
+   * Sets the static count of readers reading the input files
+   *
+   * @param readersCount number of reader partitions
+   */
+  public void setReadersCount(int readersCount)
+  {
+    this.readersCount = readersCount;
+  }
+
+  /**
+   * Returns whether sequential file read is enabled
+   *
+   * @return true if each reader partition reads a different file
+   */
+  public boolean isSequentialFileRead()
+  {
+    return sequentialFileRead;
+  }
+
+  /**
+   * Sets whether sequential file read is enabled
+   *
+   * @param sequentialFileRead true to have each reader partition read a
+   *          different file
+   */
+  public void setSequentialFileRead(boolean sequentialFileRead)
+  {
+    this.sequentialFileRead = sequentialFileRead;
+  }
+
+  /**
+   * Sets number of blocks to be emitted per window.<br/>
+   * Emitting too many blocks per window can overwhelm the downstream operators.
+   * Set this value considering blockSize and readersCount.
+   *
+   * @param threshold number of blocks emitted per window
+   */
+  public void setBlocksThreshold(int threshold)
+  {
+    this.blocksThreshold = threshold;
+  }
+
+  /**
+   * Gets number of blocks to be emitted per window.<br/>
+   * Emitting too many blocks per window can overwhelm the downstream operators.
+   * Set this value considering blockSize and readersCount.
+   *
+   * @return number of blocks emitted per window
+   */
+  public int getBlocksThreshold()
+  {
+    return blocksThreshold;
+  }
+
+  /**
+   * Criteria for record split
+   * 
+   * @return mode
+   */
+  public RECORD_READER_MODE getMode()
+  {
+    return mode;
+  }
+
+  /**
+   * Criteria for record split
+   * 
+   * @param mode
+   *          Mode
+   */
+  public void setMode(RECORD_READER_MODE mode)
+  {
+    this.mode = mode;
+  }
+
+  /**
+   * Length for fixed width record
+   * 
+   * @return record length
+   */
+  public int getRecordLength()
+  {
+    return recordLength;
+  }
+
+  /**
+   * Length for fixed width record
+   * 
+   * @param recordLength record length in bytes
+   */
+  public void setRecordLength(int recordLength)
+  {
+    this.recordLength = recordLength;
+  }
+
+  public static class SequentialFileBlockMetadataCodec
+      extends KryoSerializableStreamCodec<BlockMetadata.FileBlockMetadata>
+  {
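+    /**
+     * Partitions block metadata by its hash code; used when
+     * sequentialFileRead is enabled so that related blocks are routed to the
+     * same reader partition and files are read sequentially.
+     */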
+    @Override
+    public int getPartition(BlockMetadata.FileBlockMetadata fileBlockMetadata)
+    {
+      return fileBlockMetadata.hashCode();
+    }
+  }
+}
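
For context, here is a minimal sketch of how the module could be used from an
application (illustrative only, not part of this commit; the class name, module
name, input directory, and console sink are hypothetical). The same settings can
alternatively be supplied through dt.operator.<moduleName>.prop.* configuration
keys, as the test below does:

    import org.apache.apex.malhar.lib.fs.FSRecordReader.RECORD_READER_MODE;
    import org.apache.apex.malhar.lib.fs.FSRecordReaderModule;
    import org.apache.hadoop.conf.Configuration;

    import com.datatorrent.api.DAG;
    import com.datatorrent.api.StreamingApplication;
    import com.datatorrent.lib.io.ConsoleOutputOperator;

    public class RecordReaderModuleApp implements StreamingApplication
    {
      @Override
      public void populateDAG(DAG dag, Configuration conf)
      {
        FSRecordReaderModule reader = dag.addModule("RecordReader", new FSRecordReaderModule());
        reader.setFiles("/path/to/input");  // hypothetical input directory
        reader.setMode(RECORD_READER_MODE.DELIMITED_RECORD);
        reader.setReadersCount(2);          // two parallel reader partitions

        ConsoleOutputOperator console = dag.addOperator("Console", new ConsoleOutputOperator());

        // The module's proxy port forwards records emitted by the internal reader.
        dag.addStream("records", reader.records, console.input);
      }
    }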

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/02d657c2/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordReaderTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordReaderTest.java
new file mode 100644
index 0000000..fdd888c
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordReaderTest.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.fs.FSRecordReader.RECORD_READER_MODE;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+
+public class FSRecordReaderTest
+{
+  private String inputDir;
+  static String outputDir;
+  private static final String FILE_1 = "file1.txt";
+  private static final String FILE_2 = "file2.txt";
+  private static final String FILE_1_DATA = "1234\n567890\nabcde\nfgh\ni\njklmop";
+  private static final String FILE_2_DATA = "qr\nstuvw\nxyz\n";
+
+  public static class TestMeta extends TestWatcher
+  {
+    public String baseDirectory;
+
+    @Override
+    protected void starting(org.junit.runner.Description description)
+    {
+      this.baseDirectory = "target/" + description.getClassName() + "/" + description.getMethodName();
+    }
+
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Before
+  public void setup() throws Exception
+  {
+    inputDir = testMeta.baseDirectory + File.separator + "input";
+
+    FileUtils.writeStringToFile(new File(inputDir + File.separator + FILE_1), FILE_1_DATA);
+    FileUtils.writeStringToFile(new File(inputDir + File.separator + FILE_2), FILE_2_DATA);
+  }
+
+  @Test
+  public void testDelimitedRecords() throws Exception
+  {
+
+    DelimitedApplication app = new DelimitedApplication();
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    conf.set("dt.operator.HDFSRecordReaderModule.prop.files", inputDir);
+    conf.set("dt.operator.HDFSRecordReaderModule.prop.blocksThreshold", "1");
+    conf.set("dt.operator.HDFSRecordReaderModule.prop.scanIntervalMillis", "10000");
+
+    lma.prepareDAG(app, conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.setHeartbeatMonitoringEnabled(true);
+    lc.runAsync();
+    LOG.debug("Waiting for app to finish");
+    Thread.sleep(1000 * 1);
+    lc.shutdown();
+  }
+
+  public static class DelimitedValidator extends BaseOperator
+  {
+    Set<String> records = new HashSet<String>();
+
+    public final transient DefaultInputPort<byte[]> data = new DefaultInputPort<byte[]>()
+    {
+
+      @Override
+      public void process(byte[] tuple)
+      {
+        String record = new String(tuple);
+        records.add(record);
+      }
+    };
+
+    @Override
+    public void teardown()
+    {
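+      // Ordering is not asserted since records are read in parallel; the set
+      // of emitted records must equal the set of lines from both input files.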
+      Set<String> expectedRecords = new HashSet<String>(Arrays.asList(FILE_1_DATA.split("\n")));
+      expectedRecords.addAll(Arrays.asList(FILE_2_DATA.split("\n")));
+
+      Assert.assertEquals(expectedRecords, records);
+    }
+  }
+
+  private static class DelimitedApplication implements StreamingApplication
+  {
+
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      FSRecordReaderModule recordReader = dag.addModule("HDFSRecordReaderModule", FSRecordReaderModule.class);
+      recordReader.setMode(RECORD_READER_MODE.DELIMITED_RECORD);
+      DelimitedValidator validator = dag.addOperator("Validator", new DelimitedValidator());
+      dag.addStream("records", recordReader.records, validator.data);
+    }
+
+  }
+
+  @Test
+  public void testFixedWidthRecords() throws Exception
+  {
+
+    FixedWidthApplication app = new FixedWidthApplication();
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    conf.set("dt.operator.HDFSRecordReaderModule.prop.files", inputDir);
+    conf.set("dt.operator.HDFSRecordReaderModule.prop.recordLength", "8");
+    conf.set("dt.operator.HDFSRecordReaderModule.prop.blocksThreshold", "1");
+    conf.set("dt.operator.HDFSRecordReaderModule.prop.scanIntervalMillis", "10000");
+
+    lma.prepareDAG(app, conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.setHeartbeatMonitoringEnabled(true);
+    lc.runAsync();
+    LOG.debug("Waiting for app to finish");
+    Thread.sleep(1000 * 1);
+    lc.shutdown();
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testMissingRecordLength() throws Exception
+  {
+    FixedWidthApplication app = new FixedWidthApplication();
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    conf.set("dt.operator.HDFSRecordReaderModule.prop.files", inputDir);
+    //Should give IllegalArgumentException since recordLength is not set
+    //conf.set("dt.operator.HDFSRecordReaderModule.prop.recordLength", "8");
+    conf.set("dt.operator.HDFSRecordReaderModule.prop.blocksThreshold", "1");
+    conf.set("dt.operator.HDFSRecordReaderModule.prop.scanIntervalMillis", "10000");
+
+    lma.prepareDAG(app, conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.setHeartbeatMonitoringEnabled(true);
+    lc.runAsync();
+    LOG.debug("Waiting for app to finish");
+    Thread.sleep(1000 * 1);
+    lc.shutdown();
+  }
+
+  public static class FixedWidthValidator extends BaseOperator
+  {
+    Set<String> records = new HashSet<String>();
+
+    public final transient DefaultInputPort<byte[]> data = new DefaultInputPort<byte[]>()
+    {
+
+      @Override
+      public void process(byte[] tuple)
+      {
+        String record = new String(tuple);
+        records.add(record);
+      }
+    };
+
+    @Override
+    public void teardown()
+    {
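+      // With recordLength = 8, each file is split into successive 8-byte
+      // records; the last record of a file may be shorter.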
+      String[] expected = {"1234\n567", "890\nabcd", "e\nfgh\ni\n", "jklmop", "qr\nstuvw", "\nxyz\n" };
+
+      Set<String> expectedRecords = new HashSet<String>(Arrays.asList(expected));
+
+      Assert.assertEquals(expectedRecords, records);
+    }
+  }
+
+  private static class FixedWidthApplication implements StreamingApplication
+  {
+
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      FSRecordReaderModule recordReader = dag.addModule("HDFSRecordReaderModule", FSRecordReaderModule.class);
+      recordReader.setMode(RECORD_READER_MODE.FIXED_WIDTH_RECORD);
+      FixedWidthValidator validator = dag.addOperator("Validator", new FixedWidthValidator());
+      dag.addStream("records", recordReader.records, validator.data);
+    }
+
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(FSRecordReaderTest.class);
+
+}
