[ 
https://issues.apache.org/jira/browse/CARBONDATA-298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591236#comment-15591236
 ] 

ASF GitHub Bot commented on CARBONDATA-298:
-------------------------------------------

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/240#discussion_r84232604
  
    --- Diff: 
processing/src/main/java/org/apache/carbondata/processing/newflow/steps/input/InputProcessorStepImpl.java
 ---
    @@ -0,0 +1,171 @@
    +package org.apache.carbondata.processing.newflow.steps.input;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +import org.apache.carbondata.common.CarbonIterator;
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.util.CarbonProperties;
    +import 
org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
    +import 
org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
    +import org.apache.carbondata.processing.newflow.DataField;
    +import 
org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
    +import 
org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
    +import org.apache.carbondata.processing.newflow.parser.CarbonParserFactory;
    +import org.apache.carbondata.processing.newflow.parser.GenericParser;
    +import org.apache.carbondata.processing.newflow.row.CarbonRow;
    +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
    +
    +/**
    + * It reads data from record reader and sends data to next step.
    + */
    +public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
    +
    +  private static final LogService LOGGER =
    +      
LogServiceFactory.getLogService(InputProcessorStepImpl.class.getName());
    +
    +  private GenericParser[] genericParsers;
    +
    +  private List<Iterator<Object[]>> inputIterators;
    +
    +  public InputProcessorStepImpl(CarbonDataLoadConfiguration configuration,
    +      AbstractDataLoadProcessorStep child, List<Iterator<Object[]>> 
inputIterators) {
    +    super(configuration, child);
    +    this.inputIterators = inputIterators;
    +  }
    +
    +  @Override public DataField[] getOutput() {
    +    DataField[] fields = configuration.getDataFields();
    +    String[] header = configuration.getHeader();
    +    DataField[] output = new DataField[fields.length];
    +    int k = 0;
    +    for (int i = 0; i < header.length; i++) {
    +      for (int j = 0; j < fields.length; j++) {
    +        if 
(header[j].equalsIgnoreCase(fields[j].getColumn().getColName())) {
    +          output[k++] = fields[j];
    +          break;
    +        }
    +      }
    +    }
    +    return output;
    +  }
    +
    +  @Override public void initialize() throws CarbonDataLoadingException {
    +    DataField[] output = getOutput();
    +    genericParsers = new GenericParser[output.length];
    +    for (int i = 0; i < genericParsers.length; i++) {
    +      genericParsers[i] = 
CarbonParserFactory.createParser(output[i].getColumn(),
    +          (String[]) configuration
    +              
.getDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS));
    +    }
    +  }
    +
    +  private int getNumberOfCores() {
    +    int numberOfCores;
    +    try {
    +      numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
    +          .getProperty(CarbonCommonConstants.NUM_CORES_LOADING,
    +              CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
    +    } catch (NumberFormatException exc) {
    +      numberOfCores = 
Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
    +    }
    +    return numberOfCores;
    +  }
    +
    +  private int getBatchSize() {
    +    int batchSize;
    +    try {
    +      batchSize = Integer.parseInt(configuration
    +          
.getDataLoadProperty(DataLoadProcessorConstants.DATA_LOAD_BATCH_SIZE,
    +              
DataLoadProcessorConstants.DATA_LOAD_BATCH_SIZE_DEFAULT).toString());
    +    } catch (NumberFormatException exc) {
    +      batchSize = 
Integer.parseInt(DataLoadProcessorConstants.DATA_LOAD_BATCH_SIZE_DEFAULT);
    +    }
    +    return batchSize;
    +  }
    +
    +  @Override public Iterator<CarbonRowBatch>[] execute() {
    +    int batchSize = getBatchSize();
    +    List<Iterator<Object[]>>[] readerIterators = 
partitionInputReaderIterators();
    +    Iterator<CarbonRowBatch>[] outIterators = new 
Iterator[readerIterators.length];
    +    for (int i = 0; i < outIterators.length; i++) {
    +      outIterators[i] = new InputProcessorIterator(readerIterators[i], 
genericParsers, batchSize);
    +    }
    +    return outIterators;
    +  }
    +
    +  private List<Iterator<Object[]>>[] partitionInputReaderIterators() {
    +    int numberOfCores = getNumberOfCores();
    +    if (inputIterators.size() < numberOfCores) {
    +      numberOfCores = inputIterators.size();
    --- End diff --
    
    please use a different name (suggest parallelism), since the meaning of this 
variable is `min(inputIterators.size(), numberOfCores)`


> 3. Add InputProcessorStep which should iterate recordreader and parse the 
> data as per the data type.
> ----------------------------------------------------------------------------------------------------
>
>                 Key: CARBONDATA-298
>                 URL: https://issues.apache.org/jira/browse/CARBONDATA-298
>             Project: CarbonData
>          Issue Type: Sub-task
>            Reporter: Ravindra Pesala
>             Fix For: 0.2.0-incubating
>
>
> Add InputProcessorStep which should iterate recordreader/RecordBufferedWriter 
> and parse the data as per the data types.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to