Repository: carbondata
Updated Branches:
  refs/heads/master 57c54fb7f -> b439b00f6


[CARBONDATA-2238][DataLoad] Merge and spill in-memory pages if memory is not 
enough

Currently in carbondata, pages are added to memory. If memory is not 
enough, newly incoming pages are spilled to disk directly. This 
implementation instead merges and spills the existing in-memory pages to make 
room for the newly incoming pages.

As a result, carbondata will spill less data than before, and will spill fewer 
but bigger files instead of many small ones; the merge-and-sort of the pages 
happens in memory instead of over spilled files.

This closes #2056


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b439b00f
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b439b00f
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b439b00f

Branch: refs/heads/master
Commit: b439b00f6afafedc7a73daa24c799c69c8cd5574
Parents: 57c54fb
Author: xuchuanyin <xuchuan...@hust.edu.cn>
Authored: Wed Mar 14 14:31:43 2018 +0800
Committer: Jacky Li <jacky.li...@qq.com>
Committed: Sun Apr 8 14:55:46 2018 +0800

----------------------------------------------------------------------
 .../constants/CarbonLoadOptionConstants.java    |  12 +++
 .../core/memory/UnsafeSortMemoryManager.java    |   8 ++
 .../dataload/TestLoadDataWithUnsafeMemory.scala | 106 +++++++++++++++++++
 .../UnsafeBatchParallelReadMergeSorterImpl.java |   5 +-
 .../impl/UnsafeParallelReadMergeSorterImpl.java |   5 +-
 ...allelReadMergeSorterWithColumnRangeImpl.java |   3 +-
 .../loading/sort/unsafe/UnsafeSortDataRows.java |  62 +++++------
 .../UnsafeInMemoryIntermediateDataMerger.java   |  82 ++++++++++++--
 .../unsafe/merger/UnsafeIntermediateMerger.java |  73 +++++++++----
 .../UnsafeSingleThreadFinalSortFilesMerger.java |  15 ++-
 10 files changed, 306 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/b439b00f/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
