[CARBONDATA-2232][DataLoad] Fix incorrect logic in spilling unsafe pages to disk
The unsafe row page should be written to disk only when memory is unavailable; the previous logic had this condition inverted. This closes #2037 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a161841e Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a161841e Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a161841e Branch: refs/heads/master Commit: a161841e8808a3a477715346c8b28e683a5bc4d7 Parents: b509ad8 Author: xuchuanyin <xuchuan...@hust.edu.cn> Authored: Fri Mar 9 09:55:44 2018 +0800 Committer: Jacky Li <jacky.li...@qq.com> Committed: Wed Mar 14 12:09:57 2018 +0800 ---------------------------------------------------------------------- .../loading/sort/unsafe/UnsafeSortDataRows.java | 37 ++++++++------------ 1 file changed, 14 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/a161841e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java index eaa858e..7afda0e 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java @@ -129,14 +129,7 @@ public class UnsafeSortDataRows { * This method will be used to initialize */ public void initialize() throws MemoryException { - MemoryBlock baseBlock = - UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, inMemoryChunkSize); - boolean isMemoryAvailable = - UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(baseBlock.size()); - 
if (isMemoryAvailable) { - UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(baseBlock.size()); - } - this.rowPage = new UnsafeCarbonRowPage(tableFieldStat, baseBlock, !isMemoryAvailable, taskId); + this.rowPage = createUnsafeRowPage(); // Delete if any older file exists in sort temp folder deleteSortLocationIfExists(); @@ -148,6 +141,17 @@ public class UnsafeSortDataRows { semaphore = new Semaphore(parameters.getNumberOfCores()); } + private UnsafeCarbonRowPage createUnsafeRowPage() throws MemoryException { + MemoryBlock baseBlock = + UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, inMemoryChunkSize); + boolean isMemoryAvailable = + UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(baseBlock.size()); + if (isMemoryAvailable) { + UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(baseBlock.size()); + } + return new UnsafeCarbonRowPage(tableFieldStat, baseBlock, !isMemoryAvailable, taskId); + } + public boolean canAdd() { return bytesAdded < maxSizeAllowed; } @@ -192,14 +196,7 @@ public class UnsafeSortDataRows { unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible(); semaphore.acquire(); dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(rowPage)); - MemoryBlock memoryBlock = - UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, inMemoryChunkSize); - boolean saveToDisk = - UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(memoryBlock.size()); - if (!saveToDisk) { - UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(memoryBlock.size()); - } - rowPage = new UnsafeCarbonRowPage(tableFieldStat, memoryBlock, saveToDisk, taskId); + rowPage = createUnsafeRowPage(); bytesAdded += rowPage.addRow(rowBatch[i], rowBuffer.get()); } catch (Exception e) { LOGGER.error( @@ -227,13 +224,7 @@ public class UnsafeSortDataRows { unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible(); semaphore.acquire(); dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(rowPage)); - MemoryBlock memoryBlock = - 
UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, inMemoryChunkSize); - boolean saveToDisk = UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(memoryBlock.size()); - if (!saveToDisk) { - UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(memoryBlock.size()); - } - rowPage = new UnsafeCarbonRowPage(tableFieldStat, memoryBlock, saveToDisk, taskId); + rowPage = createUnsafeRowPage(); rowPage.addRow(row, rowBuffer.get()); } catch (Exception e) { LOGGER.error(