Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1972#discussion_r168714751
  
    --- Diff: 
processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepForPartitionImpl.java
 ---
    @@ -0,0 +1,251 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.processing.loading.steps;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +import org.apache.carbondata.common.CarbonIterator;
    +import org.apache.carbondata.core.datastore.row.CarbonRow;
    +import org.apache.carbondata.core.metadata.datatype.DataType;
    +import org.apache.carbondata.core.metadata.datatype.DataTypes;
    +import org.apache.carbondata.core.metadata.encoder.Encoding;
    +import org.apache.carbondata.core.util.CarbonProperties;
    +import org.apache.carbondata.core.util.DataTypeUtil;
    +import 
org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
    +import 
org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
    +import org.apache.carbondata.processing.loading.DataField;
    +import 
org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl;
    +import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
    +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
    +
    +/**
    + * It reads data from record reader and sends data to next step.
    + */
    +public class InputProcessorStepForPartitionImpl extends 
AbstractDataLoadProcessorStep {
    +
    +  private CarbonIterator<Object[]>[] inputIterators;
    +
    +  private boolean[] noDictionaryMapping;
    +
    +  private DataType[] dataTypes;
    +
    +  private int[] orderOfData;
    +
    +  public InputProcessorStepForPartitionImpl(CarbonDataLoadConfiguration 
configuration,
    +      CarbonIterator<Object[]>[] inputIterators) {
    +    super(configuration, null);
    +    this.inputIterators = inputIterators;
    +  }
    +
    +  @Override public DataField[] getOutput() {
    +    return configuration.getDataFields();
    +  }
    +
    +  @Override public void initialize() throws IOException {
    +    super.initialize();
    +    // if logger is enabled then raw data will be required.
    +    RowConverterImpl rowConverter =
    +        new RowConverterImpl(configuration.getDataFields(), configuration, 
null);
    +    rowConverter.initialize();
    +    configuration.setCardinalityFinder(rowConverter);
    +    noDictionaryMapping =
    +        
CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields());
    +    dataTypes = new DataType[configuration.getDataFields().length];
    +    for (int i = 0; i < dataTypes.length; i++) {
    +      if 
(configuration.getDataFields()[i].getColumn().hasEncoding(Encoding.DICTIONARY)) 
{
    +        dataTypes[i] = DataTypes.INT;
    +      } else {
    +        dataTypes[i] = 
configuration.getDataFields()[i].getColumn().getDataType();
    +      }
    +    }
    +    orderOfData = arrangeData(configuration.getDataFields(), 
configuration.getHeader());
    +  }
    +
    +  private int[] arrangeData(DataField[] dataFields, String[] header) {
    +    int[] data = new int[dataFields.length];
    +    for (int i = 0; i < dataFields.length; i++) {
    +      for (int j = 0; j < header.length; j++) {
    +        if 
(dataFields[i].getColumn().getColName().equalsIgnoreCase(header[j])) {
    +          data[i] = j;
    +          break;
    +        }
    +      }
    +    }
    +    return data;
    +  }
    +
    +  @Override public Iterator<CarbonRowBatch>[] execute() {
    +    int batchSize = CarbonProperties.getInstance().getBatchSize();
    +    List<CarbonIterator<Object[]>>[] readerIterators = 
partitionInputReaderIterators();
    +    Iterator<CarbonRowBatch>[] outIterators = new 
Iterator[readerIterators.length];
    +    for (int i = 0; i < outIterators.length; i++) {
    +      outIterators[i] =
    +          new InputProcessorIterator(readerIterators[i], batchSize, 
configuration.isPreFetch(),
    +              rowCounter, orderOfData, noDictionaryMapping, dataTypes);
    +    }
    +    return outIterators;
    +  }
    +
    +  /**
    +   * Partition input iterators equally as per the number of threads.
    +   *
    +   * @return
    +   */
    +  private List<CarbonIterator<Object[]>>[] partitionInputReaderIterators() 
{
    +    // Get the number of cores configured in property.
    +    int numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
    +    // Get the minimum of number of cores and iterators size to get the 
number of parallel threads
    +    // to be launched.
    +    int parallelThreadNumber = Math.min(inputIterators.length, 
numberOfCores);
    +
    +    List<CarbonIterator<Object[]>>[] iterators = new 
List[parallelThreadNumber];
    +    for (int i = 0; i < parallelThreadNumber; i++) {
    +      iterators[i] = new ArrayList<>();
    +    }
    +    // Equally partition the iterators as per number of threads
    +    for (int i = 0; i < inputIterators.length; i++) {
    +      iterators[i % parallelThreadNumber].add(inputIterators[i]);
    +    }
    +    return iterators;
    +  }
    +
    +  @Override protected CarbonRow processRow(CarbonRow row) {
    +    return null;
    +  }
    +
    +  @Override public void close() {
    +    if (!closed) {
    +      super.close();
    +      for (CarbonIterator inputIterator : inputIterators) {
    +        inputIterator.close();
    --- End diff --
    
    Better to put this in a try/finally block, so that even if one iterator 
fails to close, the other iterators can still be closed.


---

Reply via email to