[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15352095#comment-15352095
 ] 

ASF GitHub Bot commented on APEXMALHAR-2116:
--------------------------------------------

Github user amberarrow commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/326#discussion_r68678627
  
    --- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java ---
    @@ -0,0 +1,180 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.apex.malhar.lib.fs;
    +
    +import java.io.IOException;
    +
    +import org.apache.commons.beanutils.ConvertUtils;
    +import org.apache.commons.beanutils.converters.AbstractConverter;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +
    +import com.datatorrent.api.Context.OperatorContext;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.lib.io.block.BlockMetadata;
    +import com.datatorrent.lib.io.block.FSSliceReader;
    +import com.datatorrent.lib.io.block.ReaderContext;
    +
    +/**
    + * This operator can be used for reading records/tuples from Filesystem in
    + * parallel (without ordering guarantees between tuples). Records can be
    + * delimited (e.g. newline) or fixed width records. Output tuples are 
byte[].
    + * 
    + * Typically, this operator will be connected to output of 
FileSplitterInput to
    + * read records in parallel.
    + */
    [email protected]
    +public class FSRecordReader extends FSSliceReader
    +{
    +  /**
    +   * Record reader mode decides how to split the records.
    +   */
    +  public static enum RECORD_READER_MODE
    +  {
    +    DELIMITED_RECORD, FIXED_WIDTH_RECORD;
    +  }
    +
    +  /**
    +   * Criteria for record split
    +   */
    +  private RECORD_READER_MODE mode = RECORD_READER_MODE.DELIMITED_RECORD;
    +
    +  /**
    +   * Length for fixed width record
    +   */
    +  private int recordLength;
    +
    +  /**
    +   * Port to emit individual records/tuples as byte[]
    +   */
    +  public final transient DefaultOutputPort<byte[]> records = new 
DefaultOutputPort<byte[]>();
    +
    +  /**
    +   * Initialize appropriate reader context based on mode selection
    +   */
    +  @Override
    +  public void setup(OperatorContext context)
    +  {
    +    super.setup(context);
    +    if (mode == RECORD_READER_MODE.FIXED_WIDTH_RECORD) {
    +      ReaderContext.FixedBytesReaderContext<FSDataInputStream> 
fixedBytesReaderContext = new 
ReaderContext.FixedBytesReaderContext<FSDataInputStream>();
    +      fixedBytesReaderContext.setLength(recordLength);
    +      readerContext = fixedBytesReaderContext;
    +    } else {
    +      readerContext = new 
ReaderContext.ReadAheadLineReaderContext<FSDataInputStream>();
    +    }
    +  }
    +
    +  /**
    +   * Read the block data and emit records based on reader context
    +   *
    +   * @param blockMetadata
    +   *          block
    +   * @throws IOException
    +   */
    +  protected void readBlock(BlockMetadata blockMetadata) throws IOException
    +  {
    +    readerContext.initialize(stream, blockMetadata, consecutiveBlock);
    +    ReaderContext.Entity entity;
    +    while ((entity = readerContext.next()) != null) {
    +
    +      
counters.getCounter(ReaderCounterKeys.BYTES).add(entity.getUsedBytes());
    +
    +      byte[] record = entity.getRecord();
    +
    +      if (record != null) {
    +        counters.getCounter(ReaderCounterKeys.RECORDS).increment();
    +        records.emit(record);
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Criteria for record split
    +   * 
    +   * @param mode
    +   *          Mode
    +   */
    +  public void setMode(RECORD_READER_MODE mode)
    +  {
    +    this.mode = mode;
    +  }
    +
    +  /**
    +   * Criteria for record split
    +   * 
    +   * @return mode
    +   */
    +  public RECORD_READER_MODE getMode()
    +  {
    +    return mode;
    +  }
    +
    +  /**
    +   * Length for fixed width record
    +   * 
    +   * @param recordLength
    +   */
    +  public void setRecordLength(int recordLength)
    +  {
    +    this.recordLength = recordLength;
    +  }
    +
    +  /**
    +   * Length for fixed width record
    +   * 
    +   * @return record length
    +   */
    +  public int getRecordLength()
    +  {
    +    return recordLength;
    +  }
    +
    +  static {
    +    /**
    +     * Code for enabling BeanUtils to accept comma separated string to
    +     * initialize FIELD_TYPE[]
    +     */
    +    class RecordReaderModeConverter extends AbstractConverter
    --- End diff --
    
    Could you provide more detail on why this is necessary and how it is used ?


> File Record reader module
> -------------------------
>
>                 Key: APEXMALHAR-2116
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2116
>             Project: Apache Apex Malhar
>          Issue Type: New Feature
>            Reporter: Yogi Devendra
>            Assignee: Yogi Devendra
>
> This will be useful for the usecases which involves reading from files "line 
> by line" in parallel and emit each line as seperate tuple.
> Proposal is to have new Module which would allow users to monitor 
> directories, read files and emit data records(tuple). Records are based on 
> record separator (e.g. newline) or fixed size (no of bytes). 
> Plan is as follows:
> 1. New operator FileRecordReader which will extend BlockReader.
> 2. This operator will have configuration option to select mode for 
> FIXED_LENGTH, SEPARATOR_BASED recors. 
> 3. Using appropriate ReaderContext based on mode.
> 4. New module FileRecordReaderModule which wraps (FileSplitter (existing) + 
> FileRecordReader operator)
> Reason for having different operator than BlockReader is because output port 
> signature is different than BlockReader.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to