[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15355935#comment-15355935
 ] 

ASF GitHub Bot commented on APEXMALHAR-2116:
--------------------------------------------

Github user amberarrow commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/326#discussion_r69042882
  
    --- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java 
---
    @@ -0,0 +1,332 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.apex.malhar.lib.fs;
    +
    +import javax.validation.constraints.Min;
    +
    +import javax.validation.constraints.NotNull;
    +import javax.validation.constraints.Size;
    +
    +import org.apache.apex.malhar.lib.fs.FSRecordReader.RECORD_READER_MODE;
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.Module;
    +import com.datatorrent.common.partitioner.StatelessPartitioner;
    +import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
    +import com.datatorrent.lib.io.block.BlockMetadata;
    +import com.datatorrent.lib.io.block.FSSliceReader;
    +import com.datatorrent.lib.io.fs.FileSplitterInput;
    +
    +/**
    + * This module is used for reading records/tuples from FileSystem. Records 
can
    + * be read in parallel using multiple partitions of record reader operator.
    + * (Ordering is not guaranteed when records are read in parallel)
    + *
    + * Input directory is scanned at specified interval to poll for new data.
    + * 
    + * The module reads data in parallel, following parameters can be 
configured
    + * <br/>
    + * 1. files: list of file(s)/directories to read<br/>
    + * 2. filePatternRegularExp: Files names matching given regex will be 
read<br/>
    + * 3. scanIntervalMillis: interval between two scans to discover new files 
in
    + * input directory<br/>
    + * 4. recursive: if scan recursively input directories<br/>
    + * 5. blockSize: block size used to read input blocks of file<br/>
    + * 6. readersCount: count of readers to read input file<br/>
    + * 7. sequentialFileRead: If emit file blocks in sequence?<br/>
    + * 8. blocksThreshold: number of blocks emitted per window
    + */
    [email protected]
    +public class FSRecordReaderModule implements Module
    +{
    +  @NotNull
    +  @Size(min = 1)
    +  private String files;
    +  private String filePatternRegularExp;
    +  @Min(0)
    +  private long scanIntervalMillis;
    +  private boolean recursive = true;
    +  private boolean sequentialFileRead = false;
    +  private int readersCount;
    +  @Min(1)
    +  protected int blocksThreshold;
    +
    +  public final transient ProxyOutputPort<byte[]> records = new 
ProxyOutputPort<byte[]>();
    +
    +  /**
    +   * Criteria for record split
    +   */
    +  private RECORD_READER_MODE mode;
    +
    +  /**
    +   * Length for fixed width record
    +   */
    +  private int recordLength;
    +
    +  public FileSplitterInput createFileSplitter()
    +  {
    +    return new FileSplitterInput();
    +  }
    +
    +  public FSRecordReader createBlockReader()
    +  {
    +    FSRecordReader recordReader = new FSRecordReader();
    +    recordReader.setMode(mode);
    +    recordReader.setRecordLength(recordLength);
    +
    +    return recordReader;
    +  }
    +
    +  @Override
    +  public void populateDAG(DAG dag, Configuration configuration)
    +  {
    +    FileSplitterInput fileSplitter = dag.addOperator("FileSplitter", 
createFileSplitter());
    +    FSRecordReader recordReader = dag.addOperator("BlockReader", 
createBlockReader());
    +
    +    dag.addStream("BlockMetadata", fileSplitter.blocksMetadataOutput, 
recordReader.blocksMetadataInput);
    +
    +    if (sequentialFileRead) {
    +      dag.setInputPortAttribute(recordReader.blocksMetadataInput, 
Context.PortContext.STREAM_CODEC,
    +          new SequentialFileBlockMetadataCodec());
    +    }
    +
    +    FileSplitterInput.TimeBasedDirectoryScanner fileScanner = 
fileSplitter.getScanner();
    +    fileScanner.setFiles(files);
    +    if (scanIntervalMillis != 0) {
    +      fileScanner.setScanIntervalMillis(scanIntervalMillis);
    +    }
    +    fileScanner.setRecursive(recursive);
    +    if (filePatternRegularExp != null) {
    +      
fileSplitter.getScanner().setFilePatternRegularExp(filePatternRegularExp);
    +    }
    +
    +    recordReader.setBasePath(files);
    +    if (readersCount != 0) {
    +      dag.setAttribute(recordReader, Context.OperatorContext.PARTITIONER,
    +          new StatelessPartitioner<FSSliceReader>(readersCount));
    +    }
    +    fileSplitter.setBlocksThreshold(blocksThreshold);
    +    records.set(recordReader.records);
    +  }
    +
    +  /**
    +   * A comma separated list of directories to scan. If the path is not 
fully
    +   * qualified the default file system is used. A fully qualified path can 
be
    +   * provided to scan directories in other filesystems.
    +   *
    +   * @param files
    +   *          files
    +   */
    +  public void setFiles(String files)
    +  {
    +    this.files = files;
    --- End diff --
    
    Shouldn't the field be named "directories" if we expect it to be a list of 
directories ?



> File Record reader module
> -------------------------
>
>                 Key: APEXMALHAR-2116
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2116
>             Project: Apache Apex Malhar
>          Issue Type: New Feature
>            Reporter: Yogi Devendra
>            Assignee: Yogi Devendra
>
> This will be useful for the usecases which involves reading from files "line 
> by line" in parallel and emit each line as seperate tuple.
> Proposal is to have new Module which would allow users to monitor 
> directories, read files and emit data records(tuple). Records are based on 
> record separator (e.g. newline) or fixed size (no of bytes). 
> Plan is as follows:
> 1. New operator FileRecordReader which will extend BlockReader.
> 2. This operator will have configuration option to select mode for 
> FIXED_LENGTH, SEPARATOR_BASED recors. 
> 3. Using appropriate ReaderContext based on mode.
> 4. New module FileRecordReaderModule which wraps (FileSplitter (existing) + 
> FileRecordReader operator)
> Reason for having different operator than BlockReader is because output port 
> signature is different than BlockReader.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to