Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2052#discussion_r174364965
--- Diff:
processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
---
@@ -218,50 +213,45 @@ public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException {
rowPage.addRow(row, rowBuffer.get());
} else {
try {
- if (enableInMemoryIntermediateMerge) {
- unsafeInMemoryIntermediateFileMerger.startInmemoryMergingIfPossible();
- }
- unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible();
- semaphore.acquire();
- dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(rowPage));
+ handlePreviousPage();
rowPage = createUnsafeRowPage();
rowPage.addRow(row, rowBuffer.get());
} catch (Exception e) {
LOGGER.error(
"exception occurred while trying to acquire a semaphore lock: " + e.getMessage());
throw new CarbonSortKeyAndGroupByException(e);
}
-
}
}
/**
- * Below method will be used to start storing process This method will get
- * all the temp files present in sort temp folder then it will create the
- * record holder heap and then it will read first record from each file and
- * initialize the heap
+ * Below method will be used to start sorting process. This method will get
+ * all the temp unsafe pages in memory and all the temp files and try to merge them if possible.
+ * Also, it will spill the pages to disk or add it to unsafe sort memory.
*
- * @throws InterruptedException
+ * @throws CarbonSortKeyAndGroupByException if error occurs during in-memory merge
+ * @throws InterruptedException if error occurs during data sort and write
*/
- public void startSorting() throws InterruptedException {
+ public void startSorting() throws CarbonSortKeyAndGroupByException, InterruptedException {
LOGGER.info("Unsafe based sorting will be used");
if (this.rowPage.getUsedSize() > 0) {
- TimSort<UnsafeCarbonRow, IntPointerBuffer> timSort = new TimSort<>(
- new UnsafeIntSortDataFormat(rowPage));
- if (parameters.getNumberOfNoDictSortColumns() > 0) {
- timSort.sort(rowPage.getBuffer(), 0, rowPage.getBuffer().getActualSize(),
- new UnsafeRowComparator(rowPage));
- } else {
- timSort.sort(rowPage.getBuffer(), 0, rowPage.getBuffer().getActualSize(),
- new UnsafeRowComparatorForNormalDims(rowPage));
- }
- unsafeInMemoryIntermediateFileMerger.addDataChunkToMerge(rowPage);
+ handlePreviousPage();
} else {
rowPage.freeMemory();
}
startFileBasedMerge();
}
+ private void handlePreviousPage()
--- End diff --
fixed
---