This is an automated email from the ASF dual-hosted git repository.
ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 7df5209 [CARBONDATA-3787] Fixed MemoryLeak in data load and compaction
7df5209 is described below
commit 7df52098aff374d88e54e4c38ccfa9baf2a40176
Author: kumarvishal09 <[email protected]>
AuthorDate: Tue Apr 14 09:40:55 2020 +0800
[CARBONDATA-3787] Fixed MemoryLeak in data load and compaction
Why is this PR needed?
PR #3638 started using direct byte buffers. This PR fixes an off-heap memory
leak caused by those buffers during data load and compaction.
What changes were proposed in this PR?
The JVM releases the off-heap memory of a DirectByteBuffer only when a GC
happens (i.e. when the heap is full). Consider a scenario where the heap is not
full but off-heap memory is filling up: no GC happens, so the DirectByteBuffers
are not cleaned. The process may then hit an off-heap OutOfMemoryError, and YARN
may kill the JVM process. It is therefore better to clean up DirectByteBuffers
explicitly after use, by invoking their cleaner through reflection.
Does this PR introduce any user interface change?
No
Is any new testcase added?
No
This closes #3706
---
.../blocklet/BlockletEncodedColumnPage.java | 7 +++
.../core/datastore/blocklet/EncodedBlocklet.java | 10 +++++
.../datastore/compression/AbstractCompressor.java | 51 +++++++++++++++-------
.../page/ActualDataBasedFallbackEncoder.java | 1 +
.../page/DecoderBasedFallbackEncoder.java | 1 +
.../datastore/page/LVByteBufferColumnPage.java | 10 ++++-
.../datastore/page/encoding/EncodedColumnPage.java | 5 +++
.../core/memory/UnsafeMemoryManager.java | 29 ++++++++++++
8 files changed, 98 insertions(+), 16 deletions(-)
diff --git
a/core/src/main/java/org/apache/carbondata/core/datastore/blocklet/BlockletEncodedColumnPage.java
b/core/src/main/java/org/apache/carbondata/core/datastore/blocklet/BlockletEncodedColumnPage.java
index d4fc6d5..7ec1136 100644
---
a/core/src/main/java/org/apache/carbondata/core/datastore/blocklet/BlockletEncodedColumnPage.java
+++
b/core/src/main/java/org/apache/carbondata/core/datastore/blocklet/BlockletEncodedColumnPage.java
@@ -217,4 +217,11 @@ public class BlockletEncodedColumnPage {
new ActualDataBasedFallbackEncoder(encodedColumnPage, pageIndex)));
}
}
+
+ public void cleanBuffer() {
+ for (EncodedColumnPage encodedColumnPage : encodedColumnPageList) {
+ encodedColumnPage.cleanBuffer();
+ }
+ }
+
}
diff --git
a/core/src/main/java/org/apache/carbondata/core/datastore/blocklet/EncodedBlocklet.java
b/core/src/main/java/org/apache/carbondata/core/datastore/blocklet/EncodedBlocklet.java
index 86edc12..262c442 100644
---
a/core/src/main/java/org/apache/carbondata/core/datastore/blocklet/EncodedBlocklet.java
+++
b/core/src/main/java/org/apache/carbondata/core/datastore/blocklet/EncodedBlocklet.java
@@ -192,6 +192,16 @@ public class EncodedBlocklet {
}
public void clear() {
+ if (null != encodedDimensionColumnPages) {
+ for (BlockletEncodedColumnPage blockletEncodedColumnPage :
encodedDimensionColumnPages) {
+ blockletEncodedColumnPage.cleanBuffer();
+ }
+ }
+ if (null != encodedMeasureColumnPages) {
+ for (BlockletEncodedColumnPage blockletEncodedColumnPage :
encodedMeasureColumnPages) {
+ blockletEncodedColumnPage.cleanBuffer();
+ }
+ }
this.numberOfPages = 0;
this.encodedDimensionColumnPages = null;
this.blockletSize = 0;
diff --git
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/AbstractCompressor.java
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/AbstractCompressor.java
index 12bc2d5..60f73ea 100644
---
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/AbstractCompressor.java
+++
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/AbstractCompressor.java
@@ -26,6 +26,7 @@ import java.nio.IntBuffer;
import java.nio.LongBuffer;
import java.nio.ShortBuffer;
+import org.apache.carbondata.core.memory.UnsafeMemoryManager;
import org.apache.carbondata.core.util.ByteUtil;
public abstract class AbstractCompressor implements Compressor {
@@ -33,9 +34,13 @@ public abstract class AbstractCompressor implements
Compressor {
@Override
public ByteBuffer compressShort(short[] unCompInput) {
ByteBuffer unCompBuffer = ByteBuffer.allocateDirect(unCompInput.length *
ByteUtil.SIZEOF_SHORT);
-
unCompBuffer.order(ByteOrder.LITTLE_ENDIAN).asShortBuffer().put(unCompInput);
- unCompBuffer.position(unCompBuffer.limit());
- return compressByte(unCompBuffer);
+ try {
+
unCompBuffer.order(ByteOrder.LITTLE_ENDIAN).asShortBuffer().put(unCompInput);
+ unCompBuffer.position(unCompBuffer.limit());
+ return compressByte(unCompBuffer);
+ } finally {
+ UnsafeMemoryManager.destroyDirectByteBuffer(unCompBuffer);
+ }
}
@Override
@@ -51,9 +56,13 @@ public abstract class AbstractCompressor implements
Compressor {
@Override
public ByteBuffer compressInt(int[] unCompInput) {
ByteBuffer unCompBuffer = ByteBuffer.allocateDirect(unCompInput.length *
ByteUtil.SIZEOF_INT);
- unCompBuffer.order(ByteOrder.LITTLE_ENDIAN).asIntBuffer().put(unCompInput);
- unCompBuffer.position(unCompBuffer.limit());
- return compressByte(unCompBuffer);
+ try {
+
unCompBuffer.order(ByteOrder.LITTLE_ENDIAN).asIntBuffer().put(unCompInput);
+ unCompBuffer.position(unCompBuffer.limit());
+ return compressByte(unCompBuffer);
+ } finally {
+ UnsafeMemoryManager.destroyDirectByteBuffer(unCompBuffer);
+ }
}
@Override
@@ -69,9 +78,13 @@ public abstract class AbstractCompressor implements
Compressor {
@Override
public ByteBuffer compressLong(long[] unCompInput) {
ByteBuffer unCompBuffer = ByteBuffer.allocateDirect(unCompInput.length *
ByteUtil.SIZEOF_LONG);
-
unCompBuffer.order(ByteOrder.LITTLE_ENDIAN).asLongBuffer().put(unCompInput);
- unCompBuffer.position(unCompBuffer.limit());
- return compressByte(unCompBuffer);
+ try {
+
unCompBuffer.order(ByteOrder.LITTLE_ENDIAN).asLongBuffer().put(unCompInput);
+ unCompBuffer.position(unCompBuffer.limit());
+ return compressByte(unCompBuffer);
+ } finally {
+ UnsafeMemoryManager.destroyDirectByteBuffer(unCompBuffer);
+ }
}
@Override
@@ -87,9 +100,13 @@ public abstract class AbstractCompressor implements
Compressor {
@Override
public ByteBuffer compressFloat(float[] unCompInput) {
ByteBuffer unCompBuffer = ByteBuffer.allocateDirect(unCompInput.length *
ByteUtil.SIZEOF_FLOAT);
-
unCompBuffer.order(ByteOrder.LITTLE_ENDIAN).asFloatBuffer().put(unCompInput);
- unCompBuffer.position(unCompBuffer.limit());
- return compressByte(unCompBuffer);
+ try {
+
unCompBuffer.order(ByteOrder.LITTLE_ENDIAN).asFloatBuffer().put(unCompInput);
+ unCompBuffer.position(unCompBuffer.limit());
+ return compressByte(unCompBuffer);
+ } finally {
+ UnsafeMemoryManager.destroyDirectByteBuffer(unCompBuffer);
+ }
}
@Override
@@ -106,9 +123,13 @@ public abstract class AbstractCompressor implements
Compressor {
public ByteBuffer compressDouble(double[] unCompInput) {
ByteBuffer unCompBuffer =
ByteBuffer.allocateDirect(unCompInput.length * ByteUtil.SIZEOF_DOUBLE);
-
unCompBuffer.order(ByteOrder.LITTLE_ENDIAN).asDoubleBuffer().put(unCompInput);
- unCompBuffer.position(unCompBuffer.limit());
- return compressByte(unCompBuffer);
+ try {
+
unCompBuffer.order(ByteOrder.LITTLE_ENDIAN).asDoubleBuffer().put(unCompInput);
+ unCompBuffer.position(unCompBuffer.limit());
+ return compressByte(unCompBuffer);
+ } finally {
+ UnsafeMemoryManager.destroyDirectByteBuffer(unCompBuffer);
+ }
}
@Override
diff --git
a/core/src/main/java/org/apache/carbondata/core/datastore/page/ActualDataBasedFallbackEncoder.java
b/core/src/main/java/org/apache/carbondata/core/datastore/page/ActualDataBasedFallbackEncoder.java
index da3d0e1..3cac586 100644
---
a/core/src/main/java/org/apache/carbondata/core/datastore/page/ActualDataBasedFallbackEncoder.java
+++
b/core/src/main/java/org/apache/carbondata/core/datastore/page/ActualDataBasedFallbackEncoder.java
@@ -64,6 +64,7 @@ public class ActualDataBasedFallbackEncoder
// be used.
// This is required to free the memory once it is of no use
encodedColumnPage.freeMemory();
+ encodedColumnPage.cleanBuffer();
return fallbackEncodedColumnPage;
}
}
diff --git
a/core/src/main/java/org/apache/carbondata/core/datastore/page/DecoderBasedFallbackEncoder.java
b/core/src/main/java/org/apache/carbondata/core/datastore/page/DecoderBasedFallbackEncoder.java
index be59e4a..017e605 100644
---
a/core/src/main/java/org/apache/carbondata/core/datastore/page/DecoderBasedFallbackEncoder.java
+++
b/core/src/main/java/org/apache/carbondata/core/datastore/page/DecoderBasedFallbackEncoder.java
@@ -126,6 +126,7 @@ public class DecoderBasedFallbackEncoder implements
Callable<FallbackEncodedColu
// fallBackEncodedColumnPage is created using new page of actual data
// This is required to free the memory once it is of no use
actualDataColumnPage.freeMemory();
+ encodedColumnPage.cleanBuffer();
return fallBackEncodedColumnPage;
}
diff --git
a/core/src/main/java/org/apache/carbondata/core/datastore/page/LVByteBufferColumnPage.java
b/core/src/main/java/org/apache/carbondata/core/datastore/page/LVByteBufferColumnPage.java
index 29135af..8656205 100644
---
a/core/src/main/java/org/apache/carbondata/core/datastore/page/LVByteBufferColumnPage.java
+++
b/core/src/main/java/org/apache/carbondata/core/datastore/page/LVByteBufferColumnPage.java
@@ -25,6 +25,7 @@ import org.apache.carbondata.core.datastore.ColumnType;
import org.apache.carbondata.core.datastore.TableSpec;
import
org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
import org.apache.carbondata.core.memory.CarbonUnsafe;
+import org.apache.carbondata.core.memory.UnsafeMemoryManager;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
@@ -121,6 +122,7 @@ public class LVByteBufferColumnPage extends ColumnPage {
CarbonUnsafe.getUnsafe().copyMemory(((DirectBuffer)byteBuffer).address(),
((DirectBuffer)newBuffer).address(), capacity);
newBuffer.position(byteBuffer.position());
+ UnsafeMemoryManager.destroyDirectByteBuffer(byteBuffer);
byteBuffer = newBuffer;
}
}
@@ -373,7 +375,13 @@ public class LVByteBufferColumnPage extends ColumnPage {
@Override
public void freeMemory() {
-
+ if (null != rowOffset) {
+ rowOffset.freeMemory();
+ rowOffset = null;
+ }
+ if (null != byteBuffer) {
+ UnsafeMemoryManager.destroyDirectByteBuffer(byteBuffer);
+ }
}
@Override
diff --git
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedColumnPage.java
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedColumnPage.java
index 58f00a9..d661367 100644
---
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedColumnPage.java
+++
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedColumnPage.java
@@ -23,6 +23,7 @@ import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.LocalDictColumnPage;
import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
import org.apache.carbondata.core.localdictionary.PageLevelDictionary;
+import org.apache.carbondata.core.memory.UnsafeMemoryManager;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.format.DataChunk2;
@@ -99,4 +100,8 @@ public class EncodedColumnPage {
page.freeMemoryForce();
}
}
+
+ public void cleanBuffer() {
+ UnsafeMemoryManager.destroyDirectByteBuffer(encodedData);
+ }
}
\ No newline at end of file
diff --git
a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
index d060cac..466c023 100644
---
a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
+++
b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
@@ -17,6 +17,9 @@
package org.apache.carbondata.core.memory;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -206,4 +209,30 @@ public class UnsafeMemoryManager {
public static boolean isOffHeap() {
return offHeap;
}
+
+ /**
+ * DirectByteBuffers are garbage collected by using a phantom reference and a
+ * reference queue. Every once a while, the JVM checks the reference queue
and
+ * cleans the DirectByteBuffers. However, as this doesn't happen
+ * immediately after discarding all references to a DirectByteBuffer, it's
+ * easy to OutOfMemoryError yourself using DirectByteBuffers. This function
+ * explicitly calls the Cleaner method of a DirectByteBuffer.
+ *
+ * @param toBeDestroyed The DirectByteBuffer that will be "cleaned".
Utilizes reflection.
+ */
+ public static void destroyDirectByteBuffer(ByteBuffer toBeDestroyed) {
+ if (!toBeDestroyed.isDirect()) {
+ return;
+ }
+ try {
+ Method cleanerMethod = toBeDestroyed.getClass().getMethod("cleaner");
+ cleanerMethod.setAccessible(true);
+ Object cleaner = cleanerMethod.invoke(toBeDestroyed);
+ Method cleanMethod = cleaner.getClass().getMethod("clean");
+ cleanMethod.setAccessible(true);
+ cleanMethod.invoke(cleaner);
+ } catch (NoSuchMethodException | IllegalAccessException |
InvocationTargetException e) {
+ throw new RuntimeException(e);
+ }
+ }
}