[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15190268#comment-15190268
 ] 

ASF GitHub Bot commented on APEXMALHAR-2008:
--------------------------------------------

Github user ishark commented on a diff in the pull request:

    
https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55776429
  
    --- Diff: 
library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java ---
    @@ -0,0 +1,253 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.fs;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +import javax.validation.constraints.Size;
    +
    +import org.apache.commons.lang.mutable.MutableLong;
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.Module;
    +import com.datatorrent.common.metric.MetricsAggregator;
    +import com.datatorrent.common.metric.SingleMetricAggregator;
    +import com.datatorrent.common.metric.sum.LongSumAggregator;
    +import com.datatorrent.common.partitioner.StatelessPartitioner;
    +import com.datatorrent.lib.counters.BasicCounters;
    +import com.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord;
    +import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
    +import com.datatorrent.lib.io.block.BlockReader;
    +import com.datatorrent.lib.io.fs.AbstractFileSplitter.FileMetadata;
    +import com.datatorrent.lib.io.fs.HDFSFileSplitter.HDFSScanner;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * HDFSInputModule is used to read files/list of files (or directory) from 
HDFS. <br/>
    + * Module emits, <br/>
    + * 1. FileMetadata 2. BlockMetadata 3. Block Bytes.<br/><br/>
    + * The module reads data in parallel, following parameters can be 
configured<br/>
    + * 1. files: list of file(s)/directories to read<br/>
    + * 2. filePatternRegularExp: Files names matching given regex will be 
read<br/>
    + * 3. scanIntervalMillis: interval between two scans to discover new files 
in input directory<br/>
    + * 4. recursive: if scan recursively input directories<br/>
    + * 5. blockSize: block size used to read input blocks of file<br/>
    + * 6. readersCount: count of readers to read input file<br/>
    + * 7. sequencialFileRead: If emit file blocks in sequence?
    + */
    +public class HDFSInputModule implements Module
    +{
    +
    +  @NotNull
    +  @Size(min = 1)
    +  private String files;
    +  private String filePatternRegularExp;
    +  @Min(0)
    +  private long scanIntervalMillis;
    +  private boolean recursive = true;
    +  private long blockSize;
    +  private boolean sequencialFileRead = false;
    +  private int readersCount;
    +
    +  public final transient ProxyOutputPort<FileMetadata> filesMetadataOutput 
= new ProxyOutputPort();
    +  public final transient ProxyOutputPort<FileBlockMetadata> 
blocksMetadataOutput = new ProxyOutputPort();
    +  public final transient ProxyOutputPort<ReaderRecord<Slice>> messages = 
new ProxyOutputPort();
    +
    +  @Override
    +  public void populateDAG(DAG dag, Configuration conf)
    +  {
    +    HDFSFileSplitter fileSplitter = dag.addOperator("FileSplitter", new 
HDFSFileSplitter());
    +    BlockReader blockReader = dag.addOperator("BlockReader", new 
BlockReader());
    +
    +    dag.addStream("BlockMetadata", fileSplitter.blocksMetadataOutput, 
blockReader.blocksMetadataInput);
    +
    +    filesMetadataOutput.set(fileSplitter.filesMetadataOutput);
    +    blocksMetadataOutput.set(blockReader.blocksMetadataOutput);
    +    messages.set(blockReader.messages);
    +
    +    fileSplitter.setSequencialFileRead(sequencialFileRead);
    +    if (blockSize != 0) {
    +      fileSplitter.setBlockSize(blockSize);
    +    }
    +
    +    HDFSScanner fileScanner = (HDFSScanner)fileSplitter.getScanner();
    +    fileScanner.setFiles(files);
    +    if (scanIntervalMillis != 0) {
    +      fileScanner.setScanIntervalMillis(scanIntervalMillis);
    +    }
    +    fileScanner.setRecursive(recursive);
    +    if (filePatternRegularExp != null) {
    +      
fileSplitter.getScanner().setFilePatternRegularExp(filePatternRegularExp);
    +    }
    +
    +    blockReader.setUri(files);
    +    if (readersCount != 0) {
    +      dag.setAttribute(blockReader, Context.OperatorContext.PARTITIONER, 
new StatelessPartitioner<BlockReader>(readersCount));
    +    }
    +
    +    MetricsAggregator blockReaderMetrics = new MetricsAggregator();
    +    blockReaderMetrics.addAggregators("bytesReadPerSec", new 
SingleMetricAggregator[] {new LongSumAggregator()});
    +    dag.setAttribute(blockReader, 
Context.OperatorContext.METRICS_AGGREGATOR, blockReaderMetrics);
    +    dag.setAttribute(blockReader, 
Context.OperatorContext.COUNTERS_AGGREGATOR, new 
BasicCounters.LongAggregator<MutableLong>());
    --- End diff --
    
    Can you use METRICS_AGGREGATOR here? COUNTERS_AGGREGATOR is marked 
deprecated.


> Create hdfs file input module 
> ------------------------------
>
>                 Key: APEXMALHAR-2008
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2008
>             Project: Apache Apex Malhar
>          Issue Type: Task
>            Reporter: Priyanka Gugale
>            Assignee: Priyanka Gugale
>            Priority: Minor
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> To read HDFS files in parallel using Apex we normally use FileSplitter and 
> FileReader module. It would be a good idea to combine those operators as a 
> unit in module. Having a module will give us readily usable set of operators 
> to read HDFS files. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to