[CARBONDATA-1839] [DataLoad] Fix bugs in and optimize compression of sort temp 
files

1. Fix bugs in sort temp file compression by using file-level compression instead of 
batch-record-level compression

2. Add tests

3. Update docs and add a property to configure the compressor

This closes #1707


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c1002511
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c1002511
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c1002511

Branch: refs/heads/branch-1.3
Commit: c1002511af34a3c2ce0ef05f3fb8977d33d43a68
Parents: 1799642
Author: xuchuanyin <[email protected]>
Authored: Wed Dec 27 15:20:15 2017 +0800
Committer: Jacky Li <[email protected]>
Committed: Wed Jan 3 15:43:57 2018 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  31 ++--
 .../filesystem/AbstractDFSCarbonFile.java       |  92 ++++++++---
 .../core/datastore/filesystem/CarbonFile.java   |  24 +++
 .../datastore/filesystem/LocalCarbonFile.java   | 101 +++++++-----
 .../core/datastore/impl/FileFactory.java        |  26 +++
 .../carbondata/core/util/CarbonProperties.java  |  17 ++
 .../apache/carbondata/core/util/CarbonUtil.java |  15 ++
 core/src/test/resources/sampleCSV.csv           |   1 +
 docs/useful-tips-on-carbondata.md               |   1 +
 .../TestLoadWithSortTempCompressed.scala        | 154 +++++++++++++++++
 .../sort/impl/ParallelReadMergeSorterImpl.java  |   6 +-
 ...arallelReadMergeSorterWithBucketingImpl.java |   5 +-
 .../loading/sort/unsafe/UnsafeSortDataRows.java |  17 +-
 .../holder/UnsafeSortTempFileChunkHolder.java   | 151 ++++-------------
 .../merger/UnsafeIntermediateFileMerger.java    |  74 ++++-----
 .../merger/CompactionResultSortProcessor.java   |  13 +-
 .../sortdata/AbstractTempSortFileWriter.java    | 100 -----------
 .../sortdata/CompressedTempSortFileWriter.java  |  78 ---------
 .../sort/sortdata/IntermediateFileMerger.java   | 133 +++++----------
 .../SingleThreadFinalSortFilesMerger.java       |  47 +-----
 .../processing/sort/sortdata/SortDataRows.java  |  60 +------
 .../sort/sortdata/SortParameters.java           |  96 ++---------
 .../sort/sortdata/SortTempFileChunkHolder.java  | 164 ++++---------------
 .../sort/sortdata/SortTempFileChunkWriter.java  |  75 ---------
 .../sort/sortdata/TempSortFileReader.java       |  37 -----
 .../sort/sortdata/TempSortFileWriter.java       |  46 ------
 .../sortdata/TempSortFileWriterFactory.java     |  41 -----
 .../UnCompressedTempSortFileWriter.java         | 112 -------------
 28 files changed, 557 insertions(+), 1160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index fce8373..2d1e4f9 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -430,26 +430,6 @@ public final class CarbonCommonConstants {
    */
   public static final String CARBON_MERGE_SORT_READER_THREAD_DEFAULTVALUE = 
"3";
   /**
-   * IS_SORT_TEMP_FILE_COMPRESSION_ENABLED
-   */
-  @CarbonProperty
-  public static final String IS_SORT_TEMP_FILE_COMPRESSION_ENABLED =
-      "carbon.is.sort.temp.file.compression.enabled";
-  /**
-   * IS_SORT_TEMP_FILE_COMPRESSION_ENABLED_DEFAULTVALUE
-   */
-  public static final String 
IS_SORT_TEMP_FILE_COMPRESSION_ENABLED_DEFAULTVALUE = "false";
-  /**
-   * SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
-   */
-  @CarbonProperty
-  public static final String SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION =
-      "carbon.sort.temp.file.no.of.records.for.compression";
-  /**
-   * SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE
-   */
-  public static final String 
SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE = "50";
-  /**
    * DEFAULT_COLLECTION_SIZE
    */
   public static final int DEFAULT_COLLECTION_SIZE = 16;
@@ -1374,6 +1354,17 @@ public final class CarbonCommonConstants {
   public static final String CARBON_USE_MULTI_TEMP_DIR_DEFAULT = "false";
 
   /**
+   * name of compressor to compress sort temp files
+   */
+  @CarbonProperty
+  public static final String CARBON_SORT_TEMP_COMPRESSOR = 
"carbon.sort.temp.compressor";
+
+  /**
+   * The optional values are 'SNAPPY','GZIP','BZIP2','LZ4'.
+   * By default, empty means that Carbondata will not compress the sort temp 
files.
+   */
+  public static final String CARBON_SORT_TEMP_COMPRESSOR_DEFAULT = "";
+  /**
    * Which storage level to persist rdd when sort_scope=global_sort
    */
   @CarbonProperty

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
index fcd230a..a8513cf 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
@@ -18,10 +18,12 @@
 package org.apache.carbondata.core.datastore.filesystem;
 
 import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -40,6 +42,8 @@ import org.apache.hadoop.io.compress.BZip2Codec;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.Lz4Codec;
+import org.apache.hadoop.io.compress.SnappyCodec;
 
 public abstract  class AbstractDFSCarbonFile implements CarbonFile {
   /**
@@ -270,29 +274,8 @@ public abstract  class AbstractDFSCarbonFile implements 
CarbonFile {
 
   @Override public DataInputStream getDataInputStream(String path, 
FileFactory.FileType fileType,
       int bufferSize, Configuration hadoopConf) throws IOException {
-    path = path.replace("\\", "/");
-    boolean gzip = path.endsWith(".gz");
-    boolean bzip2 = path.endsWith(".bz2");
-    InputStream stream;
-    Path pt = new Path(path);
-    FileSystem fs = pt.getFileSystem(hadoopConf);
-    if (bufferSize == -1) {
-      stream = fs.open(pt);
-    } else {
-      stream = fs.open(pt, bufferSize);
-    }
-    String codecName = null;
-    if (gzip) {
-      codecName = GzipCodec.class.getName();
-    } else if (bzip2) {
-      codecName = BZip2Codec.class.getName();
-    }
-    if (null != codecName) {
-      CompressionCodecFactory ccf = new CompressionCodecFactory(hadoopConf);
-      CompressionCodec codec = ccf.getCodecByClassName(codecName);
-      stream = codec.createInputStream(stream);
-    }
-    return new DataInputStream(new BufferedInputStream(stream));
+    return getDataInputStream(path, fileType, bufferSize,
+        CarbonUtil.inferCompressorFromFileName(path));
   }
 
   /**
@@ -315,6 +298,49 @@ public abstract  class AbstractDFSCarbonFile implements 
CarbonFile {
     return new DataInputStream(new BufferedInputStream(stream));
   }
 
+  @Override public DataInputStream getDataInputStream(String path, 
FileFactory.FileType fileType,
+      int bufferSize, String compressor) throws IOException {
+    path = path.replace("\\", "/");
+    Path pt = new Path(path);
+    InputStream inputStream;
+    FileSystem fs = pt.getFileSystem(FileFactory.getConfiguration());
+    if (bufferSize <= 0) {
+      inputStream = fs.open(pt);
+    } else {
+      inputStream = fs.open(pt, bufferSize);
+    }
+
+    String codecName = getCodecNameFromCompressor(compressor);
+    if (!codecName.isEmpty()) {
+      CompressionCodec codec = new 
CompressionCodecFactory(hadoopConf).getCodecByName(codecName);
+      inputStream = codec.createInputStream(inputStream);
+    }
+
+    return new DataInputStream(new BufferedInputStream(inputStream));
+  }
+
+  /**
+   * Map a user-specified compressor name (case-insensitive) to a Hadoop codec class name.
+   * @param compressorName user specified compressor name; empty means no compression
+   * @return codec class name, or "" if no compression is requested
+   * @throws IOException if the name is not one of GZIP, BZIP2, SNAPPY or LZ4
+   */
+  private String getCodecNameFromCompressor(String compressorName) throws 
IOException {
+    if (compressorName.isEmpty()) {
+      return "";
+    } else if ("GZIP".equalsIgnoreCase(compressorName)) {
+      return GzipCodec.class.getName();
+    } else if ("BZIP2".equalsIgnoreCase(compressorName)) {
+      return BZip2Codec.class.getName();
+    } else if ("SNAPPY".equalsIgnoreCase(compressorName)) {
+      return SnappyCodec.class.getName();
+    } else if ("LZ4".equalsIgnoreCase(compressorName)) {
+      return Lz4Codec.class.getName();
+    } else {
+      throw new IOException("Unsupported compressor: " + compressorName);
+    }
+  }
+
   @Override public DataOutputStream getDataOutputStream(String path, 
FileFactory.FileType fileType)
       throws IOException {
     path = path.replace("\\", "/");
@@ -331,6 +357,26 @@ public abstract  class AbstractDFSCarbonFile implements 
CarbonFile {
     return fs.create(pt, true, bufferSize, fs.getDefaultReplication(pt), 
blockSize);
   }
 
+  @Override public DataOutputStream getDataOutputStream(String path, 
FileFactory.FileType fileType,
+      int bufferSize, String compressor) throws IOException {
+    path = path.replace("\\", "/");
+    Path pt = new Path(path);
+    OutputStream outputStream;
+    if (bufferSize <= 0) {
+      outputStream = fs.create(pt);
+    } else {
+      outputStream = fs.create(pt, true, bufferSize);
+    }
+
+    String codecName = getCodecNameFromCompressor(compressor);
+    if (!codecName.isEmpty()) {
+      CompressionCodec codec = new 
CompressionCodecFactory(hadoopConf).getCodecByName(codecName);
+      outputStream = codec.createOutputStream(outputStream);
+    }
+
+    return new DataOutputStream(new BufferedOutputStream(outputStream));
+  }
+
   @Override public boolean isFileExist(String filePath, FileFactory.FileType 
fileType,
       boolean performFileCheck) throws IOException {
     filePath = filePath.replace("\\", "/");

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
index 94f088b..16cd0e0 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
@@ -77,6 +77,18 @@ public interface CarbonFile {
   DataInputStream getDataInputStream(String path, FileFactory.FileType 
fileType, int bufferSize,
       Configuration configuration) throws IOException;
 
+  /**
+   * Get a data input stream over the file, decompressing it with the given compressor.
+   * @param path
+   * @param fileType
+   * @param bufferSize
+   * @param compressor name of the compressor the file was written with; empty if uncompressed
+   * @return dataInputStream
+   * @throws IOException
+   */
+  DataInputStream getDataInputStream(String path, FileFactory.FileType 
fileType, int bufferSize,
+      String compressor) throws IOException;
+
   DataInputStream getDataInputStream(String path, FileFactory.FileType 
fileType, int bufferSize,
       long offset) throws IOException;
 
@@ -86,6 +98,18 @@ public interface CarbonFile {
   DataOutputStream getDataOutputStream(String path, FileFactory.FileType 
fileType, int bufferSize,
       long blockSize) throws IOException;
 
+  /**
+   * get data output stream
+   * @param path
+   * @param fileType
+   * @param bufferSize
+   * @param compressor name of compressor to write this file
+   * @return DataOutputStream
+   * @throws IOException
+   */
+  DataOutputStream getDataOutputStream(String path, FileFactory.FileType 
fileType, int bufferSize,
+      String compressor) throws IOException;
+
   boolean isFileExist(String filePath, FileFactory.FileType fileType, boolean 
performFileCheck)
       throws IOException;
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
index 30eb7fe..39ca521 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
@@ -28,8 +28,10 @@ import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.channels.FileChannel;
 import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -37,10 +39,15 @@ import 
org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 
+import net.jpountz.lz4.LZ4BlockInputStream;
+import net.jpountz.lz4.LZ4BlockOutputStream;
 import 
org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
+import 
org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.xerial.snappy.SnappyInputStream;
+import org.xerial.snappy.SnappyOutputStream;
 
 public class LocalCarbonFile implements CarbonFile {
   private static final LogService LOGGER =
@@ -123,7 +130,7 @@ public class LocalCarbonFile implements CarbonFile {
   }
 
   public boolean renameTo(String changetoName) {
-    changetoName = getUpdatedFilePath(changetoName);
+    changetoName = FileFactory.getUpdatedFilePath(changetoName, 
FileFactory.FileType.LOCAL);
     return file.renameTo(new File(changetoName));
   }
 
@@ -231,60 +238,46 @@ public class LocalCarbonFile implements CarbonFile {
         return file.renameTo(new File(changetoName));
       }
     }
-
     return file.renameTo(new File(changetoName));
-
-  }
-
-  /**
-   * below method will be used to update the file path
-   * for local type
-   * it removes the file:/ from the path
-   *
-   * @param filePath
-   * @return updated file path without url for local
-   */
-  private static String getUpdatedFilePath(String filePath) {
-    if (filePath != null && !filePath.isEmpty()) {
-      // If the store path is relative then convert to absolute path.
-      if (filePath.startsWith("./")) {
-        try {
-          return new File(filePath).getCanonicalPath();
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
-      } else {
-        Path pathWithoutSchemeAndAuthority =
-            Path.getPathWithoutSchemeAndAuthority(new Path(filePath));
-        return pathWithoutSchemeAndAuthority.toString();
-      }
-    } else {
-      return filePath;
-    }
   }
 
   @Override public DataOutputStream getDataOutputStream(String path, 
FileFactory.FileType fileType,
       int bufferSize, boolean append) throws FileNotFoundException {
-    path = getUpdatedFilePath(path);
+    path = FileFactory.getUpdatedFilePath(path, FileFactory.FileType.LOCAL);
     return new DataOutputStream(
         new BufferedOutputStream(new FileOutputStream(path, append), 
bufferSize));
   }
 
   @Override public DataInputStream getDataInputStream(String path, 
FileFactory.FileType fileType,
       int bufferSize, Configuration configuration) throws IOException {
+    return getDataInputStream(path, fileType, bufferSize,
+        CarbonUtil.inferCompressorFromFileName(path));
+  }
+
+  @Override public DataInputStream getDataInputStream(String path, 
FileFactory.FileType fileType,
+      int bufferSize, String compressor) throws IOException {
     path = path.replace("\\", "/");
-    boolean gzip = path.endsWith(".gz");
-    boolean bzip2 = path.endsWith(".bz2");
-    InputStream stream;
     path = FileFactory.getUpdatedFilePath(path, fileType);
-    if (gzip) {
-      stream = new GZIPInputStream(new FileInputStream(path));
-    } else if (bzip2) {
-      stream = new BZip2CompressorInputStream(new FileInputStream(path));
+    InputStream inputStream;
+    if (compressor.isEmpty()) {
+      inputStream = new FileInputStream(path);
+    } else if ("GZIP".equalsIgnoreCase(compressor)) {
+      inputStream = new GZIPInputStream(new FileInputStream(path));
+    } else if ("BZIP2".equalsIgnoreCase(compressor)) {
+      inputStream = new BZip2CompressorInputStream(new FileInputStream(path));
+    } else if ("SNAPPY".equalsIgnoreCase(compressor)) {
+      inputStream = new SnappyInputStream(new FileInputStream(path));
+    } else if ("LZ4".equalsIgnoreCase(compressor)) {
+      inputStream = new LZ4BlockInputStream(new FileInputStream(path));
     } else {
-      stream = new FileInputStream(path);
+      throw new IOException("Unsupported compressor: " + compressor);
+    }
+
+    if (bufferSize <= 0) {
+      return new DataInputStream(new BufferedInputStream(inputStream));
+    } else {
+      return new DataInputStream(new BufferedInputStream(inputStream, 
bufferSize));
     }
-    return new DataInputStream(new BufferedInputStream(stream));
   }
 
   /**
@@ -319,7 +312,7 @@ public class LocalCarbonFile implements CarbonFile {
   @Override public DataOutputStream getDataOutputStream(String path, 
FileFactory.FileType fileType)
       throws IOException {
     path = path.replace("\\", "/");
-    path = getUpdatedFilePath(path);
+    path = FileFactory.getUpdatedFilePath(path, FileFactory.FileType.LOCAL);
     return new DataOutputStream(new BufferedOutputStream(new 
FileOutputStream(path)));
   }
 
@@ -330,6 +323,32 @@ public class LocalCarbonFile implements CarbonFile {
     return new DataOutputStream(new BufferedOutputStream(new 
FileOutputStream(path), bufferSize));
   }
 
+  @Override public DataOutputStream getDataOutputStream(String path, 
FileFactory.FileType fileType,
+      int bufferSize, String compressor) throws IOException {
+    path = path.replace("\\", "/");
+    path = FileFactory.getUpdatedFilePath(path, fileType);
+    OutputStream outputStream;
+    if (compressor.isEmpty()) {
+      outputStream = new FileOutputStream(path);
+    } else if ("GZIP".equalsIgnoreCase(compressor)) {
+      outputStream = new GZIPOutputStream(new FileOutputStream(path));
+    } else if ("BZIP2".equalsIgnoreCase(compressor)) {
+      outputStream = new BZip2CompressorOutputStream(new 
FileOutputStream(path));
+    } else if ("SNAPPY".equalsIgnoreCase(compressor)) {
+      outputStream = new SnappyOutputStream(new FileOutputStream(path));
+    } else if ("LZ4".equalsIgnoreCase(compressor)) {
+      outputStream = new LZ4BlockOutputStream(new FileOutputStream(path));
+    } else {
+      throw new IOException("Unsupported compressor: " + compressor);
+    }
+
+    if (bufferSize <= 0) {
+      return new DataOutputStream(new BufferedOutputStream(outputStream));
+    } else {
+      return new DataOutputStream(new BufferedOutputStream(outputStream, 
bufferSize));
+    }
+  }
+
   @Override public boolean isFileExist(String filePath, FileFactory.FileType 
fileType,
       boolean performFileCheck) throws IOException {
     filePath = filePath.replace("\\", "/");

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java 
b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
index 2373080..e6fbd04 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
@@ -108,6 +108,19 @@ public final class FileFactory {
   }
 
   /**
+   * get data input stream
+   * @param path
+   * @param fileType
+   * @param bufferSize
+   * @param compressorName name of compressor to read this file
+   * @return data input stream
+   * @throws IOException
+   */
+  public static DataInputStream getDataInputStream(String path, FileType 
fileType, int bufferSize,
+      String compressorName) throws IOException {
+    return getCarbonFile(path).getDataInputStream(path, fileType, bufferSize, 
compressorName);
+  }
+  /**
    * return the datainputStream which is seek to the offset of file
    *
    * @param path
@@ -138,6 +151,19 @@ public final class FileFactory {
   }
 
   /**
+   * get data output stream
+   * @param path
+   * @param fileType
+   * @param bufferSize
+   * @param compressorName name of compressor to write this file
+   * @return data output stream
+   * @throws IOException
+   */
+  public static DataOutputStream getDataOutputStream(String path, FileType 
fileType, int bufferSize,
+      String compressorName) throws IOException {
+    return getCarbonFile(path).getDataOutputStream(path, fileType, bufferSize, 
compressorName);
+  }
+  /**
    * This method checks the given path exists or not and also is it file or
    * not if the performFileCheck is true
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 19a3cf3..8042cfa 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1015,6 +1015,23 @@ public final class CarbonProperties {
   }
 
   /**
+   * Get the (upper-cased) compressor name used to compress sort temp files.
+   * @return SNAPPY/GZIP/BZIP2/LZ4, or empty (no compression) if unset or invalid
+   */
+  public String getSortTempCompressor() {
+    String compressor = 
getProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
+        
CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR_DEFAULT).toUpperCase();
+    if (compressor.isEmpty() || "SNAPPY".equals(compressor) || 
"GZIP".equals(compressor)
+        || "BZIP2".equals(compressor) || "LZ4".equals(compressor)) {
+      return compressor;
+    } else {
+      LOGGER.error("The 
".concat(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR)
+          .concat(" configuration value is invalid. Only snappy,gzip,bzip2,lz4 
and")
+          .concat(" empty are allowed. It will not compress the sort temp 
files by default"));
+      return CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR_DEFAULT;
+    }
+  }
+  /**
    * returns true if carbon property
    * @param key
    * @return

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index ffe4654..f87b7e8 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -763,6 +763,21 @@ public final class CarbonUtil {
     return defaultFsUrl + currentPath;
   }
 
+  /**
+   * Infer the compressor name from the file name extension.
+   * @param path file name
+   * @return "GZIP" for ".gz", "BZIP2" for ".bz2", otherwise empty (uncompressed)
+   */
+  public static String inferCompressorFromFileName(String path) {
+    if (path.endsWith(".gz")) {
+      return "GZIP";
+    } else if (path.endsWith(".bz2")) {
+      return "BZIP2";
+    } else {
+      return "";
+    }
+  }
+
   private static boolean checkIfPrefixExists(String path) {
     final String lowerPath = path.toLowerCase(Locale.getDefault());
     return lowerPath.startsWith(CarbonCommonConstants.HDFSURL_PREFIX) ||

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/core/src/test/resources/sampleCSV.csv
----------------------------------------------------------------------
diff --git a/core/src/test/resources/sampleCSV.csv 
b/core/src/test/resources/sampleCSV.csv
new file mode 100644
index 0000000..79dfd50
--- /dev/null
+++ b/core/src/test/resources/sampleCSV.csv
@@ -0,0 +1 @@
+id,name
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/docs/useful-tips-on-carbondata.md
----------------------------------------------------------------------
diff --git a/docs/useful-tips-on-carbondata.md 
b/docs/useful-tips-on-carbondata.md
index 0bf2940..aaf6460 100644
--- a/docs/useful-tips-on-carbondata.md
+++ b/docs/useful-tips-on-carbondata.md
@@ -168,5 +168,6 @@
   | carbon.detail.batch.size | spark/carbonlib/carbon.properties | Data 
loading | The buffer size to store records, returned from the block scan. | In 
limit scenario this parameter is very important. For example your query limit 
is 1000. But if we set this value to 3000 that means we get 3000 records from 
scan but spark will only take 1000 rows. So the 2000 remaining are useless. In 
one Finance test case after we set it to 100, in the limit 1000 scenario the 
performance increase about 2 times in comparison to if we set this value to 
12000. |
   | carbon.use.local.dir | spark/carbonlib/carbon.properties | Data loading | 
Whether use YARN local directories for multi-table load disk load balance | If 
this is set it to true CarbonData will use YARN local directories for 
multi-table load disk load balance, that will improve the data load 
performance. |
   | carbon.use.multiple.temp.dir | spark/carbonlib/carbon.properties | Data 
loading | Whether to use multiple YARN local directories during table data 
loading for disk load balance | After enabling 'carbon.use.local.dir', if this 
is set to true, CarbonData will use all YARN local directories during data load 
for disk load balance, that will improve the data load performance. Please 
enable this property when you encounter disk hotspot problem during data 
loading. |
+  | carbon.sort.temp.compressor | spark/carbonlib/carbon.properties | Data 
loading | Specify the name of the compressor used to compress the intermediate sort 
temporary files during the sort procedure in data loading. | The optional values 
are 'SNAPPY', 'GZIP', 'BZIP2', 'LZ4' (case-insensitive) and empty. By default, empty means that 
CarbonData will not compress the sort temp files. This parameter is useful 
if you encounter a disk I/O bottleneck. |
 
   Note: If your CarbonData instance is provided only for query, you may 
specify the property 'spark.speculation=true' which is in conf directory of 
spark.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadWithSortTempCompressed.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadWithSortTempCompressed.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadWithSortTempCompressed.scala
new file mode 100644
index 0000000..61acea4
--- /dev/null
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadWithSortTempCompressed.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.dataload
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+class TestLoadWithSortTempCompressed extends QueryTest
+  with BeforeAndAfterEach with BeforeAndAfterAll {
+  val originOffHeapStatus: String = CarbonProperties.getInstance()
+    .getProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+      CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT)
+  val originSortTempCompressor: String = CarbonProperties.getInstance()
+    .getProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
+      CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR_DEFAULT)
+  val simpleTable = "simpleTable"
+  val complexCarbonTable = "complexCarbonTable"
+  val complexHiveTable = "complexHiveTable"
+
+  override def beforeEach(): Unit = {
+    sql(s"drop table if exists $simpleTable")
+    sql(s"drop table if exists $complexCarbonTable")
+    sql(s"drop table if exists $complexHiveTable")
+  }
+
+  override def afterEach(): Unit = {
+    sql(s"drop table if exists $simpleTable")
+    sql(s"drop table if exists $complexCarbonTable")
+    sql(s"drop table if exists $complexHiveTable")
+  }
+
+
+  override protected def beforeAll(): Unit = {
+    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
+      "SNAPPY")
+  }
+  override def afterAll(): Unit = {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR,
+        originSortTempCompressor)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, 
originOffHeapStatus)
+  }
+
+  private def testSimpleTable(): Unit = {
+    val lineNum: Int = 10002
+    val df = {
+      import sqlContext.implicits._
+      sqlContext.sparkContext.parallelize((1 to lineNum).reverse)
+        .map(x => (s"a$x", s"b$x", s"c$x", 12.3 + x, x, 
System.currentTimeMillis(), s"d$x"))
+        .toDF("c1", "c2", "c3", "c4", "c5", "c6", "c7")
+    }
+
+    df.write
+      .format("carbondata")
+      .option("tableName", simpleTable)
+      .option("tempCSV", "true")
+      .option("DICTIONARY_INCLUDE", "c1,c2")
+      .option("SORT_COLUMNS", "c1,c3")
+      .save()
+
+    checkAnswer(sql(s"select count(*) from $simpleTable"), Row(lineNum))
+    checkAnswer(sql(s"select count(*) from $simpleTable where c5 > 5001"), 
Row(5001))
+  }
+
+  test("test data load for simple table with sort temp compressed with snappy" 
+
+       " and off-heap sort enabled") {
+    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
 "true")
+    testSimpleTable()
+    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+      originOffHeapStatus)
+  }
+
+  test("test data load for simple table with sort temp compressed with snappy" 
+
+       " and off-heap sort disabled") {
+    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
 "false")
+    testSimpleTable()
+    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+      originOffHeapStatus)
+  }
+
+  private def testComplexTable(): Unit = {
+    // note: following tests are copied from `TestComplexTypeQuery`
+    sql(
+      s"create table $complexCarbonTable(deviceInformationId int, channelsId 
string, ROMSize " +
+      "string, ROMName String, purchasedate string, mobile struct<imei:string, 
imsi:string>, MAC " +
+      "array<string>, locationinfo array<struct<ActiveAreaId:int, 
ActiveCountry:string, " +
+      "ActiveProvince:string, Activecity:string, ActiveDistrict:string, 
ActiveStreet:string>>, " +
+      "proddate 
struct<productionDate:string,activeDeactivedate:array<string>>, gamePointId " +
+      "double,contractNumber double)  STORED BY 'org.apache.carbondata.format' 
 TBLPROPERTIES " +
+      "('DICTIONARY_INCLUDE'='deviceInformationId', 
'DICTIONARY_EXCLUDE'='channelsId'," +
+      "'COLUMN_GROUP'='(ROMSize,ROMName)')")
+    sql(s"LOAD DATA local inpath '$resourcesPath/complextypesample.csv' INTO 
table" +
+        s" $complexCarbonTable  OPTIONS('DELIMITER'=',', " +
+        "'QUOTECHAR'='\"', 
'FILEHEADER'='deviceInformationId,channelsId,ROMSize,ROMName," +
+        
"purchasedate,mobile,MAC,locationinfo,proddate,gamePointId,contractNumber', " +
+        "'COMPLEX_DELIMITER_LEVEL_1'='$', 'COMPLEX_DELIMITER_LEVEL_2'=':')")
+    sql(
+      s"create table $complexHiveTable(deviceInformationId int, channelsId 
string, ROMSize " +
+      "string, ROMName String, purchasedate string,mobile struct<imei:string, 
imsi:string>,MAC " +
+      "array<string>, locationinfo array<struct<ActiveAreaId:int, 
ActiveCountry:string, " +
+      "ActiveProvince:string, Activecity:string, ActiveDistrict:string, 
ActiveStreet:string>>, " +
+      "proddate 
struct<productionDate:string,activeDeactivedate:array<string>>, gamePointId " +
+      "double,contractNumber double)row format delimited fields terminated by 
',' collection " +
+      "items terminated by '$' map keys terminated by ':'")
+    sql(s"LOAD DATA local inpath '$resourcesPath/complextypesample.csv' INTO 
table" +
+        s" $complexHiveTable")
+
+    checkAnswer(sql(s"select * from $complexCarbonTable"), sql(s"select * from 
$complexHiveTable"))
+    checkAnswer(sql(s"select MAC from $complexCarbonTable where MAC[0] = 
'MAC1'"),
+      sql(s"select MAC from $complexHiveTable where MAC[0] = 'MAC1'"))
+    checkAnswer(sql(s"select mobile from $complexCarbonTable where mobile.imei 
like '1AA%'"),
+      sql(s"select mobile from $complexHiveTable where mobile.imei like 
'1AA%'"))
+    checkAnswer(sql(s"select locationinfo from $complexCarbonTable" +
+                    " where locationinfo[0].ActiveAreaId > 2 AND 
locationinfo[0].ActiveAreaId < 7"),
+      sql(s"select locationinfo from $complexHiveTable" +
+          " where locationinfo[0].ActiveAreaId > 2 AND 
locationinfo[0].ActiveAreaId < 7"))
+  }
+
+  test("test data load for complex table with sort temp compressed with 
snappy" +
+       " and off-heap sort enabled") {
+    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
 "true")
+    testComplexTable()
+    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+      originOffHeapStatus)
+  }
+
+  test("test data load for complex table with sort temp compressed with 
snappy" +
+       " and off-heap sort disabled") {
+    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
 "false")
+    testComplexTable()
+    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+      originOffHeapStatus)
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
index cefc97d..6432d38 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
@@ -80,11 +80,7 @@ public class ParallelReadMergeSorterImpl extends 
AbstractMergeSorter {
         File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
     finalMerger =
         new SingleThreadFinalSortFilesMerger(dataFolderLocations, 
sortParameters.getTableName(),
-            sortParameters.getDimColCount(),
-            sortParameters.getComplexDimColCount(), 
sortParameters.getMeasureColCount(),
-            sortParameters.getNoDictionaryCount(), 
sortParameters.getMeasureDataType(),
-            sortParameters.getNoDictionaryDimnesionColumn(),
-            sortParameters.getNoDictionarySortColumn());
+            sortParameters);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
index 51db3a0..c7030dd 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
@@ -141,10 +141,7 @@ public class ParallelReadMergeSorterWithBucketingImpl 
extends AbstractMergeSorte
     String[] dataFolderLocation = 
CarbonDataProcessorUtil.arrayAppend(storeLocation, File.separator,
         CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
     return new SingleThreadFinalSortFilesMerger(dataFolderLocation, 
sortParameters.getTableName(),
-            sortParameters.getDimColCount(), 
sortParameters.getComplexDimColCount(),
-            sortParameters.getMeasureColCount(), 
sortParameters.getNoDictionaryCount(),
-            sortParameters.getMeasureDataType(), 
sortParameters.getNoDictionaryDimnesionColumn(),
-            this.sortParameters.getNoDictionarySortColumn());
+            sortParameters);
   }
 
   @Override public void close() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
index 0210464..4dd5e44 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
@@ -17,10 +17,8 @@
 
 package org.apache.carbondata.processing.loading.sort.unsafe;
 
-import java.io.BufferedOutputStream;
 import java.io.DataOutputStream;
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
@@ -31,6 +29,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.memory.CarbonUnsafe;
 import org.apache.carbondata.core.memory.IntPointerBuffer;
 import org.apache.carbondata.core.memory.MemoryBlock;
@@ -279,13 +278,19 @@ public class UnsafeSortDataRows {
     startFileBasedMerge();
   }
 
-  private void writeData(UnsafeCarbonRowPage rowPage, File file)
+  /**
+   * write a page to sort temp file
+   * @param rowPage page
+   * @param file file
+   * @throws CarbonSortKeyAndGroupByException
+   */
+  private void writeDataToFile(UnsafeCarbonRowPage rowPage, File file)
       throws CarbonSortKeyAndGroupByException {
     DataOutputStream stream = null;
     try {
       // open stream
-      stream = new DataOutputStream(new BufferedOutputStream(new 
FileOutputStream(file),
-          parameters.getFileWriteBufferSize()));
+      stream = FileFactory.getDataOutputStream(file.getPath(), 
FileFactory.FileType.LOCAL,
+          parameters.getFileWriteBufferSize(), 
parameters.getSortTempCompressorName());
       int actualSize = rowPage.getBuffer().getActualSize();
       // write number of entries to the file
       stream.writeInt(actualSize);
@@ -372,7 +377,7 @@ public class UnsafeSortDataRows {
           File sortTempFile = new File(
               tmpDir + File.separator + parameters.getTableName()
                   + System.nanoTime() + 
CarbonCommonConstants.SORT_TEMP_FILE_EXT);
-          writeData(page, sortTempFile);
+          writeDataToFile(page, sortTempFile);
           LOGGER.info("Time taken to sort row page with size" + 
page.getBuffer().getActualSize()
               + " and write is: " + (System.currentTimeMillis() - startTime) + 
": location:"
               + sortTempFile);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
index 3972b1c..11b3d43 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
@@ -17,10 +17,8 @@
 
 package org.apache.carbondata.processing.loading.sort.unsafe.holder;
 
-import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Comparator;
@@ -32,6 +30,7 @@ import java.util.concurrent.Future;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.CarbonProperties;
@@ -69,27 +68,13 @@ public class UnsafeSortTempFileChunkHolder implements 
SortTempChunkHolder {
    * return row
    */
   private Object[] returnRow;
-
-  /**
-   * number of measures
-   */
-  private int measureCount;
-
-  /**
-   * number of dimensionCount
-   */
-  private int dimensionCount;
-
-  /**
-   * number of complexDimensionCount
-   */
-  private int complexDimensionCount;
-
-  /**
-   * fileBufferSize for file reader stream size
-   */
-  private int fileBufferSize;
-
+  private int dimCnt;
+  private int complexCnt;
+  private int measureCnt;
+  private boolean[] isNoDictionaryDimensionColumn;
+  private DataType[] measureDataTypes;
+  private int readBufferSize;
+  private String compressorName;
   private Object[][] currentBuffer;
 
   private Object[][] backupBuffer;
@@ -109,29 +94,11 @@ public class UnsafeSortTempFileChunkHolder implements 
SortTempChunkHolder {
   private int prefetchRecordsProceesed;
 
   /**
-   * sortTempFileNoOFRecordsInCompression
-   */
-  private int sortTempFileNoOFRecordsInCompression;
-
-  /**
-   * isSortTempFileCompressionEnabled
-   */
-  private boolean isSortTempFileCompressionEnabled;
-
-  /**
    * totalRecordFetch
    */
   private int totalRecordFetch;
 
-  private int noDictionaryCount;
-
-  private DataType[] measureDataType;
-
   private int numberOfObjectRead;
-  /**
-   * to store whether dimension is of dictionary type or not
-   */
-  private boolean[] isNoDictionaryDimensionColumn;
 
   private int nullSetWordsLength;
 
@@ -143,19 +110,16 @@ public class UnsafeSortTempFileChunkHolder implements 
SortTempChunkHolder {
   public UnsafeSortTempFileChunkHolder(File tempFile, SortParameters 
parameters) {
     // set temp file
     this.tempFile = tempFile;
+    this.dimCnt = parameters.getDimColCount();
+    this.complexCnt = parameters.getComplexDimColCount();
+    this.measureCnt = parameters.getMeasureColCount();
+    this.isNoDictionaryDimensionColumn = 
parameters.getNoDictionaryDimnesionColumn();
+    this.measureDataTypes = parameters.getMeasureDataType();
+    this.readBufferSize = parameters.getBufferSize();
+    this.compressorName = parameters.getSortTempCompressorName();
 
-    // set measure and dimension count
-    this.measureCount = parameters.getMeasureColCount();
-    this.dimensionCount = parameters.getDimColCount();
-    this.complexDimensionCount = parameters.getComplexDimColCount();
-
-    this.noDictionaryCount = parameters.getNoDictionaryCount();
-    // set mdkey length
-    this.fileBufferSize = parameters.getFileBufferSize();
     this.executorService = Executors.newFixedThreadPool(1);
-    this.measureDataType = parameters.getMeasureDataType();
-    this.isNoDictionaryDimensionColumn = 
parameters.getNoDictionaryDimnesionColumn();
-    this.nullSetWordsLength = ((measureCount - 1) >> 6) + 1;
+    this.nullSetWordsLength = ((parameters.getMeasureColCount() - 1) >> 6) + 1;
     comparator = new NewRowComparator(parameters.getNoDictionarySortColumn());
     initialize();
   }
@@ -172,44 +136,13 @@ public class UnsafeSortTempFileChunkHolder implements 
SortTempChunkHolder {
     bufferSize = Integer.parseInt(CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE,
             CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT));
-    this.isSortTempFileCompressionEnabled = 
Boolean.parseBoolean(CarbonProperties.getInstance()
-        
.getProperty(CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED,
-            
CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED_DEFAULTVALUE));
-    if (this.isSortTempFileCompressionEnabled) {
-      LOGGER.info("Compression was used while writing the sortTempFile");
-    }
-
-    try {
-      this.sortTempFileNoOFRecordsInCompression = 
Integer.parseInt(CarbonProperties.getInstance()
-          
.getProperty(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION,
-              
CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE));
-      if (this.sortTempFileNoOFRecordsInCompression < 1) {
-        LOGGER.error("Invalid value for: "
-            + 
CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
-            + ": Only Positive Integer value(greater than zero) is 
allowed.Default value will"
-            + " be used");
-
-        this.sortTempFileNoOFRecordsInCompression = Integer.parseInt(
-            
CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
-      }
-    } catch (NumberFormatException e) {
-      LOGGER.error(
-          "Invalid value for: " + 
CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
-              + ", only Positive Integer value is allowed.Default value will 
be used");
-      this.sortTempFileNoOFRecordsInCompression = Integer
-          
.parseInt(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
-    }
-
     initialise();
   }
 
   private void initialise() {
     try {
-      if (isSortTempFileCompressionEnabled) {
-        this.bufferSize = sortTempFileNoOFRecordsInCompression;
-      }
-      stream = new DataInputStream(
-          new BufferedInputStream(new FileInputStream(tempFile), 
this.fileBufferSize));
+      stream = FileFactory.getDataInputStream(tempFile.getPath(), 
FileFactory.FileType.LOCAL,
+          readBufferSize, compressorName);
       this.entryCount = stream.readInt();
       LOGGER.audit("Processing unsafe mode file rows with size : " + 
entryCount);
       if (prefetch) {
@@ -218,12 +151,7 @@ public class UnsafeSortTempFileChunkHolder implements 
SortTempChunkHolder {
         if (totalRecordFetch < this.entryCount) {
           submit = executorService.submit(new DataFetcher(true));
         }
-      } else {
-        if (isSortTempFileCompressionEnabled) {
-          new DataFetcher(false).call();
-        }
       }
-
     } catch (FileNotFoundException e) {
       LOGGER.error(e);
       throw new RuntimeException(tempFile + " No Found", e);
@@ -244,19 +172,6 @@ public class UnsafeSortTempFileChunkHolder implements 
SortTempChunkHolder {
   public void readRow() throws CarbonSortKeyAndGroupByException {
     if (prefetch) {
       fillDataForPrefetch();
-    } else if (isSortTempFileCompressionEnabled) {
-      if (bufferRowCounter >= bufferSize) {
-        try {
-          new DataFetcher(false).call();
-          bufferRowCounter = 0;
-        } catch (Exception e) {
-          LOGGER.error(e);
-          throw new CarbonSortKeyAndGroupByException(tempFile + " Problem 
while reading", e);
-        }
-
-      }
-      prefetchRecordsProceesed++;
-      returnRow = currentBuffer[bufferRowCounter++];
     } else {
       this.returnRow = getRowFromStream();
     }
@@ -296,7 +211,7 @@ public class UnsafeSortTempFileChunkHolder implements 
SortTempChunkHolder {
    * @throws CarbonSortKeyAndGroupByException
    */
   private Object[] getRowFromStream() throws CarbonSortKeyAndGroupByException {
-    Object[] row = new Object[dimensionCount + measureCount];
+    Object[] row = new Object[dimCnt + measureCnt];
     try {
       int dimCount = 0;
       for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) {
@@ -312,7 +227,7 @@ public class UnsafeSortTempFileChunkHolder implements 
SortTempChunkHolder {
       }
 
       // write complex dimensions here.
-      for (; dimCount < dimensionCount; dimCount++) {
+      for (; dimCount < dimCnt; dimCount++) {
         short aShort = stream.readShort();
         byte[] col = new byte[aShort];
         stream.readFully(col);
@@ -324,25 +239,24 @@ public class UnsafeSortTempFileChunkHolder implements 
SortTempChunkHolder {
         words[i] = stream.readLong();
       }
 
-      for (int mesCount = 0; mesCount < measureCount; mesCount++) {
+      for (int mesCount = 0; mesCount < measureCnt; mesCount++) {
         if (UnsafeCarbonRowPage.isSet(words, mesCount)) {
-          DataType dataType = measureDataType[mesCount];
+          DataType dataType = measureDataTypes[mesCount];
           if (dataType == DataTypes.SHORT) {
-            row[dimensionCount + mesCount] = stream.readShort();
+            row[dimCount + mesCount] = stream.readShort();
           } else if (dataType == DataTypes.INT) {
-            row[dimensionCount + mesCount] = stream.readInt();
+            row[dimCount + mesCount] = stream.readInt();
           } else if (dataType == DataTypes.LONG) {
-            row[dimensionCount + mesCount] = stream.readLong();
+            row[dimCount + mesCount] = stream.readLong();
           } else if (dataType == DataTypes.DOUBLE) {
-            row[dimensionCount + mesCount] = stream.readDouble();
+            row[dimCount + mesCount] = stream.readDouble();
           } else if (DataTypes.isDecimal(dataType)) {
             short aShort = stream.readShort();
             byte[] bigDecimalInBytes = new byte[aShort];
             stream.readFully(bigDecimalInBytes);
-            row[dimensionCount + mesCount] = 
DataTypeUtil.byteToBigDecimal(bigDecimalInBytes);
+            row[dimCount + mesCount] = 
DataTypeUtil.byteToBigDecimal(bigDecimalInBytes);
           } else {
-            throw new IllegalArgumentException(
-                "unsupported data type:" + measureDataType[mesCount]);
+            throw new IllegalArgumentException("unsupported data type:" + 
dataType);
           }
         }
       }
@@ -368,7 +282,7 @@ public class UnsafeSortTempFileChunkHolder implements 
SortTempChunkHolder {
    * @return more row present in file
    */
   public boolean hasNext() {
-    if (prefetch || isSortTempFileCompressionEnabled) {
+    if (prefetch) {
       return this.prefetchRecordsProceesed < this.entryCount;
     }
     return this.numberOfObjectRead < this.entryCount;
@@ -412,10 +326,9 @@ public class UnsafeSortTempFileChunkHolder implements 
SortTempChunkHolder {
 
   @Override public int hashCode() {
     int hash = 0;
-    hash += 31 * measureCount;
-    hash += 31 * dimensionCount;
-    hash += 31 * complexDimensionCount;
-    hash += 31 * noDictionaryCount;
+    hash += 31 * measureCnt;
+    hash += 31 * dimCnt;
+    hash += 31 * complexCnt;
     hash += tempFile.hashCode();
     return hash;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
index 7328899..4bbf61b 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
@@ -17,11 +17,9 @@
 
 package org.apache.carbondata.processing.loading.sort.unsafe.merger;
 
-import java.io.BufferedOutputStream;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
@@ -32,6 +30,7 @@ import java.util.concurrent.Callable;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.CarbonUtil;
@@ -41,8 +40,6 @@ import 
org.apache.carbondata.processing.loading.sort.unsafe.holder.SortTempChunk
 import 
org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeSortTempFileChunkHolder;
 import 
org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
 import org.apache.carbondata.processing.sort.sortdata.SortParameters;
-import org.apache.carbondata.processing.sort.sortdata.TempSortFileWriter;
-import 
org.apache.carbondata.processing.sort.sortdata.TempSortFileWriterFactory;
 
 public class UnsafeIntermediateFileMerger implements Callable<Void> {
   /**
@@ -71,18 +68,19 @@ public class UnsafeIntermediateFileMerger implements 
Callable<Void> {
    */
   private int totalNumberOfRecords;
 
-  /**
-   * writer
-   */
-  private TempSortFileWriter writer;
-
   private SortParameters mergerParameters;
 
   private File[] intermediateFiles;
 
   private File outPutFile;
 
-  private boolean[] noDictionarycolumnMapping;
+  private int dimCnt;
+  private int complexCnt;
+  private int measureCnt;
+  private boolean[] isNoDictionaryDimensionColumn;
+  private DataType[] measureDataTypes;
+  private int writeBufferSize;
+  private String compressorName;
 
   private long[] nullSetWords;
 
@@ -99,8 +97,14 @@ public class UnsafeIntermediateFileMerger implements 
Callable<Void> {
     this.fileCounter = intermediateFiles.length;
     this.intermediateFiles = intermediateFiles;
     this.outPutFile = outPutFile;
-    noDictionarycolumnMapping = 
mergerParameters.getNoDictionaryDimnesionColumn();
-    this.nullSetWords = new long[((mergerParameters.getMeasureColCount() - 1) 
>> 6) + 1];
+    this.dimCnt = mergerParameters.getDimColCount();
+    this.complexCnt = mergerParameters.getComplexDimColCount();
+    this.measureCnt = mergerParameters.getMeasureColCount();
+    this.isNoDictionaryDimensionColumn = 
mergerParameters.getNoDictionaryDimnesionColumn();
+    this.measureDataTypes = mergerParameters.getMeasureDataType();
+    this.writeBufferSize = mergerParameters.getBufferSize();
+    this.compressorName = mergerParameters.getSortTempCompressorName();
+    this.nullSetWords = new long[((measureCnt - 1) >> 6) + 1];
     // Take size of 2 MB for each row. I think it is high enough to use
     rowData = ByteBuffer.allocate(2 * 1024 * 1024);
   }
@@ -112,7 +116,7 @@ public class UnsafeIntermediateFileMerger implements 
Callable<Void> {
       startSorting();
       initialize();
       while (hasNext()) {
-        writeDataTofile(next());
+        writeDataToFile(next());
       }
       double intermediateMergeCostTime =
           (System.currentTimeMillis() - intermediateMergeStartTime) / 1000.0;
@@ -124,9 +128,6 @@ public class UnsafeIntermediateFileMerger implements 
Callable<Void> {
       throwable = e;
     } finally {
       CarbonUtil.closeStreams(this.stream);
-      if (null != writer) {
-        writer.finish();
-      }
       if (null == throwable) {
         try {
           finish();
@@ -152,24 +153,14 @@ public class UnsafeIntermediateFileMerger implements 
Callable<Void> {
    * @throws CarbonSortKeyAndGroupByException
    */
   private void initialize() throws CarbonSortKeyAndGroupByException {
-    if (!mergerParameters.isSortFileCompressionEnabled() && 
!mergerParameters.isPrefetch()) {
-      try {
-        this.stream = new DataOutputStream(
-            new BufferedOutputStream(new FileOutputStream(outPutFile),
-                mergerParameters.getFileWriteBufferSize()));
-        this.stream.writeInt(this.totalNumberOfRecords);
-      } catch (FileNotFoundException e) {
-        throw new CarbonSortKeyAndGroupByException("Problem while getting the 
file", e);
-      } catch (IOException e) {
-        throw new CarbonSortKeyAndGroupByException("Problem while writing the 
data to file", e);
-      }
-    } else {
-      writer = TempSortFileWriterFactory.getInstance()
-          
.getTempSortFileWriter(mergerParameters.isSortFileCompressionEnabled(),
-              mergerParameters.getDimColCount(), 
mergerParameters.getComplexDimColCount(),
-              mergerParameters.getMeasureColCount(), 
mergerParameters.getNoDictionaryCount(),
-              mergerParameters.getFileWriteBufferSize());
-      writer.initiaize(outPutFile, totalNumberOfRecords);
+    try {
+      stream = FileFactory.getDataOutputStream(outPutFile.getPath(), 
FileFactory.FileType.LOCAL,
+          writeBufferSize, compressorName);
+      this.stream.writeInt(this.totalNumberOfRecords);
+    } catch (FileNotFoundException e) {
+      throw new CarbonSortKeyAndGroupByException("Problem while getting the 
file", e);
+    } catch (IOException e) {
+      throw new CarbonSortKeyAndGroupByException("Problem while writing the 
data to file", e);
     }
   }
 
@@ -283,12 +274,11 @@ public class UnsafeIntermediateFileMerger implements 
Callable<Void> {
    *
    * @throws CarbonSortKeyAndGroupByException problem while writing
    */
-  private void writeDataTofile(Object[] row) throws 
CarbonSortKeyAndGroupByException, IOException {
+  private void writeDataToFile(Object[] row) throws 
CarbonSortKeyAndGroupByException, IOException {
     int dimCount = 0;
     int size = 0;
-    DataType[] type = mergerParameters.getMeasureDataType();
-    for (; dimCount < noDictionarycolumnMapping.length; dimCount++) {
-      if (noDictionarycolumnMapping[dimCount]) {
+    for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) {
+      if (isNoDictionaryDimensionColumn[dimCount]) {
         byte[] col = (byte[]) row[dimCount];
         rowData.putShort((short) col.length);
         size += 2;
@@ -301,9 +291,7 @@ public class UnsafeIntermediateFileMerger implements 
Callable<Void> {
     }
 
     // write complex dimensions here.
-    int dimensionSize =
-        mergerParameters.getDimColCount() + 
mergerParameters.getComplexDimColCount();
-    int measureSize = mergerParameters.getMeasureColCount();
+    int dimensionSize = dimCnt + complexCnt;
     for (; dimCount < dimensionSize; dimCount++) {
       byte[] col = (byte[]) row[dimCount];
       rowData.putShort((short)col.length);
@@ -315,10 +303,10 @@ public class UnsafeIntermediateFileMerger implements 
Callable<Void> {
     int nullSetSize = nullSetWords.length * 8;
     int nullLoc = size;
     size += nullSetSize;
-    for (int mesCount = 0; mesCount < measureSize; mesCount++) {
+    for (int mesCount = 0; mesCount < measureCnt; mesCount++) {
       Object value = row[mesCount + dimensionSize];
       if (null != value) {
-        DataType dataType = type[mesCount];
+        DataType dataType = measureDataTypes[mesCount];
         if (dataType == DataTypes.SHORT) {
           rowData.putShort(size, (Short) value);
           size += 2;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index eece8f2..de3572e 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -130,7 +130,7 @@ public class CompactionResultSortProcessor extends 
AbstractResultProcessor {
   private SortIntermediateFileMerger intermediateFileMerger;
 
   private List<String> partitionNames;
-
+  private SortParameters sortParameters;
 
   public CompactionResultSortProcessor(CarbonLoadModel carbonLoadModel, 
CarbonTable carbonTable,
       SegmentProperties segmentProperties, CompactionType compactionType, 
String tableName,
@@ -349,11 +349,11 @@ public class CompactionResultSortProcessor extends 
AbstractResultProcessor {
       noDictionaryCount++;
     }
     dimensionColumnCount = dimensions.size();
-    SortParameters parameters = createSortParameters();
-    intermediateFileMerger = new SortIntermediateFileMerger(parameters);
+    sortParameters = createSortParameters();
+    intermediateFileMerger = new SortIntermediateFileMerger(sortParameters);
     // TODO: Now it is only supported onheap merge, but we can have unsafe 
merge
     // as well by using UnsafeSortDataRows.
-    this.sortDataRows = new SortDataRows(parameters, intermediateFileMerger);
+    this.sortDataRows = new SortDataRows(sortParameters, 
intermediateFileMerger);
     try {
       this.sortDataRows.initialize();
     } catch (CarbonSortKeyAndGroupByException e) {
@@ -389,13 +389,12 @@ public class CompactionResultSortProcessor extends 
AbstractResultProcessor {
       System.arraycopy(noDictionaryColMapping, 0,
           noDictionarySortColumnMapping, 0, 
noDictionarySortColumnMapping.length);
     }
+    sortParameters.setNoDictionarySortColumn(noDictionarySortColumnMapping);
 
     String[] sortTempFileLocation = 
CarbonDataProcessorUtil.arrayAppend(tempStoreLocation,
         CarbonCommonConstants.FILE_SEPARATOR, 
CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
     finalMerger =
-        new SingleThreadFinalSortFilesMerger(sortTempFileLocation, tableName, 
dimensionColumnCount,
-            segmentProperties.getComplexDimensions().size(), measureCount, 
noDictionaryCount,
-            dataTypes, noDictionaryColMapping, noDictionarySortColumnMapping);
+        new SingleThreadFinalSortFilesMerger(sortTempFileLocation, tableName, 
sortParameters);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/AbstractTempSortFileWriter.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/AbstractTempSortFileWriter.java
 
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/AbstractTempSortFileWriter.java
deleted file mode 100644
index 1302a5b..0000000
--- 
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/AbstractTempSortFileWriter.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sort.sortdata;
-
-import java.io.BufferedOutputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-
-import org.apache.carbondata.core.util.CarbonUtil;
-import 
org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
-
-public abstract class AbstractTempSortFileWriter implements TempSortFileWriter 
{
-
-  /**
-   * writeFileBufferSize
-   */
-  protected int writeBufferSize;
-
-  /**
-   * Measure count
-   */
-  protected int measureCount;
-
-  /**
-   * Measure count
-   */
-  protected int dimensionCount;
-
-  /**
-   * complexDimension count
-   */
-  protected int complexDimensionCount;
-
-  /**
-   * stream
-   */
-  protected DataOutputStream stream;
-
-  /**
-   * noDictionaryCount
-   */
-  protected int noDictionaryCount;
-
-  /**
-   * AbstractTempSortFileWriter
-   *
-   * @param writeBufferSize
-   * @param dimensionCount
-   * @param measureCount
-   */
-  public AbstractTempSortFileWriter(int dimensionCount, int 
complexDimensionCount, int measureCount,
-      int noDictionaryCount, int writeBufferSize) {
-    this.writeBufferSize = writeBufferSize;
-    this.dimensionCount = dimensionCount;
-    this.complexDimensionCount = complexDimensionCount;
-    this.measureCount = measureCount;
-    this.noDictionaryCount = noDictionaryCount;
-  }
-
-  /**
-   * Below method will be used to initialize the stream and write the entry 
count
-   */
-  @Override public void initiaize(File file, int entryCount)
-      throws CarbonSortKeyAndGroupByException {
-    try {
-      stream = new DataOutputStream(
-          new BufferedOutputStream(new FileOutputStream(file), 
writeBufferSize));
-      stream.writeInt(entryCount);
-    } catch (FileNotFoundException e1) {
-      throw new CarbonSortKeyAndGroupByException(e1);
-    } catch (IOException e) {
-      throw new CarbonSortKeyAndGroupByException(e);
-    }
-  }
-
-  /**
-   * Below method will be used to close the stream
-   */
-  @Override public void finish() {
-    CarbonUtil.closeStreams(stream);
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/CompressedTempSortFileWriter.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/CompressedTempSortFileWriter.java
 
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/CompressedTempSortFileWriter.java
deleted file mode 100644
index 40f650d..0000000
--- 
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/CompressedTempSortFileWriter.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sort.sortdata;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.compression.CompressorFactory;
-import org.apache.carbondata.core.util.CarbonUtil;
-import 
org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
-
-public class CompressedTempSortFileWriter extends AbstractTempSortFileWriter {
-
-  /**
-   * CompressedTempSortFileWriter
-   *
-   * @param writeBufferSize
-   * @param dimensionCount
-   * @param measureCount
-   */
-  public CompressedTempSortFileWriter(int dimensionCount, int 
complexDimensionCount,
-      int measureCount, int noDictionaryCount, int writeBufferSize) {
-    super(dimensionCount, complexDimensionCount, measureCount, 
noDictionaryCount, writeBufferSize);
-  }
-
-  /**
-   * Below method will be used to write the sort temp file
-   *
-   * @param records
-   */
-  public void writeSortTempFile(Object[][] records) throws 
CarbonSortKeyAndGroupByException {
-    DataOutputStream dataOutputStream = null;
-    ByteArrayOutputStream blockDataArray = null;
-    int totalSize = 0;
-    int recordSize = 0;
-    try {
-      recordSize = (measureCount * CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE) 
+ (dimensionCount
-          * CarbonCommonConstants.INT_SIZE_IN_BYTE);
-      totalSize = records.length * recordSize;
-
-      blockDataArray = new ByteArrayOutputStream(totalSize);
-      dataOutputStream = new DataOutputStream(blockDataArray);
-
-      UnCompressedTempSortFileWriter
-          .writeDataOutputStream(records, dataOutputStream, measureCount, 
dimensionCount,
-              noDictionaryCount, complexDimensionCount);
-
-      stream.writeInt(records.length);
-      byte[] byteArray = CompressorFactory.getInstance().getCompressor()
-          .compressByte(blockDataArray.toByteArray());
-      stream.writeInt(byteArray.length);
-      stream.write(byteArray);
-
-    } catch (IOException e) {
-      throw new CarbonSortKeyAndGroupByException(e);
-    } finally {
-      CarbonUtil.closeStreams(blockDataArray);
-      CarbonUtil.closeStreams(dataOutputStream);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
 
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
index bc65026..04efa1f 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
@@ -17,11 +17,9 @@
 
 package org.apache.carbondata.processing.sort.sortdata;
 
-import java.io.BufferedOutputStream;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.AbstractQueue;
@@ -30,6 +28,7 @@ import java.util.concurrent.Callable;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.CarbonUtil;
@@ -64,33 +63,19 @@ public class IntermediateFileMerger implements 
Callable<Void> {
    */
   private int totalNumberOfRecords;
 
-  /**
-   * records
-   */
-  private Object[][] records;
-
-  /**
-   * entryCount
-   */
-  private int entryCount;
-
-  /**
-   * writer
-   */
-  private TempSortFileWriter writer;
-
-  /**
-   * totalSize
-   */
-  private int totalSize;
-
   private SortParameters mergerParameters;
 
   private File[] intermediateFiles;
 
   private File outPutFile;
-
-  private boolean[] noDictionarycolumnMapping;
+  private int dimCnt;
+  private int noDictDimCnt;
+  private int complexCnt;
+  private int measureCnt;
+  private boolean[] isNoDictionaryDimensionColumn;
+  private DataType[] measureDataTypes;
+  private int writeBufferSize;
+  private String compressorName;
 
   private Throwable throwable;
 
@@ -103,7 +88,14 @@ public class IntermediateFileMerger implements 
Callable<Void> {
     this.fileCounter = intermediateFiles.length;
     this.intermediateFiles = intermediateFiles;
     this.outPutFile = outPutFile;
-    noDictionarycolumnMapping = 
mergerParameters.getNoDictionaryDimnesionColumn();
+    this.dimCnt = mergerParameters.getDimColCount();
+    this.noDictDimCnt = mergerParameters.getNoDictionaryCount();
+    this.complexCnt = mergerParameters.getComplexDimColCount();
+    this.measureCnt = mergerParameters.getMeasureColCount();
+    this.isNoDictionaryDimensionColumn = 
mergerParameters.getNoDictionaryDimnesionColumn();
+    this.measureDataTypes = mergerParameters.getMeasureDataType();
+    this.writeBufferSize = mergerParameters.getFileWriteBufferSize();
+    this.compressorName = mergerParameters.getSortTempCompressorName();
   }
 
   @Override public Void call() throws Exception {
@@ -113,19 +105,7 @@ public class IntermediateFileMerger implements 
Callable<Void> {
       startSorting();
       initialize();
       while (hasNext()) {
-        writeDataTofile(next());
-      }
-      if (mergerParameters.isSortFileCompressionEnabled() || 
mergerParameters.isPrefetch()) {
-        if (entryCount > 0) {
-          if (entryCount < totalSize) {
-            Object[][] temp = new Object[entryCount][];
-            System.arraycopy(records, 0, temp, 0, entryCount);
-            records = temp;
-            this.writer.writeSortTempFile(temp);
-          } else {
-            this.writer.writeSortTempFile(records);
-          }
-        }
+        writeDataToFile(next());
       }
       double intermediateMergeCostTime =
           (System.currentTimeMillis() - intermediateMergeStartTime) / 1000.0;
@@ -136,11 +116,7 @@ public class IntermediateFileMerger implements 
Callable<Void> {
       clear();
       throwable = e;
     } finally {
-      records = null;
       CarbonUtil.closeStreams(this.stream);
-      if (null != writer) {
-        writer.finish();
-      }
       if (null == throwable) {
         try {
           finish();
@@ -166,30 +142,14 @@ public class IntermediateFileMerger implements 
Callable<Void> {
    * @throws CarbonSortKeyAndGroupByException
    */
   private void initialize() throws CarbonSortKeyAndGroupByException {
-    if (!mergerParameters.isSortFileCompressionEnabled() && 
!mergerParameters.isPrefetch()) {
-      try {
-        this.stream = new DataOutputStream(
-            new BufferedOutputStream(new FileOutputStream(outPutFile),
-                mergerParameters.getFileWriteBufferSize()));
-        this.stream.writeInt(this.totalNumberOfRecords);
-      } catch (FileNotFoundException e) {
-        throw new CarbonSortKeyAndGroupByException("Problem while getting the 
file", e);
-      } catch (IOException e) {
-        throw new CarbonSortKeyAndGroupByException("Problem while writing the 
data to file", e);
-      }
-    } else {
-      writer = TempSortFileWriterFactory.getInstance()
-          
.getTempSortFileWriter(mergerParameters.isSortFileCompressionEnabled(),
-              mergerParameters.getDimColCount(), 
mergerParameters.getComplexDimColCount(),
-              mergerParameters.getMeasureColCount(), 
mergerParameters.getNoDictionaryCount(),
-              mergerParameters.getFileWriteBufferSize());
-      writer.initiaize(outPutFile, totalNumberOfRecords);
-
-      if (mergerParameters.isPrefetch()) {
-        totalSize = mergerParameters.getBufferSize();
-      } else {
-        totalSize = mergerParameters.getSortTempFileNoOFRecordsInCompression();
-      }
+    try {
+      stream = FileFactory.getDataOutputStream(outPutFile.getPath(), 
FileFactory.FileType.LOCAL,
+          writeBufferSize, compressorName);
+      this.stream.writeInt(this.totalNumberOfRecords);
+    } catch (FileNotFoundException e) {
+      throw new CarbonSortKeyAndGroupByException("Problem while getting the 
file", e);
+    } catch (IOException e) {
+      throw new CarbonSortKeyAndGroupByException("Problem while writing the 
data to file", e);
     }
   }
 
@@ -256,12 +216,7 @@ public class IntermediateFileMerger implements 
Callable<Void> {
     for (File tempFile : intermediateFiles) {
       // create chunk holder
       sortTempFileChunkHolder =
-          new SortTempFileChunkHolder(tempFile, 
mergerParameters.getDimColCount(),
-              mergerParameters.getComplexDimColCount(), 
mergerParameters.getMeasureColCount(),
-              mergerParameters.getFileBufferSize(), 
mergerParameters.getNoDictionaryCount(),
-              mergerParameters.getMeasureDataType(),
-              mergerParameters.getNoDictionaryDimnesionColumn(),
-              mergerParameters.getNoDictionarySortColumn(), 
mergerParameters.getTableName());
+          new SortTempFileChunkHolder(tempFile, mergerParameters, 
mergerParameters.getTableName());
 
       // initialize
       sortTempFileChunkHolder.initialize();
@@ -311,30 +266,14 @@ public class IntermediateFileMerger implements 
Callable<Void> {
    *
    * @throws CarbonSortKeyAndGroupByException problem while writing
    */
-  private void writeDataTofile(Object[] row) throws 
CarbonSortKeyAndGroupByException {
-    if (mergerParameters.isSortFileCompressionEnabled() || 
mergerParameters.isPrefetch()) {
-      if (entryCount == 0) {
-        records = new Object[totalSize][];
-        records[entryCount++] = row;
-        return;
-      }
-
-      records[entryCount++] = row;
-      if (entryCount == totalSize) {
-        this.writer.writeSortTempFile(records);
-        entryCount = 0;
-        records = new Object[totalSize][];
-      }
-      return;
-    }
+  private void writeDataToFile(Object[] row) throws 
CarbonSortKeyAndGroupByException {
     try {
-      DataType[] measureDataType = mergerParameters.getMeasureDataType();
       int[] mdkArray = (int[]) row[0];
       byte[][] nonDictArray = (byte[][]) row[1];
       int mdkIndex = 0;
       int nonDictKeyIndex = 0;
       // write dictionary and non dictionary dimensions here.
-      for (boolean nodictinary : noDictionarycolumnMapping) {
+      for (boolean nodictinary : isNoDictionaryDimensionColumn) {
         if (nodictinary) {
           byte[] col = nonDictArray[nonDictKeyIndex++];
           stream.writeShort(col.length);
@@ -343,12 +282,18 @@ public class IntermediateFileMerger implements 
Callable<Void> {
           stream.writeInt(mdkArray[mdkIndex++]);
         }
       }
-
+      // write complex dims (length-prefixed); only the loop header advances the index
+      for (; nonDictKeyIndex < noDictDimCnt + complexCnt; nonDictKeyIndex++) {
+        byte[] col = nonDictArray[nonDictKeyIndex];
+        stream.writeShort(col.length);
+        stream.write(col);
+      }
+      // write measure
       int fieldIndex = 0;
-      for (int counter = 0; counter < mergerParameters.getMeasureColCount(); 
counter++) {
+      for (int counter = 0; counter < measureCnt; counter++) {
         if (null != NonDictionaryUtil.getMeasure(fieldIndex, row)) {
           stream.write((byte) 1);
-          DataType dataType = measureDataType[counter];
+          DataType dataType = measureDataTypes[counter];
           if (dataType == DataTypes.BOOLEAN) {
             
stream.writeBoolean((boolean)NonDictionaryUtil.getMeasure(fieldIndex, row));
           } else if (dataType == DataTypes.SHORT) {
@@ -365,7 +310,7 @@ public class IntermediateFileMerger implements 
Callable<Void> {
             stream.writeInt(bigDecimalInBytes.length);
             stream.write(bigDecimalInBytes);
           } else {
-            throw new IllegalArgumentException("unsupported data type:" + 
measureDataType[counter]);
+            throw new IllegalArgumentException("unsupported data type:" + 
dataType);
           }
         } else {
           stream.write((byte) 0);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
 
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
index db4c771..88695b9 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
@@ -36,7 +36,6 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import 
org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.CarbonProperties;
 import 
org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
@@ -74,39 +73,10 @@ public class SingleThreadFinalSortFilesMerger extends 
CarbonIterator<Object[]> {
   private String tableName;
 
   /**
-   * measureCount
-   */
-  private int measureCount;
-
-  /**
-   * dimensionCount
-   */
-  private int dimensionCount;
-
-  /**
-   * measure count
-   */
-  private int noDictionaryCount;
-
-  /**
-   * complexDimensionCount
-   */
-  private int complexDimensionCount;
-
-  /**
    * tempFileLocation
    */
   private String[] tempFileLocation;
-
-  private DataType[] measureDataType;
-
-  /**
-   * below code is to check whether dimension
-   * is of no dictionary type or not
-   */
-  private boolean[] isNoDictionaryColumn;
-
-  private boolean[] isNoDictionarySortColumn;
+  private SortParameters sortParameters;
 
   private int maxThreadForSorting;
 
@@ -115,17 +85,10 @@ public class SingleThreadFinalSortFilesMerger extends 
CarbonIterator<Object[]> {
   private List<Future<Void>> mergerTask;
 
   public SingleThreadFinalSortFilesMerger(String[] tempFileLocation, String 
tableName,
-      int dimensionCount, int complexDimensionCount, int measureCount, int 
noDictionaryCount,
-      DataType[] type, boolean[] isNoDictionaryColumn, boolean[] 
isNoDictionarySortColumn) {
+      SortParameters sortParameters) {
     this.tempFileLocation = tempFileLocation;
     this.tableName = tableName;
-    this.dimensionCount = dimensionCount;
-    this.complexDimensionCount = complexDimensionCount;
-    this.measureCount = measureCount;
-    this.measureDataType = type;
-    this.noDictionaryCount = noDictionaryCount;
-    this.isNoDictionaryColumn = isNoDictionaryColumn;
-    this.isNoDictionarySortColumn = isNoDictionarySortColumn;
+    this.sortParameters = sortParameters;
     try {
       maxThreadForSorting = Integer.parseInt(CarbonProperties.getInstance()
           .getProperty(CarbonCommonConstants.CARBON_MERGE_SORT_READER_THREAD,
@@ -211,9 +174,7 @@ public class SingleThreadFinalSortFilesMerger extends 
CarbonIterator<Object[]> {
         @Override public Void call() throws CarbonSortKeyAndGroupByException {
             // create chunk holder
             SortTempFileChunkHolder sortTempFileChunkHolder =
-                new SortTempFileChunkHolder(tempFile, dimensionCount, 
complexDimensionCount,
-                    measureCount, fileBufferSize, noDictionaryCount, 
measureDataType,
-                    isNoDictionaryColumn, isNoDictionarySortColumn, tableName);
+                new SortTempFileChunkHolder(tempFile, sortParameters, 
tableName);
           try {
             // initialize
             sortTempFileChunkHolder.initialize();

Reply via email to