Added Sort processor step for data loading.

Fixed review comments

Rebased and fixed comments.

Fixed checkstyle issue


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/9aee9808
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/9aee9808
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/9aee9808

Branch: refs/heads/master
Commit: 9aee9808c50d108a3d9ec9c8ab695282ce0d1e1d
Parents: eb85287
Author: ravipesala <ravi.pes...@gmail.com>
Authored: Mon Oct 17 23:36:37 2016 +0530
Committer: jackylk <jacky.li...@huawei.com>
Committed: Tue Oct 25 14:35:39 2016 +0800

----------------------------------------------------------------------
 .../processing/newflow/row/CarbonRow.java       |   4 +
 .../processing/newflow/sort/Sorter.java         |  56 ++
 .../sort/impl/ParallelReadMergeSorterImpl.java  | 227 ++++++++
 .../sort/impl/SortPreparatorIterator.java       | 147 ++++++
 .../newflow/steps/InputProcessorStepImpl.java   | 167 ++++++
 .../newflow/steps/SortProcessorStepImpl.java    |  76 +++
 .../steps/input/InputProcessorStepImpl.java     | 167 ------
 .../sortdata/FileMergerParameters.java          | 216 --------
 .../sortdata/IntermediateFileMerger.java        |  41 +-
 .../sortandgroupby/sortdata/SortDataRows.java   | 351 ++-----------
 .../sortdata/SortIntermediateFileMerger.java    | 111 ++++
 .../sortandgroupby/sortdata/SortParameters.java | 516 +++++++++++++++++++
 .../sortdatastep/SortKeyStep.java               |  23 +-
 .../store/SingleThreadFinalSortFilesMerger.java |   5 +-
 .../exception/CarbonDataWriterException.java    |   2 +-
 .../util/CarbonDataProcessorUtil.java           |  24 +
 .../processing/util/RemoveDictionaryUtil.java   |  55 ++
 17 files changed, 1479 insertions(+), 709 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9aee9808/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java
index e1aa601..34f8439 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java
@@ -60,6 +60,10 @@ public class CarbonRow {
     return (String) data[ordinal];
   }
 
+  public Object getObject(int ordinal) {
+    return data[ordinal];
+  }
+
   public byte[] getBinary(int ordinal) {
     return (byte[]) data[ordinal];
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9aee9808/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/Sorter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/Sorter.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/Sorter.java
new file mode 100644
index 0000000..e2e238c
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/Sorter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.newflow.sort;
+
+import java.util.Iterator;
+
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
+import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
+
+/**
+ * This interface sorts the data from all input iterators.
+ * The life cycle of this interface is initialize -> sort -> close.
+ */
+public interface Sorter {
+
+  /**
+   * Initialize sorter with sort parameters.
+   *
+   * @param sortParameters
+   */
+  void initialize(SortParameters sortParameters);
+
+  /**
+   * Sorts the data from all iterators; these iterators can be
+   * read in parallel depending on the implementation.
+   *
+   * @param iterators array of iterators to read data.
+   * @return iterators over the sorted row batches
+   * @throws CarbonDataLoadingException
+   */
+  Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
+      throws CarbonDataLoadingException;
+
+  /**
+   * Close resources
+   */
+  void close();
+
+}
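
Note: a minimal sketch of how a Sorter implementation is driven, following the
initialize -> sort -> close life cycle documented above. The wrapper method and
the source of the input iterators are illustrative, not part of this commit:

    // Hypothetical driver for the Sorter life cycle (names are placeholders).
    void runSort(CarbonDataLoadConfiguration configuration, DataField[] dataFields,
        Iterator<CarbonRowBatch>[] inputs) throws CarbonDataLoadingException {
      SortParameters parameters = SortParameters.createSortParameters(configuration);
      Sorter sorter = new ParallelReadMergeSorterImpl(dataFields);
      sorter.initialize(parameters);
      try {
        Iterator<CarbonRowBatch>[] sorted = sorter.sort(inputs);
        // hand 'sorted' over to the next processing step here
      } finally {
        sorter.close();
      }
    }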

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9aee9808/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
new file mode 100644
index 0000000..0e575bc
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.newflow.sort.impl;
+
+import java.io.File;
+import java.util.Iterator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.processing.newflow.DataField;
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.newflow.row.CarbonRow;
+import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
+import org.apache.carbondata.processing.newflow.sort.Sorter;
+import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
+import org.apache.carbondata.processing.sortandgroupby.sortdata.SortDataRows;
+import org.apache.carbondata.processing.sortandgroupby.sortdata.SortIntermediateFileMerger;
+import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
+import org.apache.carbondata.processing.store.SingleThreadFinalSortFilesMerger;
+import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * It reads data in parallel from an array of iterators and does a merge sort.
+ * First it sorts the data and writes it to temp files. These temp files are then
+ * merge sorted to produce the final sorted result.
+ */
+public class ParallelReadMergeSorterImpl implements Sorter {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(ParallelReadMergeSorterImpl.class.getName());
+
+  private SortParameters sortParameters;
+
+  private SortIntermediateFileMerger intermediateFileMerger;
+
+  private ExecutorService executorService;
+
+  private SingleThreadFinalSortFilesMerger finalMerger;
+
+  private DataField[] inputDataFields;
+
+  public ParallelReadMergeSorterImpl(DataField[] inputDataFields) {
+    this.inputDataFields = inputDataFields;
+  }
+
+  @Override
+  public void initialize(SortParameters sortParameters) {
+    this.sortParameters = sortParameters;
+    intermediateFileMerger = new SortIntermediateFileMerger(sortParameters);
+    String storeLocation =
+        CarbonDataProcessorUtil.getLocalDataFolderLocation(
+            sortParameters.getDatabaseName(), sortParameters.getTableName(),
+            String.valueOf(sortParameters.getTaskNo()), sortParameters.getPartitionID(),
+            sortParameters.getSegmentId() + "", false);
+    // Set the data file location
+    String dataFolderLocation =
+        storeLocation + File.separator + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION;
+    finalMerger =
+        new SingleThreadFinalSortFilesMerger(dataFolderLocation, sortParameters.getTableName(),
+            sortParameters.getDimColCount() - sortParameters.getComplexDimColCount(),
+            sortParameters.getComplexDimColCount(), sortParameters.getMeasureColCount(),
+            sortParameters.getNoDictionaryCount(), sortParameters.getAggType(),
+            sortParameters.getNoDictionaryDimnesionColumn());
+  }
+
+  @Override
+  public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
+      throws CarbonDataLoadingException {
+    SortDataRows[] sortDataRows = new SortDataRows[iterators.length];
+    try {
+      for (int i = 0; i < iterators.length; i++) {
+        sortDataRows[i] = new SortDataRows(sortParameters, intermediateFileMerger);
+        // initialize sort
+        sortDataRows[i].initialize();
+      }
+    } catch (CarbonSortKeyAndGroupByException e) {
+      throw new CarbonDataLoadingException(e);
+    }
+    this.executorService = Executors.newFixedThreadPool(iterators.length);
+
+    // First prepare the data for sort.
+    Iterator<CarbonRowBatch>[] sortPrepIterators = new Iterator[iterators.length];
+    for (int i = 0; i < sortPrepIterators.length; i++) {
+      sortPrepIterators[i] = new SortPreparatorIterator(iterators[i], inputDataFields);
+    }
+
+    for (int i = 0; i < sortDataRows.length; i++) {
+      executorService
+          .submit(new SortIteratorThread(sortPrepIterators[i], sortDataRows[i], sortParameters));
+    }
+
+    try {
+      executorService.shutdown();
+      executorService.awaitTermination(2, TimeUnit.DAYS);
+    } catch (InterruptedException e) {
+      throw new CarbonDataLoadingException("Problem while shutting down the executor service", e);
+    }
+    try {
+      intermediateFileMerger.finish();
+      finalMerger.startFinalMerge();
+    } catch (CarbonDataWriterException e) {
+      throw new CarbonDataLoadingException(e);
+    } catch (CarbonSortKeyAndGroupByException e) {
+      throw new CarbonDataLoadingException(e);
+    }
+
+    //TODO get the batch size from CarbonProperties
+    final int batchSize = 1000;
+
+    // Creates the iterator to read from merge sorter.
+    Iterator<CarbonRowBatch> batchIterator = new CarbonIterator<CarbonRowBatch>() {
+
+      @Override
+      public boolean hasNext() {
+        return finalMerger.hasNext();
+      }
+
+      @Override
+      public CarbonRowBatch next() {
+        int counter = 0;
+        CarbonRowBatch rowBatch = new CarbonRowBatch();
+        while (finalMerger.hasNext() && counter < batchSize) {
+          rowBatch.addRow(new CarbonRow(finalMerger.next()));
+          counter++;
+        }
+        return rowBatch;
+      }
+    };
+    return new Iterator[] { batchIterator };
+  }
+
+  @Override public void close() {
+    intermediateFileMerger.close();
+  }
+
+  /**
+   * This thread iterates over the given iterator and adds the rows to {@link SortDataRows}.
+   */
+  private static class SortIteratorThread implements Callable<Void> {
+
+    private Iterator<CarbonRowBatch> iterator;
+
+    private SortDataRows sortDataRows;
+
+    private SortParameters parameters;
+
+    public SortIteratorThread(Iterator<CarbonRowBatch> iterator, SortDataRows sortDataRows,
+        SortParameters parameters) {
+      this.iterator = iterator;
+      this.sortDataRows = sortDataRows;
+      this.parameters = parameters;
+    }
+
+    @Override
+    public Void call() throws CarbonDataLoadingException {
+      try {
+        while (iterator.hasNext()) {
+          CarbonRowBatch batch = iterator.next();
+          Iterator<CarbonRow> batchIterator = batch.getBatchIterator();
+          while (batchIterator.hasNext()) {
+            sortDataRows.addRow(batchIterator.next().getData());
+          }
+        }
+
+        processRowToNextStep(sortDataRows);
+      } catch (CarbonSortKeyAndGroupByException e) {
+        LOGGER.error(e);
+        throw new CarbonDataLoadingException(e);
+      }
+      return null;
+    }
+
+    /**
+     * Below method will be used to start sorting and pass the sorted data to the next step.
+     */
+    private boolean processRowToNextStep(SortDataRows sortDataRows)
+        throws CarbonDataLoadingException {
+      if (null == sortDataRows) {
+        LOGGER.info("Record Processed For table: " + 
parameters.getTableName());
+        LOGGER.info("Number of Records was Zero");
+        String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": 
Write: " + 0;
+        LOGGER.info(logMessage);
+        return false;
+      }
+
+      try {
+        // start sorting
+        sortDataRows.startSorting();
+
+        // check any more rows are present
+        LOGGER.info("Record Processed For table: " + 
parameters.getTableName());
+        CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+            .recordSortRowsStepTotalTime(parameters.getPartitionID(), 
System.currentTimeMillis());
+        CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+            .recordDictionaryValuesTotalTime(parameters.getPartitionID(),
+                System.currentTimeMillis());
+        return false;
+      } catch (CarbonSortKeyAndGroupByException e) {
+        throw new CarbonDataLoadingException(e);
+      }
+    }
+  }
+}
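
Note: sort() above always returns a one-element iterator array, and each batch
holds at most 1000 rows (the hard-coded batchSize). A sketch of a consumer
draining that output; the method name is illustrative, not part of this commit:

    // Drains the merged output produced by ParallelReadMergeSorterImpl.sort().
    void drain(Iterator<CarbonRowBatch>[] sorted) {
      Iterator<CarbonRowBatch> merged = sorted[0];
      while (merged.hasNext()) {
        CarbonRowBatch batch = merged.next();
        Iterator<CarbonRow> rows = batch.getBatchIterator();
        while (rows.hasNext()) {
          CarbonRow row = rows.next();
          // write 'row' out in the next step (e.g. the carbondata file writer)
        }
      }
    }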

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9aee9808/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/SortPreparatorIterator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/SortPreparatorIterator.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/SortPreparatorIterator.java
new file mode 100644
index 0000000..9c4305a
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/SortPreparatorIterator.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.newflow.sort.impl;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.core.constants.IgnoreDictionary;
+import org.apache.carbondata.processing.newflow.DataField;
+import org.apache.carbondata.processing.newflow.row.CarbonRow;
+import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
+import org.apache.carbondata.processing.util.RemoveDictionaryUtil;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+/**
+ * This iterator transforms each row into the layout the carbon sorter interface expects.
+ * TODO : It is supposed to return a comparable row on which sorting can be performed directly.
+ */
+public class SortPreparatorIterator extends CarbonIterator<CarbonRowBatch> {
+
+  private Iterator<CarbonRowBatch> iterator;
+
+  private int[] dictionaryFieldIndexes;
+
+  private int[] nonDictionaryFieldIndexes;
+
+  private int[] measueFieldIndexes;
+
+  private int dimIndexInRow = IgnoreDictionary.DIMENSION_INDEX_IN_ROW.getIndex();
+
+  private int byteArrayIndexInRow = IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex();
+
+  private int measureIndexInRow = IgnoreDictionary.MEASURES_INDEX_IN_ROW.getIndex();
+
+  public SortPreparatorIterator(Iterator<CarbonRowBatch> iterator, DataField[] dataFields) {
+    this.iterator = iterator;
+    List<Integer> dictIndexes = new ArrayList<>();
+    List<Integer> nonDictIndexes = new ArrayList<>();
+    List<Integer> msrIndexes = new ArrayList<>();
+    for (int i = 0; i < dataFields.length; i++) {
+      if (dataFields[i].getColumn().isDimesion()) {
+        if (dataFields[i].hasDictionaryEncoding()) {
+          dictIndexes.add(i);
+        } else {
+          nonDictIndexes.add(i);
+        }
+      } else {
+        msrIndexes.add(i);
+      }
+    }
+    dictionaryFieldIndexes =
+        ArrayUtils.toPrimitive(dictIndexes.toArray(new Integer[dictIndexes.size()]));
+    nonDictionaryFieldIndexes =
+        ArrayUtils.toPrimitive(nonDictIndexes.toArray(new Integer[nonDictIndexes.size()]));
+    measueFieldIndexes = ArrayUtils.toPrimitive(msrIndexes.toArray(new Integer[msrIndexes.size()]));
+  }
+
+  @Override
+  public boolean hasNext() {
+    return iterator.hasNext();
+  }
+
+  @Override
+  public CarbonRowBatch next() {
+    CarbonRowBatch batch = iterator.next();
+    Iterator<CarbonRow> batchIterator = batch.getBatchIterator();
+    while (batchIterator.hasNext()) {
+      Object[] outputArray = new Object[3];
+      CarbonRow row = batchIterator.next();
+      fillDictionaryArrayFromRow(row, outputArray);
+      fillNonDictionaryArrayFromRow(row, outputArray);
+      fillMeasureArrayFromRow(row, outputArray);
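+      // TODO: outputArray is not assigned back to the row yet; see the class-level
+      // TODO about returning a comparable row to the sorter.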
+    }
+    return batch;
+  }
+
+  /**
+   * Collects all dictionary values into a single integer array and stores it at index 0 of the output array.
+   *
+   * @param row
+   * @param outputArray
+   */
+  private void fillDictionaryArrayFromRow(CarbonRow row, Object[] outputArray) {
+    if (dictionaryFieldIndexes.length > 0) {
+      int[] dictArray = new int[dictionaryFieldIndexes.length];
+      for (int i = 0; i < dictionaryFieldIndexes.length; i++) {
+        dictArray[i] = row.getInt(dictionaryFieldIndexes[i]);
+      }
+      outputArray[dimIndexInRow] = dictArray;
+    }
+  }
+
+  /**
+   * Collects all non-dictionary columns, packs them into a single byte array, and stores it at
+   * index 1 of the output array.
+   *
+   * @param row
+   * @param outputArray
+   */
+  private void fillNonDictionaryArrayFromRow(CarbonRow row, Object[] outputArray) {
+    if (nonDictionaryFieldIndexes.length > 0) {
+      byte[][] nonDictByteArray = new byte[nonDictionaryFieldIndexes.length][];
+      for (int i = 0; i < nonDictByteArray.length; i++) {
+        nonDictByteArray[i] = row.getBinary(nonDictionaryFieldIndexes[i]);
+      }
+
+      byte[] nonDictionaryCols =
+          RemoveDictionaryUtil.packByteBufferIntoSingleByteArray(nonDictByteArray);
+      outputArray[byteArrayIndexInRow] = nonDictionaryCols;
+    }
+  }
+
+  /**
+   * Collects all measure values into an array and stores it at index 2 of the output array.
+   *
+   * @param row
+   * @param outputArray
+   */
+  private void fillMeasureArrayFromRow(CarbonRow row, Object[] outputArray) {
+    if (measueFieldIndexes.length > 0) {
+      Object[] measureArray = new Object[measueFieldIndexes.length];
+      for (int i = 0; i < measueFieldIndexes.length; i++) {
+        measureArray[i] = row.getObject(measueFieldIndexes[i]);
+      }
+      outputArray[measureIndexInRow] = measureArray;
+    }
+  }
+}
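
Note: the three fill methods above produce a three-slot Object[] laid out by the
IgnoreDictionary ordinals. An illustrative row built by hand (the values are made
up; only the layout is taken from this commit):

    // Slot 0: int[] of dictionary surrogate keys, slot 1: no-dictionary columns
    // packed into one byte array, slot 2: Object[] of measure values.
    Object[] exampleSortRow() {
      Object[] row = new Object[3];
      row[IgnoreDictionary.DIMENSION_INDEX_IN_ROW.getIndex()] = new int[] { 5, 42 };
      row[IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex()] =
          RemoveDictionaryUtil.packByteBufferIntoSingleByteArray(
              new byte[][] { "abc".getBytes(), "xy".getBytes() });
      row[IgnoreDictionary.MEASURES_INDEX_IN_ROW.getIndex()] = new Object[] { 1.5d, 100L };
      return row;
    }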

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9aee9808/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java
new file mode 100644
index 0000000..02b7fee
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java
@@ -0,0 +1,167 @@
+package org.apache.carbondata.processing.newflow.steps;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
+import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.newflow.DataField;
+import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.newflow.parser.CarbonParserFactory;
+import org.apache.carbondata.processing.newflow.parser.GenericParser;
+import org.apache.carbondata.processing.newflow.parser.RowParser;
+import org.apache.carbondata.processing.newflow.parser.impl.RowParserImpl;
+import org.apache.carbondata.processing.newflow.row.CarbonRow;
+import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
+
+/**
+ * It reads data from the record reader and sends it to the next step.
+ */
+public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(InputProcessorStepImpl.class.getName());
+
+  private GenericParser[] genericParsers;
+
+  private List<Iterator<Object[]>> inputIterators;
+
+  public InputProcessorStepImpl(CarbonDataLoadConfiguration configuration,
+      AbstractDataLoadProcessorStep child, List<Iterator<Object[]>> inputIterators) {
+    super(configuration, child);
+    this.inputIterators = inputIterators;
+  }
+
+  @Override
+  public DataField[] getOutput() {
+    DataField[] fields = configuration.getDataFields();
+    String[] header = configuration.getHeader();
+    DataField[] output = new DataField[fields.length];
+    int k = 0;
+    for (int i = 0; i < header.length; i++) {
+      for (int j = 0; j < fields.length; j++) {
+        if (header[i].equalsIgnoreCase(fields[j].getColumn().getColName())) {
+          output[k++] = fields[j];
+          break;
+        }
+      }
+    }
+    return output;
+  }
+
+  @Override
+  public void initialize() throws CarbonDataLoadingException {
+    DataField[] output = getOutput();
+    genericParsers = new GenericParser[output.length];
+    for (int i = 0; i < genericParsers.length; i++) {
+      genericParsers[i] = CarbonParserFactory.createParser(output[i].getColumn(),
+          (String[]) configuration
+              .getDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS));
+    }
+  }
+
+
+
+  @Override
+  public Iterator<CarbonRowBatch>[] execute() {
+    int batchSize = CarbonProperties.getInstance().getBatchSize();
+    List<Iterator<Object[]>>[] readerIterators = partitionInputReaderIterators();
+    Iterator<CarbonRowBatch>[] outIterators = new Iterator[readerIterators.length];
+    for (int i = 0; i < outIterators.length; i++) {
+      outIterators[i] = new InputProcessorIterator(readerIterators[i], genericParsers, batchSize);
+    }
+    return outIterators;
+  }
+
+  /**
+   * Partition input iterators equally as per the number of threads.
+   * @return array of iterator lists, one list per parallel thread
+   */
+  private List<Iterator<Object[]>>[] partitionInputReaderIterators() {
+    // Get the number of cores configured in property.
+    int numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
+    // Get the minimum of number of cores and iterators size to get the number of parallel threads
+    // to be launched.
+    int parallelThreadNumber = Math.min(inputIterators.size(), numberOfCores);
+
+    List<Iterator<Object[]>>[] iterators = new List[parallelThreadNumber];
+    for (int i = 0; i < parallelThreadNumber; i++) {
+      iterators[i] = new ArrayList<>();
+    }
+    // Equally partition the iterators as per number of threads
+    for (int i = 0; i < inputIterators.size(); i++) {
+      iterators[i % parallelThreadNumber].add(inputIterators.get(i));
+    }
+    return iterators;
+  }
+
+  @Override
+  protected CarbonRow processRow(CarbonRow row) {
+    return null;
+  }
+
+  /**
+   * This iterator wraps the list of iterators and iterates over each
+   * iterator of the list one by one. It also parses the data while iterating it.
+   */
+  private static class InputProcessorIterator extends CarbonIterator<CarbonRowBatch> {
+
+    private List<Iterator<Object[]>> inputIterators;
+
+    private Iterator<Object[]> currentIterator;
+
+    private int counter;
+
+    private int batchSize;
+
+    private RowParser rowParser;
+
+    public InputProcessorIterator(List<Iterator<Object[]>> inputIterators,
+        GenericParser[] genericParsers, int batchSize) {
+      this.inputIterators = inputIterators;
+      this.batchSize = batchSize;
+      this.rowParser = new RowParserImpl(genericParsers);
+      this.counter = 0;
+      // Get the first iterator from the list.
+      currentIterator = inputIterators.get(counter++);
+    }
+
+    @Override
+    public boolean hasNext() {
+      return internalHasNext();
+    }
+
+    private boolean internalHasNext() {
+      boolean hasNext = currentIterator.hasNext();
+      // If the current iterator is finished then switch to the next iterator in
+      // the list, if one is available; otherwise all input has been consumed.
+      if (!hasNext && counter < inputIterators.size()) {
+        // Get the next iterator from the list.
+        currentIterator = inputIterators.get(counter++);
+        hasNext = internalHasNext();
+      }
+      return hasNext;
+    }
+
+    @Override
+    public CarbonRowBatch next() {
+      // Create batch and fill it.
+      CarbonRowBatch carbonRowBatch = new CarbonRowBatch();
+      int count = 0;
+      while (internalHasNext() && count < batchSize) {
+        carbonRowBatch.addRow(new CarbonRow(rowParser.parseRow(currentIterator.next())));
+        count++;
+      }
+      return carbonRowBatch;
+    }
+  }
+
+}
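
Note: the round-robin split in partitionInputReaderIterators() assigns iterator i
to bucket i % parallelThreadNumber, so 5 input iterators on 2 threads become
buckets {0, 2, 4} and {1, 3}. The same logic restated generically (illustrative
helper, not part of this commit):

    // Generic restatement of the round-robin partitioning used above.
    static <T> List<List<T>> roundRobin(List<T> inputs, int maxThreads) {
      int buckets = Math.min(inputs.size(), maxThreads);
      List<List<T>> result = new ArrayList<>();
      for (int i = 0; i < buckets; i++) {
        result.add(new ArrayList<T>());
      }
      for (int i = 0; i < inputs.size(); i++) {
        result.get(i % buckets).add(inputs.get(i));
      }
      return result;
    }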

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9aee9808/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
new file mode 100644
index 0000000..aae6dae
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.newflow.steps;
+
+import java.util.Iterator;
+
+import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
+import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.newflow.DataField;
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.newflow.row.CarbonRow;
+import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
+import org.apache.carbondata.processing.newflow.sort.Sorter;
+import org.apache.carbondata.processing.newflow.sort.impl.ParallelReadMergeSorterImpl;
+import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
+
+/**
+ * It sorts the data and writes it to intermediate temp files. These files will be further read
+ * by the next step for writing to carbondata files.
+ */
+public class SortProcessorStepImpl extends AbstractDataLoadProcessorStep {
+
+  private Sorter sorter;
+
+  public SortProcessorStepImpl(CarbonDataLoadConfiguration configuration,
+      AbstractDataLoadProcessorStep child) {
+    super(configuration, child);
+  }
+
+  @Override
+  public DataField[] getOutput() {
+    return child.getOutput();
+  }
+
+  @Override
+  public void initialize() throws CarbonDataLoadingException {
+    SortParameters sortParameters = SortParameters.createSortParameters(configuration);
+    sorter = new ParallelReadMergeSorterImpl(child.getOutput());
+    sorter.initialize(sortParameters);
+  }
+
+  @Override
+  public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
+    final Iterator<CarbonRowBatch>[] iterators = child.execute();
+    Iterator<CarbonRowBatch>[] sortedIterators = sorter.sort(iterators);
+    child.close();
+    return sortedIterators;
+  }
+
+  @Override
+  protected CarbonRow processRow(CarbonRow row) {
+    return null;
+  }
+
+  @Override
+  public void close() {
+    sorter.close();
+  }
+
+}
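
Note: a sketch of how this step composes with the input step defined earlier in
this commit. The wiring method is hypothetical; in the real pipeline the steps
are assembled by the data-load driver:

    // Hypothetical wiring: the sort step pulls batches from the input step.
    AbstractDataLoadProcessorStep buildPipeline(CarbonDataLoadConfiguration configuration,
        List<Iterator<Object[]>> readerIterators) throws CarbonDataLoadingException {
      AbstractDataLoadProcessorStep input =
          new InputProcessorStepImpl(configuration, null, readerIterators);
      AbstractDataLoadProcessorStep sort = new SortProcessorStepImpl(configuration, input);
      input.initialize();
      sort.initialize();
      return sort; // sort.execute() calls child.execute() and sorts the result
    }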

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9aee9808/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/input/InputProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/input/InputProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/input/InputProcessorStepImpl.java
deleted file mode 100644
index 75009c5..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/input/InputProcessorStepImpl.java
+++ /dev/null
@@ -1,167 +0,0 @@
-package org.apache.carbondata.processing.newflow.steps.input;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
-import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
-import org.apache.carbondata.processing.newflow.DataField;
-import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-import org.apache.carbondata.processing.newflow.parser.CarbonParserFactory;
-import org.apache.carbondata.processing.newflow.parser.GenericParser;
-import org.apache.carbondata.processing.newflow.parser.RowParser;
-import org.apache.carbondata.processing.newflow.parser.impl.RowParserImpl;
-import org.apache.carbondata.processing.newflow.row.CarbonRow;
-import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
-
-/**
- * It reads data from record reader and sends data to next step.
- */
-public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(InputProcessorStepImpl.class.getName());
-
-  private GenericParser[] genericParsers;
-
-  private List<Iterator<Object[]>> inputIterators;
-
-  public InputProcessorStepImpl(CarbonDataLoadConfiguration configuration,
-      AbstractDataLoadProcessorStep child, List<Iterator<Object[]>> inputIterators) {
-    super(configuration, child);
-    this.inputIterators = inputIterators;
-  }
-
-  @Override
-  public DataField[] getOutput() {
-    DataField[] fields = configuration.getDataFields();
-    String[] header = configuration.getHeader();
-    DataField[] output = new DataField[fields.length];
-    int k = 0;
-    for (int i = 0; i < header.length; i++) {
-      for (int j = 0; j < fields.length; j++) {
-        if (header[j].equalsIgnoreCase(fields[j].getColumn().getColName())) {
-          output[k++] = fields[j];
-          break;
-        }
-      }
-    }
-    return output;
-  }
-
-  @Override
-  public void initialize() throws CarbonDataLoadingException {
-    DataField[] output = getOutput();
-    genericParsers = new GenericParser[output.length];
-    for (int i = 0; i < genericParsers.length; i++) {
-      genericParsers[i] = CarbonParserFactory.createParser(output[i].getColumn(),
-          (String[]) configuration
-              .getDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS));
-    }
-  }
-
-
-
-  @Override
-  public Iterator<CarbonRowBatch>[] execute() {
-    int batchSize = CarbonProperties.getInstance().getBatchSize();
-    List<Iterator<Object[]>>[] readerIterators = partitionInputReaderIterators();
-    Iterator<CarbonRowBatch>[] outIterators = new Iterator[readerIterators.length];
-    for (int i = 0; i < outIterators.length; i++) {
-      outIterators[i] = new InputProcessorIterator(readerIterators[i], genericParsers, batchSize);
-    }
-    return outIterators;
-  }
-
-  /**
-   * Partition input iterators equally as per the number of threads.
-   * @return
-   */
-  private List<Iterator<Object[]>>[] partitionInputReaderIterators() {
-    // Get the number of cores configured in property.
-    int numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
-    // Get the minimum of number of cores and iterators size to get the number of parallel threads
-    // to be launched.
-    int parallelThreadNumber = Math.min(inputIterators.size(), numberOfCores);
-
-    List<Iterator<Object[]>>[] iterators = new List[parallelThreadNumber];
-    for (int i = 0; i < parallelThreadNumber; i++) {
-      iterators[i] = new ArrayList<>();
-    }
-    // Equally partition the iterators as per number of threads
-    for (int i = 0; i < inputIterators.size(); i++) {
-      iterators[i % parallelThreadNumber].add(inputIterators.get(i));
-    }
-    return iterators;
-  }
-
-  @Override
-  protected CarbonRow processRow(CarbonRow row) {
-    return null;
-  }
-
-  /**
-   * This iterator wraps the list of iterators and it starts iterating the each
-   * iterator of the list one by one. It also parse the data while iterating it.
-   */
-  private static class InputProcessorIterator extends CarbonIterator<CarbonRowBatch> {
-
-    private List<Iterator<Object[]>> inputIterators;
-
-    private Iterator<Object[]> currentIterator;
-
-    private int counter;
-
-    private int batchSize;
-
-    private RowParser rowParser;
-
-    public InputProcessorIterator(List<Iterator<Object[]>> inputIterators,
-        GenericParser[] genericParsers, int batchSize) {
-      this.inputIterators = inputIterators;
-      this.batchSize = batchSize;
-      this.rowParser = new RowParserImpl(genericParsers);
-      this.counter = 0;
-      // Get the first iterator from the list.
-      currentIterator = inputIterators.get(counter++);
-    }
-
-    @Override
-    public boolean hasNext() {
-      return internalHasNext();
-    }
-
-    private boolean internalHasNext() {
-      boolean hasNext = currentIterator.hasNext();
-      // If iterator is finished then check for next iterator.
-      if (!hasNext) {
-        // Check next iterator is available in the list.
-        if (counter < inputIterators.size()) {
-          // Get the next iterator from the list.
-          currentIterator = inputIterators.get(counter++);
-        }
-        hasNext = internalHasNext();
-      }
-      return hasNext;
-    }
-
-    @Override
-    public CarbonRowBatch next() {
-      // Create batch and fill it.
-      CarbonRowBatch carbonRowBatch = new CarbonRowBatch();
-      int count = 0;
-      while (internalHasNext() && count < batchSize) {
-      carbonRowBatch.addRow(new CarbonRow(rowParser.parseRow(currentIterator.next())));
-        count++;
-      }
-      return carbonRowBatch;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9aee9808/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/FileMergerParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/FileMergerParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/FileMergerParameters.java
deleted file mode 100644
index bd6374b..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/FileMergerParameters.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.processing.sortandgroupby.sortdata;
-
-import java.io.File;
-
-public class FileMergerParameters {
-  /**
-   * intermediateFiles
-   */
-  private File[] intermediateFiles;
-
-  /**
-   * fileBufferSize
-   */
-  private int fileReadBufferSize;
-
-  /**
-   * fileWriteSize
-   */
-  private int fileWriteBufferSize;
-
-  /**
-   * measure count
-   */
-  private int measureColCount;
-
-  /**
-   * measure count
-   */
-  private int dimColCount;
-
-  /**
-   * complexDimColCount
-   */
-  private int complexDimColCount;
-
-  /**
-   * measure count
-   */
-  private int noDictionaryCount;
-
-  /**
-   * outFile
-   */
-  private File outFile;
-
-  /**
-   * sortTempFileNoOFRecordsInCompression
-   */
-  private int noOfRecordsInCompression;
-
-  /**
-   * isSortTempFileCompressionEnabled
-   */
-  private boolean isCompressionEnabled;
-
-  /**
-   * prefetch
-   */
-  private boolean prefetch;
-
-  private char[] aggType;
-
-  /**
-   * to check whether dimension is of dictionary
-   * type or not
-   */
-  private boolean[] isNoDictionaryDimensionColumn;
-
-  /**
-   * prefetchBufferSize
-   */
-  private int prefetchBufferSize;
-
-  public File[] getIntermediateFiles() {
-    return intermediateFiles;
-  }
-
-  public void setIntermediateFiles(final File[] intermediateFiles) {
-    this.intermediateFiles = intermediateFiles;
-  }
-
-  public int getFileReadBufferSize() {
-    return fileReadBufferSize;
-  }
-
-  public void setFileReadBufferSize(int fileReadBufferSize) {
-    this.fileReadBufferSize = fileReadBufferSize;
-  }
-
-  public int getFileWriteBufferSize() {
-    return fileWriteBufferSize;
-  }
-
-  public void setFileWriteBufferSize(int fileWriteBufferSize) {
-    this.fileWriteBufferSize = fileWriteBufferSize;
-  }
-
-  public int getMeasureColCount() {
-    return measureColCount;
-  }
-
-  public void setMeasureColCount(int measureColCount) {
-    this.measureColCount = measureColCount;
-  }
-
-  public int getDimColCount() {
-    return dimColCount;
-  }
-
-  public void setDimColCount(int dimColCount) {
-    this.dimColCount = dimColCount;
-  }
-
-  public int getComplexDimColCount() {
-    return complexDimColCount;
-  }
-
-  public void setComplexDimColCount(int complexDimColCount) {
-    this.complexDimColCount = complexDimColCount;
-  }
-
-  public File getOutFile() {
-    return outFile;
-  }
-
-  public void setOutFile(File outFile) {
-    this.outFile = outFile;
-  }
-
-  public int getNoOfRecordsInCompression() {
-    return noOfRecordsInCompression;
-  }
-
-  public void setNoOfRecordsInCompression(int noOfRecordsInCompression) {
-    this.noOfRecordsInCompression = noOfRecordsInCompression;
-  }
-
-  public boolean isCompressionEnabled() {
-    return isCompressionEnabled;
-  }
-
-  public void setCompressionEnabled(boolean isCompressionEnabled) {
-    this.isCompressionEnabled = isCompressionEnabled;
-  }
-
-  public boolean isPrefetch() {
-    return prefetch;
-  }
-
-  public void setPrefetch(boolean prefetch) {
-    this.prefetch = prefetch;
-  }
-
-  public int getPrefetchBufferSize() {
-    return prefetchBufferSize;
-  }
-
-  public void setPrefetchBufferSize(int prefetchBufferSize) {
-    this.prefetchBufferSize = prefetchBufferSize;
-  }
-
-  public char[] getAggType() {
-    return aggType;
-  }
-
-  public void setAggType(char[] aggType) {
-    this.aggType = aggType;
-  }
-
-  /**
-   * @return the noDictionaryCount
-   */
-  public int getNoDictionaryCount() {
-    return noDictionaryCount;
-  }
-
-  /**
-   * @param noDictionaryCount the noDictionaryCount to set
-   */
-  public void setNoDictionaryCount(int noDictionaryCount) {
-    this.noDictionaryCount = noDictionaryCount;
-  }
-
-  /**
-   * @return the isNoDictionaryDimensionColumn
-   */
-  public boolean[] getIsNoDictionaryDimensionColumn() {
-    return isNoDictionaryDimensionColumn;
-  }
-
-  /**
-   * @param isNoDictionaryDimensionColumn the isNoDictionaryDimensionColumn to set
-   */
-  public void setIsNoDictionaryDimensionColumn(boolean[] isNoDictionaryDimensionColumn) {
-    this.isNoDictionaryDimensionColumn = isNoDictionaryDimensionColumn;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9aee9808/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
index 4188a9f..cffc00b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
@@ -84,14 +84,21 @@ public class IntermediateFileMerger implements Callable<Void> {
    */
   private int totalSize;
 
-  private FileMergerParameters mergerParameters;
+  private SortParameters mergerParameters;
+
+  private File[] intermediateFiles;
+
+  private File outPutFile;
 
   /**
    * IntermediateFileMerger Constructor
    */
-  public IntermediateFileMerger(FileMergerParameters mergerParameters) {
+  public IntermediateFileMerger(SortParameters mergerParameters, File[] intermediateFiles,
+      File outPutFile) {
     this.mergerParameters = mergerParameters;
-    this.fileCounter = mergerParameters.getIntermediateFiles().length;
+    this.fileCounter = intermediateFiles.length;
+    this.intermediateFiles = intermediateFiles;
+    this.outPutFile = outPutFile;
   }
 
   @Override public Void call() throws Exception {
@@ -105,7 +112,7 @@ public class IntermediateFileMerger implements Callable<Void> {
       while (hasNext()) {
         writeDataTofile(next());
       }
-      if (mergerParameters.isCompressionEnabled() || mergerParameters.isPrefetch()) {
+      if (mergerParameters.isSortFileCompressionEnabled() || mergerParameters.isPrefetch()) {
         if (entryCount > 0) {
           if (entryCount < totalSize) {
             Object[][] temp = new Object[entryCount][];
@@ -137,7 +144,7 @@ public class IntermediateFileMerger implements Callable<Void> {
           LOGGER.error(e, "Problem while deleting the merge file");
         }
       } else {
-        if (mergerParameters.getOutFile().delete()) {
+        if (outPutFile.delete()) {
           LOGGER.error("Problem while deleting the merge file");
         }
       }
@@ -152,10 +159,10 @@ public class IntermediateFileMerger implements Callable<Void> {
    * @throws CarbonSortKeyAndGroupByException
    */
   private void initialize() throws CarbonSortKeyAndGroupByException {
-    if (!mergerParameters.isCompressionEnabled() && !mergerParameters.isPrefetch()) {
+    if (!mergerParameters.isSortFileCompressionEnabled() && !mergerParameters.isPrefetch()) {
       try {
         this.stream = new DataOutputStream(
-            new BufferedOutputStream(new FileOutputStream(mergerParameters.getOutFile()),
+            new BufferedOutputStream(new FileOutputStream(outPutFile),
                 mergerParameters.getFileWriteBufferSize()));
         this.stream.writeInt(this.totalNumberOfRecords);
       } catch (FileNotFoundException e) {
@@ -165,16 +172,16 @@ public class IntermediateFileMerger implements Callable<Void> {
       }
     } else {
       writer = TempSortFileWriterFactory.getInstance()
-          .getTempSortFileWriter(mergerParameters.isCompressionEnabled(),
+          .getTempSortFileWriter(mergerParameters.isSortFileCompressionEnabled(),
              mergerParameters.getDimColCount(), mergerParameters.getComplexDimColCount(),
              mergerParameters.getMeasureColCount(), mergerParameters.getNoDictionaryCount(),
               mergerParameters.getFileWriteBufferSize());
-      writer.initiaize(mergerParameters.getOutFile(), totalNumberOfRecords);
+      writer.initiaize(outPutFile, totalNumberOfRecords);
 
       if (mergerParameters.isPrefetch()) {
-        totalSize = mergerParameters.getPrefetchBufferSize();
+        totalSize = mergerParameters.getBufferSize();
       } else {
-        totalSize = mergerParameters.getNoOfRecordsInCompression();
+        totalSize = mergerParameters.getSortTempFileNoOFRecordsInCompression();
       }
     }
   }
@@ -232,20 +239,20 @@ public class IntermediateFileMerger implements Callable<Void> {
     LOGGER.info("Number of temp file: " + this.fileCounter);
 
     // create record holder heap
-    createRecordHolderQueue(mergerParameters.getIntermediateFiles());
+    createRecordHolderQueue(intermediateFiles);
 
     // iterate over file list and create chunk holder and add to heap
     LOGGER.info("Started adding first record from each file");
 
     SortTempFileChunkHolder sortTempFileChunkHolder = null;
 
-    for (File tempFile : mergerParameters.getIntermediateFiles()) {
+    for (File tempFile : intermediateFiles) {
       // create chunk holder
       sortTempFileChunkHolder =
           new SortTempFileChunkHolder(tempFile, mergerParameters.getDimColCount(),
               mergerParameters.getComplexDimColCount(), mergerParameters.getMeasureColCount(),
-              mergerParameters.getFileReadBufferSize(), mergerParameters.getNoDictionaryCount(),
-              mergerParameters.getAggType(), mergerParameters.getIsNoDictionaryDimensionColumn());
+              mergerParameters.getFileBufferSize(), mergerParameters.getNoDictionaryCount(),
+              mergerParameters.getAggType(), mergerParameters.getNoDictionaryDimnesionColumn());
 
       // initialize
       sortTempFileChunkHolder.initialize();
@@ -296,7 +303,7 @@ public class IntermediateFileMerger implements Callable<Void> {
    * @throws CarbonSortKeyAndGroupByException problem while writing
    */
   private void writeDataTofile(Object[] row) throws CarbonSortKeyAndGroupByException {
-    if (mergerParameters.isCompressionEnabled() || mergerParameters.isPrefetch()) {
+    if (mergerParameters.isSortFileCompressionEnabled() || mergerParameters.isPrefetch()) {
       if (entryCount == 0) {
         records = new Object[totalSize][];
         records[entryCount++] = row;
@@ -363,7 +370,7 @@ public class IntermediateFileMerger implements Callable<Void> {
       }
     }
     try {
-      CarbonUtil.deleteFiles(mergerParameters.getIntermediateFiles());
+      CarbonUtil.deleteFiles(intermediateFiles);
     } catch (CarbonUtilException e) {
      throw new CarbonSortKeyAndGroupByException("Problem while deleting the intermediate files");
     }
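
Note: with FileMergerParameters removed, a caller now constructs the merger from
the shared SortParameters plus explicit file arguments. A sketch of the new call
site ('filesToMerge' and 'mergedFile' are placeholders; in this commit the actual
scheduling lives in the new SortIntermediateFileMerger):

    // IntermediateFileMerger implements Callable<Void>, so it is submitted to a pool.
    void submitMerge(SortParameters parameters, ExecutorService executorService,
        File[] filesToMerge, File mergedFile) {
      IntermediateFileMerger merger =
          new IntermediateFileMerger(parameters, filesToMerge, mergedFile);
      executorService.submit(merger);
    }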

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9aee9808/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
index cfd6d68..a55e1ba 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
@@ -25,9 +25,7 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.math.BigDecimal;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -36,14 +34,9 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.carbon.metadata.CarbonMetadata;
-import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.processing.schema.metadata.SortObserver;
 import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 import org.apache.carbondata.processing.util.RemoveDictionaryUtil;
@@ -55,108 +48,18 @@ public class SortDataRows {
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(SortDataRows.class.getName());
   /**
-   * lockObject
-   */
-  private final Object lockObject = new Object();
-  /**
-   * tempFileLocation
-   */
-  private String tempFileLocation;
-  /**
    * entryCount
    */
   private int entryCount;
   /**
-   * sortBufferSize
-   */
-  private int sortBufferSize;
-  /**
    * record holder array
    */
   private Object[][] recordHolderList;
   /**
-   * measure count
-   */
-  private int measureColCount;
-  /**
-   * measure count
-   */
-  private int dimColCount;
-  /**
-   * measure count
-   */
-  private int complexDimColCount;
-  /**
-   * fileBufferSize
-   */
-  private int fileBufferSize;
-  /**
-   * numberOfIntermediateFileToBeMerged
-   */
-  private int numberOfIntermediateFileToBeMerged;
-  /**
-   * executorService
-   */
-  private ExecutorService executorService;
-  /**
-   * fileWriteBufferSize
-   */
-  private int fileWriteBufferSize;
-  /**
-   * procFiles
-   */
-  private List<File> procFiles;
-  /**
-   * observer
-   */
-  private SortObserver observer;
-  /**
    * threadStatusObserver
    */
   private ThreadStatusObserver threadStatusObserver;
   /**
-   * sortTempFileNoOFRecordsInCompression
-   */
-  private int sortTempFileNoOFRecordsInCompression;
-  /**
-   * isSortTempFileCompressionEnabled
-   */
-  private boolean isSortFileCompressionEnabled;
-  /**
-   * prefetch
-   */
-  private boolean prefetch;
-  /**
-   * bufferSize
-   */
-  private int bufferSize;
-  private String databaseName;
-  private String tableName;
-
-  private char[] aggType;
-
-  /**
-   * To know how many columns are of high cardinality.
-   */
-  private int noDictionaryCount;
-  /**
-   * partitionID
-   */
-  private String partitionID;
-  /**
-   * Id of the load folder
-   */
-  private String segmentId;
-  /**
-   * task id, each spark task has a unique id
-   */
-  private String taskNo;
-
-  /**
-   * This will tell whether dimension is dictionary or not.
-   */
-  private boolean[] noDictionaryDimnesionColumn;
-  /**
    * executor service for data sort holder
    */
   private ExecutorService dataSorterAndWriterExecutorService;
@@ -165,118 +68,42 @@ public class SortDataRows {
    */
   private Semaphore semaphore;
 
-  public SortDataRows(String tableName, int dimColCount, int complexDimColCount,
-      int measureColCount, SortObserver observer, int noDictionaryCount, String partitionID,
-      String segmentId, String taskNo, boolean[] noDictionaryColMaping) {
-    // set table name
-    this.tableName = tableName;
-    this.partitionID = partitionID;
-    this.segmentId = segmentId;
-    this.taskNo = taskNo;
-    // set measure count
-    this.measureColCount = measureColCount;
+  private SortParameters parameters;
 
-    this.dimColCount = dimColCount;
+  private int sortBufferSize;
 
-    this.noDictionaryCount = noDictionaryCount;
-    this.complexDimColCount = complexDimColCount;
-    this.noDictionaryDimnesionColumn = noDictionaryColMaping;
+  private SortIntermediateFileMerger intermediateFileMerger;
 
-    // processed file list
-    this.procFiles = new ArrayList<File>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+  public SortDataRows(SortParameters parameters,
+      SortIntermediateFileMerger intermediateFileMerger) {
+    this.parameters = parameters;
 
-    // observer for main sorting
-    this.observer = observer;
+    this.intermediateFileMerger = intermediateFileMerger;
 
+    this.sortBufferSize = parameters.getSortBufferSize();
     // observer of writing file in thread
     this.threadStatusObserver = new ThreadStatusObserver();
-    this.aggType = new char[measureColCount];
   }
 
   /**
    * This method will be used to initialize
    */
-  public void initialize(String databaseName, String tableName)
-      throws CarbonSortKeyAndGroupByException {
-    this.databaseName = databaseName;
-    this.tableName = tableName;
-
-    CarbonProperties carbonProperties = CarbonProperties.getInstance();
-    setSortConfiguration(carbonProperties);
+  public void initialize() throws CarbonSortKeyAndGroupByException {
 
     // create holder list which will hold incoming rows
     // size of list will be sort buffer size + 1 to avoid creation of new
     // array in list array
-    this.recordHolderList = new Object[this.sortBufferSize][];
-    updateSortTempFileLocation();
-
+    this.recordHolderList = new Object[parameters.getSortBufferSize()][];
     // Delete if any older file exists in sort temp folder
     deleteSortLocationIfExists();
 
     // create new sort temp directory
-    if (!new File(this.tempFileLocation).mkdirs()) {
+    if (!new File(parameters.getTempFileLocation()).mkdirs()) {
       LOGGER.info("Sort Temp Location Already Exists");
     }
-    int numberOfCores = 0;
-    try {
-      numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.NUM_CORES_LOADING,
-              CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
-      numberOfCores = numberOfCores / 2;
-    } catch (NumberFormatException exc) {
-      numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
-    }
-    this.executorService = Executors.newFixedThreadPool(numberOfCores);
-    this.dataSorterAndWriterExecutorService = Executors.newFixedThreadPool(numberOfCores);
-    semaphore = new Semaphore(numberOfCores);
-    this.fileWriteBufferSize = Integer.parseInt(carbonProperties
-        .getProperty(CarbonCommonConstants.CARBON_SORT_FILE_WRITE_BUFFER_SIZE,
-            CarbonCommonConstants.CARBON_SORT_FILE_WRITE_BUFFER_SIZE_DEFAULT_VALUE));
-
-    this.isSortFileCompressionEnabled = Boolean.parseBoolean(carbonProperties
-        .getProperty(CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED,
-            CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED_DEFAULTVALUE));
-
-    try {
-      this.sortTempFileNoOFRecordsInCompression = Integer.parseInt(carbonProperties
-          .getProperty(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION,
-              CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE));
-      if (this.sortTempFileNoOFRecordsInCompression < 1) {
-        LOGGER.error("Invalid value for: "
-            + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
-            + ":Only Positive Integer value(greater than zero) is allowed.Default value will "
-            + "be used");
-
-        this.sortTempFileNoOFRecordsInCompression = Integer.parseInt(
-            CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
-      }
-    } catch (NumberFormatException e) {
-      LOGGER.error(
-          "Invalid value for: " + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
-              + ", only Positive Integer value is allowed. Default value will be used");
-
-      this.sortTempFileNoOFRecordsInCompression = Integer
-          .parseInt(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
-    }
-
-    if (isSortFileCompressionEnabled) {
-      LOGGER.info("Compression will be used for writing the sort temp File");
-    }
-
-    prefetch = CarbonCommonConstants.CARBON_PREFETCH_IN_MERGE_VALUE;
-    bufferSize = CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE;
-
-    initAggType();
-  }
-
-  private void initAggType() {
-    Arrays.fill(aggType, 'n');
-    CarbonTable carbonTable = CarbonMetadata.getInstance()
-        .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName);
-    List<CarbonMeasure> measures = carbonTable.getMeasureByTableName(tableName);
-    for (int i = 0; i < measureColCount; i++) {
-      aggType[i] = DataTypeUtil.getAggType(measures.get(i).getDataType());
-    }
+    this.dataSorterAndWriterExecutorService =
+        Executors.newFixedThreadPool(parameters.getNumberOfCores());
+    semaphore = new Semaphore(parameters.getNumberOfCores());
   }
 
   /**
@@ -292,17 +119,7 @@ public class SortDataRows {
 
     if (sortBufferSize == currentSize) {
       LOGGER.debug("************ Writing to temp file ********** ");
-
-      File[] fileList;
-      if (procFiles.size() >= numberOfIntermediateFileToBeMerged) {
-        synchronized (lockObject) {
-          fileList = procFiles.toArray(new File[procFiles.size()]);
-          this.procFiles = new ArrayList<File>(1);
-        }
-
-        LOGGER.debug("Sumitting request for intermediate merging no of files: " + fileList.length);
-        startIntermediateMerging(fileList);
-      }
+      intermediateFileMerger.startMergingIfPossible();
       Object[][] recordHolderListLocal = recordHolderList;
       try {
         semaphore.acquire();
@@ -334,24 +151,23 @@ public class SortDataRows {
       toSort = new Object[entryCount][];
       System.arraycopy(recordHolderList, 0, toSort, 0, entryCount);
 
-      if (noDictionaryCount > 0) {
-        Arrays.sort(toSort, new RowComparator(noDictionaryDimnesionColumn, noDictionaryCount));
+      if (parameters.getNoDictionaryCount() > 0) {
+        Arrays.sort(toSort, new RowComparator(parameters.getNoDictionaryDimnesionColumn(),
+            parameters.getNoDictionaryCount()));
       } else {
-
-        Arrays.sort(toSort, new RowComparatorForNormalDims(this.dimColCount));
+        Arrays.sort(toSort, new RowComparatorForNormalDims(parameters.getDimColCount()));
       }
       recordHolderList = toSort;
 
       // create new file
-      File file =
-          new File(this.tempFileLocation + File.separator + this.tableName + System.nanoTime() +
-              CarbonCommonConstants.SORT_TEMP_FILE_EXT);
+      File file = new File(
+          parameters.getTempFileLocation() + File.separator + parameters.getTableName() +
+              System.nanoTime() + CarbonCommonConstants.SORT_TEMP_FILE_EXT);
       writeDataTofile(recordHolderList, this.entryCount, file);
 
     }
 
     startFileBasedMerge();
-    procFiles = null;
     this.recordHolderList = null;
   }
 
@@ -363,7 +179,7 @@ public class SortDataRows {
   private void writeDataTofile(Object[][] recordHolderList, int entryCountLocal, File file)
       throws CarbonSortKeyAndGroupByException {
     // stream
-    if (isSortFileCompressionEnabled || prefetch) {
+    if (parameters.isSortFileCompressionEnabled() || parameters.isPrefetch()) {
       writeSortTempFile(recordHolderList, entryCountLocal, file);
       return;
     }
@@ -382,7 +198,7 @@ public class SortDataRows {
       LOGGER.error(e, "Problem while writing the sort temp file");
       throw e;
     } finally {
-      if(writer != null) {
+      if (writer != null) {
         writer.finish();
       }
     }
@@ -393,29 +209,33 @@ public class SortDataRows {
     DataOutputStream stream = null;
     try {
       // open stream
-      stream = new DataOutputStream(
-          new BufferedOutputStream(new FileOutputStream(file), fileWriteBufferSize));
+      stream = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file),
+          parameters.getFileWriteBufferSize()));
 
       // write number of entries to the file
       stream.writeInt(entryCountLocal);
+      int dimColCount = parameters.getDimColCount();
+      int combinedDimCount = parameters.getNoDictionaryCount() + parameters.getComplexDimColCount();
+      char[] aggType = parameters.getAggType();
       Object[] row = null;
       for (int i = 0; i < entryCountLocal; i++) {
         // get row from record holder list
         row = recordHolderList[i];
         int fieldIndex = 0;
 
-        for (int dimCount = 0; dimCount < this.dimColCount; dimCount++) {
+        for (int dimCount = 0; dimCount < dimColCount; dimCount++) {
           stream.writeInt(RemoveDictionaryUtil.getDimension(fieldIndex++, row));
         }
 
         // if any high cardinality dims are present then write it to the file.
-        if ((this.noDictionaryCount + this.complexDimColCount) > 0) {
+
+        if (combinedDimCount > 0) {
           stream.write(RemoveDictionaryUtil.getByteArrayForNoDictionaryCols(row));
         }
 
         // as measures are stored in separate array.
         fieldIndex = 0;
-        for (int mesCount = 0; mesCount < this.measureColCount; mesCount++) {
+        for (int mesCount = 0; mesCount < parameters.getMeasureColCount(); mesCount++) {
           if (null != RemoveDictionaryUtil.getMeasure(fieldIndex, row)) {
             stream.write((byte) 1);
             if (aggType[mesCount] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
@@ -447,93 +267,28 @@ public class SortDataRows {
   private TempSortFileWriter getWriter() {
     TempSortFileWriter chunkWriter = null;
     TempSortFileWriter writer = TempSortFileWriterFactory.getInstance()
-        .getTempSortFileWriter(isSortFileCompressionEnabled, dimColCount, complexDimColCount,
-            measureColCount, noDictionaryCount, fileWriteBufferSize);
+        .getTempSortFileWriter(parameters.isSortFileCompressionEnabled(),
+            parameters.getDimColCount(), parameters.getComplexDimColCount(),
+            parameters.getMeasureColCount(), parameters.getNoDictionaryCount(),
+            parameters.getFileWriteBufferSize());
 
-    if (prefetch && !isSortFileCompressionEnabled) {
-      chunkWriter = new SortTempFileChunkWriter(writer, bufferSize);
+    if (parameters.isPrefetch() && !parameters.isSortFileCompressionEnabled()) {
+      chunkWriter = new SortTempFileChunkWriter(writer, parameters.getBufferSize());
     } else {
-      chunkWriter = new SortTempFileChunkWriter(writer, sortTempFileNoOFRecordsInCompression);
+      chunkWriter =
+          new SortTempFileChunkWriter(writer, parameters.getSortTempFileNoOFRecordsInCompression());
     }
 
     return chunkWriter;
   }
 
   /**
-   * Below method will be used to start the intermediate file merging
-   *
-   * @param intermediateFiles
-   */
-  private void startIntermediateMerging(File[] intermediateFiles) {
-    File file = new File(this.tempFileLocation + File.separator + this.tableName + System.nanoTime()
-        + CarbonCommonConstants.MERGERD_EXTENSION);
-
-    FileMergerParameters parameters = new FileMergerParameters();
-    parameters.setIsNoDictionaryDimensionColumn(noDictionaryDimnesionColumn);
-    parameters.setDimColCount(dimColCount);
-    parameters.setComplexDimColCount(complexDimColCount);
-    parameters.setMeasureColCount(measureColCount);
-    parameters.setIntermediateFiles(intermediateFiles);
-    parameters.setFileReadBufferSize(fileBufferSize);
-    parameters.setFileWriteBufferSize(fileBufferSize);
-    parameters.setOutFile(file);
-    parameters.setCompressionEnabled(isSortFileCompressionEnabled);
-    parameters.setNoOfRecordsInCompression(sortTempFileNoOFRecordsInCompression);
-    parameters.setPrefetch(prefetch);
-    parameters.setPrefetchBufferSize(bufferSize);
-    parameters.setAggType(aggType);
-    parameters.setNoDictionaryCount(noDictionaryCount);
-
-    IntermediateFileMerger merger = new IntermediateFileMerger(parameters);
-    executorService.submit(merger);
-  }
-
-  /**
-   * This method will be used to get the sort configuration
-   *
-   * @param instance
-   */
-  private void setSortConfiguration(CarbonProperties instance) {
-    // get sort buffer size
-    this.sortBufferSize = Integer.parseInt(instance
-        .getProperty(CarbonCommonConstants.SORT_SIZE, CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL));
-    LOGGER.info("Sort size for table: " + this.sortBufferSize);
-    // set number of intermedaite file to merge
-    this.numberOfIntermediateFileToBeMerged = Integer.parseInt(instance
-        .getProperty(CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT,
-            CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT_DEFAULT_VALUE));
-
-    LOGGER.info(
-        "Number of intermediate file to be merged: " + this.numberOfIntermediateFileToBeMerged);
-
-    // get file buffer size
-    this.fileBufferSize = CarbonDataProcessorUtil
-        .getFileBufferSize(this.numberOfIntermediateFileToBeMerged, CarbonProperties.getInstance(),
-            CarbonCommonConstants.CONSTANT_SIZE_TEN);
-
-    LOGGER.info("File Buffer Size: " + this.fileBufferSize);
-  }
-
-  /**
-   * This will be used to get the sort temo location
-   *
-   */
-  private void updateSortTempFileLocation() {
-    String carbonDataDirectoryPath = CarbonDataProcessorUtil
-        .getLocalDataFolderLocation(databaseName, tableName, taskNo, partitionID,
-            segmentId, false);
-    this.tempFileLocation =
-        carbonDataDirectoryPath + File.separator + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION;
-    LOGGER.info("temp file location" + this.tempFileLocation);
-  }
-
-  /**
   * This method will be used to delete the sort temp location if it exists
    *
    * @throws CarbonSortKeyAndGroupByException
    */
   public void deleteSortLocationIfExists() throws CarbonSortKeyAndGroupByException {
-    CarbonDataProcessorUtil.deleteSortLocationIfExists(this.tempFileLocation);
+    CarbonDataProcessorUtil.deleteSortLocationIfExists(parameters.getTempFileLocation());
   }
 
   /**
@@ -543,8 +298,6 @@ public class SortDataRows {
    */
   private void startFileBasedMerge() throws CarbonSortKeyAndGroupByException {
     try {
-      executorService.shutdown();
-      executorService.awaitTermination(2, TimeUnit.DAYS);
       dataSorterAndWriterExecutorService.shutdown();
       dataSorterAndWriterExecutorService.awaitTermination(2, TimeUnit.DAYS);
     } catch (InterruptedException e) {
@@ -565,8 +318,8 @@ public class SortDataRows {
      */
     public void notifyFailed(Throwable exception) throws CarbonSortKeyAndGroupByException {
       dataSorterAndWriterExecutorService.shutdownNow();
-      executorService.shutdownNow();
-      observer.setFailed(true);
+      intermediateFileMerger.close();
+      parameters.getObserver().setFailed(true);
       LOGGER.error(exception);
       throw new CarbonSortKeyAndGroupByException(exception);
     }
@@ -586,22 +339,22 @@ public class SortDataRows {
     @Override public Void call() throws Exception {
       try {
         long startTime = System.currentTimeMillis();
-        if (noDictionaryCount > 0) {
+        if (parameters.getNoDictionaryCount() > 0) {
           Arrays.sort(recordHolderArray,
-              new RowComparator(noDictionaryDimnesionColumn, noDictionaryCount));
+              new RowComparator(parameters.getNoDictionaryDimnesionColumn(),
+                  parameters.getNoDictionaryCount()));
         } else {
-          Arrays.sort(recordHolderArray, new RowComparatorForNormalDims(dimColCount));
+          Arrays
+              .sort(recordHolderArray, new RowComparatorForNormalDims(parameters.getDimColCount()));
         }
         // create a new file every time
         File sortTempFile = new File(
-            tempFileLocation + File.separator + tableName + System.nanoTime()
-                + CarbonCommonConstants.SORT_TEMP_FILE_EXT);
+            parameters.getTempFileLocation() + File.separator + parameters.getTableName() + System
+                .nanoTime() + CarbonCommonConstants.SORT_TEMP_FILE_EXT);
         writeDataTofile(recordHolderArray, recordHolderArray.length, sortTempFile);
         // add sort temp filename to and arrayList. When the list size reaches 20 then
         // intermediate merging of sort temp files will be triggered
-        synchronized (lockObject) {
-          procFiles.add(sortTempFile);
-        }
+        intermediateFileMerger.addFileToMerge(sortTempFile);
         LOGGER.info("Time taken to sort and write sort temp file " + sortTempFile + " is: " + (
             System.currentTimeMillis() - startTime));
       } catch (Throwable e) {

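Taken together, the SortDataRows changes above move all sort configuration into the new SortParameters holder and delegate intermediate merging to the SortIntermediateFileMerger introduced below. A minimal wiring sketch, using only constructors and methods visible in this diff (how the SortParameters instance is built is defined in SortParameters.java, part of this commit but not shown here; error handling omitted):

    // Sketch only: expected caller-side wiring after this refactor.
    SortParameters parameters = /* created via SortParameters.java in this commit */ null;
    SortIntermediateFileMerger merger = new SortIntermediateFileMerger(parameters);
    SortDataRows sortDataRows = new SortDataRows(parameters, merger);
    sortDataRows.initialize();  // creates sort temp dir, record holder array, writer executor
    // Incoming rows fill recordHolderList; once sortBufferSize rows are buffered
    // they are sorted (RowComparator / RowComparatorForNormalDims) and written to
    // a sort temp file, which is then handed to the merger.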
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9aee9808/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortIntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortIntermediateFileMerger.java
new file mode 100644
index 0000000..9cec141
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortIntermediateFileMerger.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.sortandgroupby.sortdata;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
+
+/**
+ * Merge-sorts intermediate sort temp files into bigger files.
+ */
+public class SortIntermediateFileMerger {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(SortIntermediateFileMerger.class.getName());
+
+  /**
+   * executorService
+   */
+  private ExecutorService executorService;
+  /**
+   * procFiles
+   */
+  private List<File> procFiles;
+
+  private SortParameters parameters;
+
+  private final Object lockObject = new Object();
+
+  public SortIntermediateFileMerger(SortParameters parameters) {
+    this.parameters = parameters;
+    // processed file list
+    this.procFiles = new ArrayList<File>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+    this.executorService = Executors.newFixedThreadPool(parameters.getNumberOfCores());
+  }
+
+  public void addFileToMerge(File sortTempFile) {
+    // add sort temp filename to an arrayList. When the list size reaches the configured limit,
+    // intermediate merging of sort temp files will be triggered
+    synchronized (lockObject) {
+      procFiles.add(sortTempFile);
+    }
+  }
+
+  public void startMergingIfPossible() {
+    File[] fileList;
+    if (procFiles.size() >= parameters.getNumberOfIntermediateFileToBeMerged()) {
+      synchronized (lockObject) {
+        fileList = procFiles.toArray(new File[procFiles.size()]);
+        this.procFiles = new ArrayList<File>();
+      }
+      LOGGER.debug("Submitting request for intermediate merging no of files: " + fileList.length);
+      startIntermediateMerging(fileList);
+    }
+  }
+
+  /**
+   * Below method will be used to start the intermediate file merging
+   *
+   * @param intermediateFiles
+   */
+  private void startIntermediateMerging(File[] intermediateFiles) {
+    File file = new File(
+        parameters.getTempFileLocation() + File.separator + parameters.getTableName() + System
+            .nanoTime() + CarbonCommonConstants.MERGERD_EXTENSION);
+    IntermediateFileMerger merger = new IntermediateFileMerger(parameters, intermediateFiles, file);
+    executorService.submit(merger);
+  }
+
+  public void finish() throws CarbonSortKeyAndGroupByException {
+    try {
+      executorService.shutdown();
+      executorService.awaitTermination(2, TimeUnit.DAYS);
+    } catch (InterruptedException e) {
+      throw new CarbonSortKeyAndGroupByException("Problem while shutting down the executor service", e);
+    }
+    procFiles.clear();
+    procFiles = null;
+  }
+
+  public void close() {
+    if (!executorService.isShutdown()) {
+      executorService.shutdownNow();
+    }
+  }
+
+}
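
The lifecycle the new merger expects from its caller, as exercised by SortDataRows above (a reference sketch, not additional committed code):

    SortIntermediateFileMerger merger = new SortIntermediateFileMerger(parameters);
    merger.addFileToMerge(sortTempFile);   // each finished sort temp file is registered
    merger.startMergingIfPossible();       // submits an IntermediateFileMerger task once
                                           // numberOfIntermediateFileToBeMerged files accumulate
    merger.finish();                       // normal completion: shutdown and await (up to 2 days)
    merger.close();                        // failure path: force-stop in-flight merge tasks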