index 823f568..a2213d5 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
@@ -139,4 +139,16 @@ public final class CarbonLoadOptionConstants {
   public static final String ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS
       = "carbon.load.directWriteHdfs.enabled";
   public static final String ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS_DEFAULT = 
"false";
+
+  /**
+   * If the sort memory is insufficient, spill in-memory pages to disk.
+   * The total size of the spilled pages is at most the specified percentage of total 
sort memory. Default
+   * value 0 means that no in-memory pages will be spilled and the newly incoming 
pages will be spilled directly,
+   * whereas value 100 means that all in-memory pages will be spilled and newly 
incoming pages will be loaded
+   * into sort memory.
+   */
+  @CarbonProperty
+  public static final String CARBON_LOAD_SORT_MEMORY_SPILL_PERCENTAGE
+      = "carbon.load.sortMemory.spill.percentage";
+  public static final String CARBON_LOAD_SORT_MEMORY_SPILL_PERCENTAGE_DEFAULT 
= "0";
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b439b00f/core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java
 
b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java
index 67bb6cc..d8d4f81 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java
@@ -120,6 +120,14 @@ public class UnsafeSortMemoryManager {
   }
 
   /**
+   * total usable memory for sort memory manager
+   * @return size in bytes
+   */
+  public long getUsableMemory() {
+    return totalMemory;
+  }
+
+  /**
    * Below method will be used to allocate dummy memory
    * this will be used to allocate first and then used when u need
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b439b00f/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithUnsafeMemory.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithUnsafeMemory.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithUnsafeMemory.scala
new file mode 100644
index 0000000..c08ec52
--- /dev/null
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithUnsafeMemory.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.integration.spark.testsuite.dataload
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Ignore}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * test data load with unsafe memory.
+ * The CI env may not have so much memory, so disable this test case for now.
+ * Ps: seen from CI result, the sdvTests works fine
+ */
+@Ignore
+class TestLoadDataWithUnsafeMemory extends QueryTest
+  with BeforeAndAfterEach with BeforeAndAfterAll {
+  val originUnsafeSortStatus: String = CarbonProperties.getInstance()
+    .getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+      CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT)
+  val originUnsafeMemForSort: String = CarbonProperties.getInstance()
+    .getProperty(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB,
+      CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT)
+  val originUnsafeMemForWorking: String = CarbonProperties.getInstance()
+    .getProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB,
+      CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT)
+  val originUnsafeSizeForChunk: String = CarbonProperties.getInstance()
+    .getProperty(CarbonCommonConstants.OFFHEAP_SORT_CHUNK_SIZE_IN_MB,
+      CarbonCommonConstants.OFFHEAP_SORT_CHUNK_SIZE_IN_MB_DEFAULT)
+  val targetTable = "table_unsafe_memory"
+
+
+  override def beforeEach(): Unit = {
+    sql(s"drop table if exists $targetTable ")
+  }
+
+  override def afterEach(): Unit = {
+    sql(s"drop table if exists $targetTable ")
+  }
+
+  override protected def beforeAll(): Unit = {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "true")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB, "1024")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, "512")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.OFFHEAP_SORT_CHUNK_SIZE_IN_MB, "512")
+  }
+
+  override def afterAll(): Unit = {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, 
originUnsafeSortStatus)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB, 
originUnsafeMemForSort)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, 
originUnsafeMemForWorking)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.OFFHEAP_SORT_CHUNK_SIZE_IN_MB, 
originUnsafeSizeForChunk)
+  }
+
+  private def testSimpleTable(): Unit = {
+    // This number is chosen to reproduce issue CARBONDATA-2246. It was chosen 
on purpose so that the
+    // records in memory will consume about two more unsafe-row-pages and the 
last one will exhaust
+    // the working memory.
+    val lineNum: Int = 70002
+    val df = {
+      import sqlContext.implicits._
+      sqlContext.sparkContext.parallelize((1 to lineNum).reverse)
+        .map(x => (s"a$x", s"b$x", s"c$x", 12.3 + x, x, 
System.currentTimeMillis(), s"d$x"))
+        .toDF("c1", "c2", "c3", "c4", "c5", "c6", "c7")
+    }
+
+    df.write
+      .format("carbondata")
+      .option("tableName", targetTable)
+      .option("SORT_COLUMNS", "c1,c3")
+      .save()
+
+    checkAnswer(sql(s"select count(*) from $targetTable"), Row(lineNum))
+    checkAnswer(sql(s"select count(*) from $targetTable where c5 > 5000"), 
Row(lineNum - 5000))
+  }
+
+  // see CARBONDATA-2246
+  test("unsafe sort with chunk size equal to working memory") {
+    testSimpleTable()
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b439b00f/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
index ed3a55d..80887c1 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
@@ -33,7 +33,6 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import 
org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import 
org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
@@ -212,7 +211,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends 
AbstractMergeSorter
 
       try {
         sortDataRow.initialize();
-      } catch (MemoryException e) {
+      } catch (Exception e) {
         throw new CarbonDataLoadingException(e);
       }
       batchCount++;
@@ -328,7 +327,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends 
AbstractMergeSorter
             .recordDictionaryValuesTotalTime(parameters.getPartitionID(),
                 System.currentTimeMillis());
         return false;
-      } catch (InterruptedException e) {
+      } catch (Exception e) {
         throw new CarbonDataLoadingException(e);
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b439b00f/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java
index c05c027..9ff1b22 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java
@@ -28,7 +28,6 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import 
org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
@@ -82,7 +81,7 @@ public class UnsafeParallelReadMergeSorterImpl extends 
AbstractMergeSorter {
     final int batchSize = CarbonProperties.getInstance().getBatchSize();
     try {
       sortDataRow.initialize();
-    } catch (MemoryException e) {
+    } catch (Exception e) {
       throw new CarbonDataLoadingException(e);
     }
     this.executorService = Executors.newFixedThreadPool(iterators.length,
@@ -166,7 +165,7 @@ public class UnsafeParallelReadMergeSorterImpl extends 
AbstractMergeSorter {
       CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
           .recordDictionaryValuesTotalTime(parameters.getPartitionID(), 
System.currentTimeMillis());
       return false;
-    } catch (InterruptedException e) {
+    } catch (Exception e) {
       throw new CarbonDataLoadingException(e);
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b439b00f/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java
index 99d6627..7c37b88 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java
@@ -31,7 +31,6 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.schema.ColumnRangeInfo;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
@@ -106,7 +105,7 @@ public class 
UnsafeParallelReadMergeSorterWithColumnRangeImpl extends AbstractMe
             new UnsafeSortDataRows(parameters, intermediateFileMergers[i], 
inMemoryChunkSizeInMB);
         sortDataRows[i].initialize();
       }
-    } catch (MemoryException e) {
+    } catch (Exception e) {
       throw new CarbonDataLoadingException(e);
     }
     ExecutorService executorService = 
Executors.newFixedThreadPool(iterators.length);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b439b00f/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
index 7afda0e..48fd0eb 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
@@ -128,7 +128,7 @@ public class UnsafeSortDataRows {
   /**
    * This method will be used to initialize
    */
-  public void initialize() throws MemoryException {
+  public void initialize() throws MemoryException, 
CarbonSortKeyAndGroupByException {
     this.rowPage = createUnsafeRowPage();
     // Delete if any older file exists in sort temp folder
     deleteSortLocationIfExists();
@@ -141,13 +141,18 @@ public class UnsafeSortDataRows {
     semaphore = new Semaphore(parameters.getNumberOfCores());
   }
 
-  private UnsafeCarbonRowPage createUnsafeRowPage() throws MemoryException {
+  private UnsafeCarbonRowPage createUnsafeRowPage()
+      throws MemoryException, CarbonSortKeyAndGroupByException {
     MemoryBlock baseBlock =
         UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, 
inMemoryChunkSize);
     boolean isMemoryAvailable =
         UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(baseBlock.size());
     if (isMemoryAvailable) {
       UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(baseBlock.size());
+    } else {
+      LOGGER.info("trigger in-memory merge and spill for table " + 
parameters.getTableName());
+      // merge and spill in-memory pages to disk if memory is not enough
+      unsafeInMemoryIntermediateFileMerger.tryTriggerInmemoryMerging(true);
     }
     return new UnsafeCarbonRowPage(tableFieldStat, baseBlock, 
!isMemoryAvailable, taskId);
   }
@@ -190,12 +195,7 @@ public class UnsafeSortDataRows {
         bytesAdded += rowPage.addRow(rowBatch[i], rowBuffer.get());
       } else {
         try {
-          if (enableInMemoryIntermediateMerge) {
-            
unsafeInMemoryIntermediateFileMerger.startInmemoryMergingIfPossible();
-          }
-          unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible();
-          semaphore.acquire();
-          dataSorterAndWriterExecutorService.execute(new 
DataSorterAndWriter(rowPage));
+          handlePreviousPage();
           rowPage = createUnsafeRowPage();
           bytesAdded += rowPage.addRow(rowBatch[i], rowBuffer.get());
         } catch (Exception e) {
@@ -218,12 +218,7 @@ public class UnsafeSortDataRows {
       rowPage.addRow(row, rowBuffer.get());
     } else {
       try {
-        if (enableInMemoryIntermediateMerge) {
-          
unsafeInMemoryIntermediateFileMerger.startInmemoryMergingIfPossible();
-        }
-        unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible();
-        semaphore.acquire();
-        dataSorterAndWriterExecutorService.submit(new 
DataSorterAndWriter(rowPage));
+        handlePreviousPage();
         rowPage = createUnsafeRowPage();
         rowPage.addRow(row, rowBuffer.get());
       } catch (Exception e) {
@@ -231,31 +226,21 @@ public class UnsafeSortDataRows {
             "exception occurred while trying to acquire a semaphore lock: " + 
e.getMessage());
         throw new CarbonSortKeyAndGroupByException(e);
       }
-
     }
   }
 
   /**
-   * Below method will be used to start storing process This method will get
-   * all the temp files present in sort temp folder then it will create the
-   * record holder heap and then it will read first record from each file and
-   * initialize the heap
+   * Below method will be used to start sorting process. This method will get
+   * all the temp unsafe pages in memory and all the temp files and try to 
merge them if possible.
+   * Also, it will spill the pages to disk or add it to unsafe sort memory.
    *
-   * @throws InterruptedException
+   * @throws CarbonSortKeyAndGroupByException if error occurs during in-memory 
merge
+   * @throws InterruptedException if error occurs during data sort and write
    */
-  public void startSorting() throws InterruptedException {
+  public void startSorting() throws CarbonSortKeyAndGroupByException, 
InterruptedException {
     LOGGER.info("Unsafe based sorting will be used");
     if (this.rowPage.getUsedSize() > 0) {
-      TimSort<UnsafeCarbonRow, IntPointerBuffer> timSort = new TimSort<>(
-          new UnsafeIntSortDataFormat(rowPage));
-      if (parameters.getNumberOfNoDictSortColumns() > 0) {
-        timSort.sort(rowPage.getBuffer(), 0, 
rowPage.getBuffer().getActualSize(),
-            new UnsafeRowComparator(rowPage));
-      } else {
-        timSort.sort(rowPage.getBuffer(), 0, 
rowPage.getBuffer().getActualSize(),
-            new UnsafeRowComparatorForNormalDims(rowPage));
-      }
-      unsafeInMemoryIntermediateFileMerger.addDataChunkToMerge(rowPage);
+      handlePreviousPage();
     } else {
       rowPage.freeMemory();
     }
@@ -263,6 +248,21 @@ public class UnsafeSortDataRows {
   }
 
   /**
+   * Deal with the previous pages added to sort-memory. Carbondata will merge 
the in-memory pages
+   * or merge the sort temp files if possible. After that, carbondata will add 
current page to
+   * sort memory or just spill them.
+   */
+  private void handlePreviousPage()
+      throws CarbonSortKeyAndGroupByException, InterruptedException {
+    if (enableInMemoryIntermediateMerge) {
+      unsafeInMemoryIntermediateFileMerger.startInmemoryMergingIfPossible();
+    }
+    unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible();
+    semaphore.acquire();
+    dataSorterAndWriterExecutorService.submit(new 
DataSorterAndWriter(rowPage));
+  }
+
+  /**
    * write a page to sort temp file
    * @param rowPage page
    * @param file file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b439b00f/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java
index 3955864..01e7649 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java
@@ -17,17 +17,28 @@
 
 package org.apache.carbondata.processing.loading.sort.unsafe.merger;
 
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
 import java.util.AbstractQueue;
 import java.util.PriorityQueue;
+import java.util.Random;
+import java.util.concurrent.Callable;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
+import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
 import 
org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
 import 
org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeCarbonRowForMerge;
 import 
org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeInmemoryMergeHolder;
 import 
org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+import org.apache.carbondata.processing.sort.sortdata.SortParameters;
 
-public class UnsafeInMemoryIntermediateDataMerger implements Runnable {
+public class UnsafeInMemoryIntermediateDataMerger implements Callable<Void> {
   /**
    * LOGGER
    */
@@ -54,35 +65,66 @@ public class UnsafeInMemoryIntermediateDataMerger 
implements Runnable {
   private long[] mergedAddresses;
 
   private byte[] rowPageIndexes;
+  private int totalSize;
+  private SortParameters sortParameters;
+  private SortStepRowHandler sortStepRowHandler;
+  private boolean spillDisk;
+  private File outputFile;
+  private DataOutputStream outputStream;
 
   /**
    * IntermediateFileMerger Constructor
    */
   public UnsafeInMemoryIntermediateDataMerger(UnsafeCarbonRowPage[] 
unsafeCarbonRowPages,
-      int totalSize) {
+      int totalSize, SortParameters sortParameters, boolean spillDisk) {
     this.holderCounter = unsafeCarbonRowPages.length;
     this.unsafeCarbonRowPages = unsafeCarbonRowPages;
     this.mergedAddresses = new long[totalSize];
     this.rowPageIndexes = new byte[totalSize];
     this.entryCount = 0;
+    this.totalSize = totalSize;
+    this.sortParameters = sortParameters;
+    this.sortStepRowHandler = new SortStepRowHandler(sortParameters);
+    this.spillDisk = spillDisk;
   }
 
   @Override
-  public void run() {
+  public Void call() throws Exception {
     long intermediateMergeStartTime = System.currentTimeMillis();
     int holderCounterConst = holderCounter;
     try {
       startSorting();
-      while (hasNext()) {
-        writeDataToMemory(next());
+      if (spillDisk) {
+        initSortTempFile();
+        while (hasNext()) {
+          writeDataToFile(next());
+        }
+      } else {
+        while (hasNext()) {
+          writeDataToMemory(next());
+        }
       }
+
       double intermediateMergeCostTime =
           (System.currentTimeMillis() - intermediateMergeStartTime) / 1000.0;
-      LOGGER.info("============================== Intermediate Merge of " + 
holderCounterConst
+      LOGGER.info("Intermediate Merge of " + holderCounterConst
           + " in-memory sort Cost Time: " + intermediateMergeCostTime + "(s)");
+      if (spillDisk) {
+        LOGGER.info("Merge and spill in-memory pages to disk, location: "
+            + outputFile.getAbsolutePath()
+            + ", file size in MB: " + outputFile.length() * 0.1 * 10 / 1024 / 
1024
+            + ", containing rows: " + totalSize);
+      }
     } catch (Exception e) {
       LOGGER.error(e, "Problem while intermediate merging");
+      throw e;
+    } finally {
+      if (spillDisk) {
+        CarbonUtil.closeStreams(outputStream);
+        close();
+      }
     }
+    return null;
   }
 
   /**
@@ -195,6 +237,24 @@ public class UnsafeInMemoryIntermediateDataMerger 
implements Runnable {
     entryCount++;
   }
 
+  private void initSortTempFile() throws IOException {
+    String tmpDir = sortParameters.getTempFileLocation()[
+        new Random().nextInt(sortParameters.getTempFileLocation().length)];
+    outputFile = new File(tmpDir + File.separator
+        + sortParameters.getTableName() + '_'
+        + sortParameters.getRangeId() + '_' + System.nanoTime()
+        + CarbonCommonConstants.SORT_TEMP_FILE_EXT);
+    outputStream = FileFactory.getDataOutputStream(outputFile.getPath(),
+        FileFactory.FileType.LOCAL, sortParameters.getFileWriteBufferSize(),
+        sortParameters.getSortTempCompressorName());
+    outputStream.writeInt(totalSize);
+  }
+
+  private void writeDataToFile(UnsafeCarbonRowForMerge row) throws IOException 
{
+    IntermediateSortTempRow sortTempRow = 
unsafeCarbonRowPages[row.index].getRow(row.address);
+    sortStepRowHandler.writeIntermediateSortTempRowToOutputStream(sortTempRow, 
outputStream);
+  }
+
   public int getEntryCount() {
     return entryCount;
   }
@@ -210,4 +270,14 @@ public class UnsafeInMemoryIntermediateDataMerger 
implements Runnable {
   public byte[] getRowPageIndexes() {
     return rowPageIndexes;
   }
+
+  public boolean isSpillDisk() {
+    return spillDisk;
+  }
+
+  public void close() {
+    for (UnsafeCarbonRowPage rowPage : unsafeCarbonRowPages) {
+      rowPage.freeMemory();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b439b00f/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
index 104f3f5..0c692c7 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
@@ -18,6 +18,7 @@ package 
org.apache.carbondata.processing.loading.sort.unsafe.merger;
 
 import java.io.File;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.ExecutionException;
@@ -29,6 +30,9 @@ import java.util.concurrent.TimeUnit;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
+import org.apache.carbondata.core.memory.UnsafeSortMemoryManager;
+import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
 import 
org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
 import 
org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
@@ -60,6 +64,10 @@ public class UnsafeIntermediateMerger {
   private List<File> procFiles;
 
   private List<Future<Void>> mergerTask;
+  /**
+   * size to be spilled in sort memory
+   */
+  private long spillSizeInSortMemory;
 
   public UnsafeIntermediateMerger(SortParameters parameters) {
     this.parameters = parameters;
@@ -70,6 +78,20 @@ public class UnsafeIntermediateMerger {
         new CarbonThreadFactory("UnsafeIntermediatePool:" + 
parameters.getTableName()));
     this.procFiles = new 
ArrayList<File>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
     this.mergerTask = new ArrayList<>();
+
+    Integer spillPercentage;
+    try {
+      String spillPercentageStr = CarbonProperties.getInstance().getProperty(
+          CarbonLoadOptionConstants.CARBON_LOAD_SORT_MEMORY_SPILL_PERCENTAGE,
+          
CarbonLoadOptionConstants.CARBON_LOAD_SORT_MEMORY_SPILL_PERCENTAGE_DEFAULT);
+      spillPercentage = Integer.valueOf(spillPercentageStr);
+    } catch (NumberFormatException e) {
+      spillPercentage = Integer.valueOf(
+          
CarbonLoadOptionConstants.CARBON_LOAD_SORT_MEMORY_SPILL_PERCENTAGE_DEFAULT);
+    }
+
+    this.spillSizeInSortMemory =
+        UnsafeSortMemoryManager.INSTANCE.getUsableMemory() * spillPercentage / 
100;
   }
 
   public void addDataChunkToMerge(UnsafeCarbonRowPage rowPage) {
@@ -120,37 +142,52 @@ public class UnsafeIntermediateMerger {
     mergerTask.add(executorService.submit(merger));
   }
 
-  public void startInmemoryMergingIfPossible() throws 
CarbonSortKeyAndGroupByException {
-    UnsafeCarbonRowPage[] localRowPages;
-    if (rowPages.size() >= parameters.getNumberOfIntermediateFileToBeMerged()) 
{
-      int totalRows = 0;
-      synchronized (lockObject) {
-        totalRows = getTotalNumberOfRows(rowPages);
-        if (totalRows <= 0) {
-          return;
+  public void tryTriggerInmemoryMerging(boolean spillDisk)
+      throws CarbonSortKeyAndGroupByException {
+    List<UnsafeCarbonRowPage> pages2Merge = new ArrayList<>();
+    int totalRows2Merge = 0;
+    synchronized (lockObject) {
+      long sizeAdded = 0;
+      for (Iterator<UnsafeCarbonRowPage> iter = rowPages.iterator(); 
iter.hasNext(); ) {
+        UnsafeCarbonRowPage page = iter.next();
+        if (!spillDisk || sizeAdded + page.getDataBlock().size() < 
this.spillSizeInSortMemory) {
+          pages2Merge.add(page);
+          totalRows2Merge += page.getBuffer().getActualSize();
+          iter.remove();
+        } else {
+          break;
         }
-        localRowPages = rowPages.toArray(new 
UnsafeCarbonRowPage[rowPages.size()]);
-        this.rowPages = new ArrayList<>();
       }
+    }
+    if (pages2Merge.size() > 1) {
       if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("Submitting request for intermediate merging of in-memory 
pages : "
-            + localRowPages.length);
+            + pages2Merge.size());
       }
-      startIntermediateMerging(localRowPages, totalRows);
+      startIntermediateMerging(pages2Merge.toArray(new 
UnsafeCarbonRowPage[pages2Merge.size()]),
+          totalRows2Merge, spillDisk);
+    }
+  }
+
+  public void startInmemoryMergingIfPossible() throws 
CarbonSortKeyAndGroupByException {
+    if (rowPages.size() >= parameters.getNumberOfIntermediateFileToBeMerged()) 
{
+      tryTriggerInmemoryMerging(false);
     }
   }
 
   /**
-   * Below method will be used to start the intermediate file merging
+   * Below method will be used to start the intermediate in-memory merging
    *
-   * @param rowPages
+   * @param rowPages pages to be merged
+   * @param totalRows total rows in all pages
+   * @param spillDisk whether to spill the merged result to disk
    */
-  private void startIntermediateMerging(UnsafeCarbonRowPage[] rowPages, int 
totalRows)
-      throws CarbonSortKeyAndGroupByException {
+  private void startIntermediateMerging(UnsafeCarbonRowPage[] rowPages, int 
totalRows,
+      boolean spillDisk) throws CarbonSortKeyAndGroupByException {
     UnsafeInMemoryIntermediateDataMerger merger =
-        new UnsafeInMemoryIntermediateDataMerger(rowPages, totalRows);
+        new UnsafeInMemoryIntermediateDataMerger(rowPages, totalRows, 
parameters, spillDisk);
     mergedPages.add(merger);
-    executorService.execute(merger);
+    mergerTask.add(executorService.submit(merger));
   }
 
   private int getTotalNumberOfRows(List<UnsafeCarbonRowPage> 
unsafeCarbonRowPages) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b439b00f/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
index b1dc156..073d13b 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
@@ -22,6 +22,7 @@ import java.io.FileFilter;
 import java.util.AbstractQueue;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 import java.util.PriorityQueue;
 
@@ -80,6 +81,15 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends 
CarbonIterator<Objec
    */
   public void startFinalMerge(UnsafeCarbonRowPage[] rowPages,
       List<UnsafeInMemoryIntermediateDataMerger> merges) throws 
CarbonDataWriterException {
+    // remove the spilled pages
+    for (Iterator<UnsafeInMemoryIntermediateDataMerger> iter = 
merges.iterator();
+         iter.hasNext(); ) {
+      UnsafeInMemoryIntermediateDataMerger merger = iter.next();
+      if (merger.isSpillDisk()) {
+        // it has already been closed once the spill is finished, so no need 
to close it here
+        iter.remove();
+      }
+    }
     startSorting(rowPages, merges);
   }
 
@@ -99,8 +109,9 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends 
CarbonIterator<Objec
         LOGGER.info("No files to merge sort");
         return;
       }
-      LOGGER.info("Starting final merger");
-      LOGGER.info("Number of row pages: " + this.fileCounter);
+      LOGGER.info(String.format("Starting final merge of %d pages, including 
row pages: %d"
+          + ", sort temp files: %d, intermediate merges: %d",
+          this.fileCounter, rowPages.length, filesToMergeSort.size(), 
merges.size()));
 
       // create record holder heap
       createRecordHolderQueue();

Reply via email to