Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1972#discussion_r168714751
--- Diff:
processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepForPartitionImpl.java
---
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.steps;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import
org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
+import
org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.DataField;
+import
org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl;
+import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * Reads data from the record readers and passes it on to the next data load step.
+ */
+public class InputProcessorStepForPartitionImpl extends
AbstractDataLoadProcessorStep {
+
  // Iterators supplying the raw input rows produced by the record readers.
  private CarbonIterator<Object[]>[] inputIterators;

  // Per data field: true when the column has no dictionary encoding.
  private boolean[] noDictionaryMapping;

  // Target data type per field; dictionary-encoded columns become INT (surrogate keys).
  private DataType[] dataTypes;

  // For each data field, the index of its matching column in the input header.
  private int[] orderOfData;
+
  /**
   * Creates the input processor step for a partition-table load.
   *
   * @param configuration data load configuration for this load
   * @param inputIterators iterators over the raw input rows to be processed
   */
  public InputProcessorStepForPartitionImpl(CarbonDataLoadConfiguration configuration,
      CarbonIterator<Object[]>[] inputIterators) {
    // No child step is passed to the parent; this step produces its output iterators itself.
    super(configuration, null);
    this.inputIterators = inputIterators;
  }
+
  /**
   * @return the configured data fields unchanged; this step performs no field transformation
   */
  @Override public DataField[] getOutput() {
    return configuration.getDataFields();
  }
+
+ @Override public void initialize() throws IOException {
+ super.initialize();
+ // if logger is enabled then raw data will be required.
+ RowConverterImpl rowConverter =
+ new RowConverterImpl(configuration.getDataFields(), configuration,
null);
+ rowConverter.initialize();
+ configuration.setCardinalityFinder(rowConverter);
+ noDictionaryMapping =
+
CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields());
+ dataTypes = new DataType[configuration.getDataFields().length];
+ for (int i = 0; i < dataTypes.length; i++) {
+ if
(configuration.getDataFields()[i].getColumn().hasEncoding(Encoding.DICTIONARY))
{
+ dataTypes[i] = DataTypes.INT;
+ } else {
+ dataTypes[i] =
configuration.getDataFields()[i].getColumn().getDataType();
+ }
+ }
+ orderOfData = arrangeData(configuration.getDataFields(),
configuration.getHeader());
+ }
+
+ private int[] arrangeData(DataField[] dataFields, String[] header) {
+ int[] data = new int[dataFields.length];
+ for (int i = 0; i < dataFields.length; i++) {
+ for (int j = 0; j < header.length; j++) {
+ if
(dataFields[i].getColumn().getColName().equalsIgnoreCase(header[j])) {
+ data[i] = j;
+ break;
+ }
+ }
+ }
+ return data;
+ }
+
+ @Override public Iterator<CarbonRowBatch>[] execute() {
+ int batchSize = CarbonProperties.getInstance().getBatchSize();
+ List<CarbonIterator<Object[]>>[] readerIterators =
partitionInputReaderIterators();
+ Iterator<CarbonRowBatch>[] outIterators = new
Iterator[readerIterators.length];
+ for (int i = 0; i < outIterators.length; i++) {
+ outIterators[i] =
+ new InputProcessorIterator(readerIterators[i], batchSize,
configuration.isPreFetch(),
+ rowCounter, orderOfData, noDictionaryMapping, dataTypes);
+ }
+ return outIterators;
+ }
+
+ /**
+ * Partition input iterators equally as per the number of threads.
+ *
+ * @return
+ */
+ private List<CarbonIterator<Object[]>>[] partitionInputReaderIterators()
{
+ // Get the number of cores configured in property.
+ int numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
+ // Get the minimum of number of cores and iterators size to get the
number of parallel threads
+ // to be launched.
+ int parallelThreadNumber = Math.min(inputIterators.length,
numberOfCores);
+
+ List<CarbonIterator<Object[]>>[] iterators = new
List[parallelThreadNumber];
+ for (int i = 0; i < parallelThreadNumber; i++) {
+ iterators[i] = new ArrayList<>();
+ }
+ // Equally partition the iterators as per number of threads
+ for (int i = 0; i < inputIterators.length; i++) {
+ iterators[i % parallelThreadNumber].add(inputIterators[i]);
+ }
+ return iterators;
+ }
+
  /**
   * Row-wise processing is not used by this step: the batch iterators returned from
   * {@link #execute()} produce the output directly, so this always returns null.
   */
  @Override protected CarbonRow processRow(CarbonRow row) {
    return null;
  }
+
+ @Override public void close() {
+ if (!closed) {
+ super.close();
+ for (CarbonIterator inputIterator : inputIterators) {
+ inputIterator.close();
--- End diff --
Better to put this in a try/finally block — even if one iterator fails to
close, the other iterators can still be closed.
---