Repository: carbondata Updated Branches: refs/heads/master 9ee74fe07 -> 4c9bed8bc
[CARBONDATA-2307] Fix OOM issue when using DataFrame.coalesce Fix OOM issue when using DataFrame.coalesce This closes #2136 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/4c9bed8b Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/4c9bed8b Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/4c9bed8b Branch: refs/heads/master Commit: 4c9bed8bc6a8b9a517fa7bbe1635bc3da209c45b Parents: 9ee74fe Author: Jin Zhou <xapr...@yeah.net> Authored: Tue Apr 3 18:48:51 2018 +0800 Committer: manishgupta88 <tomanishgupt...@gmail.com> Committed: Sat Apr 14 13:09:41 2018 +0530 ---------------------------------------------------------------------- .../datastore/chunk/AbstractRawColumnChunk.java | 4 +++- .../chunk/impl/AbstractDimensionColumnPage.java | 5 ++++- .../chunk/impl/DimensionRawColumnChunk.java | 2 ++ .../chunk/impl/MeasureRawColumnChunk.java | 2 ++ .../SafeAbsractDimensionDataChunkStore.java | 4 +++- ...feVariableLengthDimensionDataChunkStore.java | 6 ++++++ .../core/scan/result/BlockletScannedResult.java | 3 +++ .../carbondata/spark/rdd/CarbonScanRDD.scala | 21 +++++++++++++++++--- 8 files changed, 41 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c9bed8b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java index 05ac9ff..af1c811 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java @@ -96,7 +96,9 @@ public abstract class AbstractRawColumnChunk { this.rowCount = rowCount; } - public abstract void freeMemory(); + public void freeMemory() { + rawData = null; + } public int getColumnIndex() { return columnIndex; http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c9bed8b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/AbstractDimensionColumnPage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/AbstractDimensionColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/AbstractDimensionColumnPage.java index 6f316c5..91e55dc 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/AbstractDimensionColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/AbstractDimensionColumnPage.java @@ -77,7 +77,10 @@ public abstract class AbstractDimensionColumnPage implements DimensionColumnPage * below method will be used to free the allocated memory */ @Override public void freeMemory() { - dataChunkStore.freeMemory(); + if (dataChunkStore != null) { + dataChunkStore.freeMemory(); + dataChunkStore = null; + } } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c9bed8b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java index f9bb590..c7a8337 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java @@ -102,10 +102,12 @@ public class DimensionRawColumnChunk extends AbstractRawColumnChunk { } @Override public void freeMemory() { + super.freeMemory(); if (null != dataChunks) { for (int i = 0; i < dataChunks.length; i++) { if (dataChunks[i] != null) { dataChunks[i].freeMemory(); + dataChunks[i] = null; } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c9bed8b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java index 5e8618b..2311887 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java @@ -102,10 +102,12 @@ public class MeasureRawColumnChunk extends AbstractRawColumnChunk { } @Override public void freeMemory() { + super.freeMemory(); if (null != columnPages) { for (int i = 0; i < columnPages.length; i++) { if (columnPages[i] != null) { columnPages[i].freeMemory(); + columnPages[i] = null; } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c9bed8b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeAbsractDimensionDataChunkStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeAbsractDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeAbsractDimensionDataChunkStore.java index f7189e6..e9bf24b 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeAbsractDimensionDataChunkStore.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeAbsractDimensionDataChunkStore.java @@ -72,7 +72,9 @@ public abstract class SafeAbsractDimensionDataChunkStore implements DimensionDat * Below method will be used to free the memory occupied by the column chunk */ @Override public void freeMemory() { - // do nothing as GC will take care of freeing memory + data = null; + invertedIndex = null; + invertedIndexReverse = null; } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c9bed8b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java index 09230dd..bb9c888 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java @@ -184,4 +184,10 @@ public class SafeVariableLengthDimensionDataChunkStore extends SafeAbsractDimens return ByteUtil.UnsafeComparer.INSTANCE .compareTo(data, currentDataOffset, length, compareValue, 0, compareValue.length); } + + @Override + public void freeMemory() { + super.freeMemory(); + dataOffsets = null; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c9bed8b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java index 29404b4..df403c5 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java @@ -499,6 +499,7 @@ public abstract class BlockletScannedResult { for (int j = 0; j < dimensionColumnPages[i].length; j++) { if (null != dimensionColumnPages[i][j]) { dimensionColumnPages[i][j].freeMemory(); + dimensionColumnPages[i][j] = null; } } } @@ -511,6 +512,7 @@ public abstract class BlockletScannedResult { for (int j = 0; j < measureColumnPages[i].length; j++) { if (null != measureColumnPages[i][j]) { measureColumnPages[i][j].freeMemory(); + measureColumnPages[i][j] = null; } } } @@ -521,6 +523,7 @@ public abstract class BlockletScannedResult { for (int i = 0; i < dimRawColumnChunks.length; i++) { if (null != dimRawColumnChunks[i]) { dimRawColumnChunks[i].freeMemory(); + dimRawColumnChunks[i] = null; } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c9bed8b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index e3a62b6..31d3715 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -413,7 +413,7 @@ class CarbonScanRDD[T: ClassTag]( // one query id per table model.setQueryId(queryId) // get RecordReader by FileFormat - val reader: RecordReader[Void, Object] = inputSplit.getFileFormat match { + var reader: RecordReader[Void, Object] = inputSplit.getFileFormat match { case FileFormat.ROW_V1 => // create record reader for row format DataTypeUtil.setDataTypeConverter(dataTypeConverterClz.newInstance()) @@ -445,11 +445,23 @@ class CarbonScanRDD[T: ClassTag]( } } + val closeReader = () => { + if (reader != null) { + try { + reader.close() + } catch { + case e: Exception => + LOGGER.error(e) + } + reader = null + } + } + // add task completion before calling initialize as initialize method will internally call // for usage of unsafe method for processing of one blocklet and if there is any exception // while doing that the unsafe memory occupied for that task will not get cleared context.addTaskCompletionListener { _ => - reader.close() + closeReader.apply() close() logStatistics(executionId, taskId, queryStartTime, model.getStatisticsRecorder, split) } @@ -468,6 +480,9 @@ class CarbonScanRDD[T: ClassTag]( finished = !reader.nextKeyValue havePair = !finished } + if (finished) { + closeReader.apply() + } !finished } @@ -489,7 +504,6 @@ class CarbonScanRDD[T: ClassTag]( } } - iterator.asInstanceOf[Iterator[T]] } @@ -727,4 +741,5 @@ class CarbonScanRDD[T: ClassTag]( def setVectorReaderSupport(boolean: Boolean): Unit = { vectorReader = boolean } + }