This is an automated email from the ASF dual-hosted git repository.

dzamo pushed a commit to branch 1.21
in repository https://gitbox.apache.org/repos/asf/drill.git

commit b40c631d6de0c55f91bff83662a3916327210711
Author: shfshihuafeng <shfshihuaf...@163.com>
AuthorDate: Mon Mar 25 08:29:51 2024 +0800

    DRILL-8485: HashJoinPOP memory leak is caused by an oom exception when read 
data from InputStream (#2891)
---
 .../exec/cache/VectorAccessibleSerializable.java      | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
index 46fc72f016..b041d4795f 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
@@ -24,6 +24,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.List;
 
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.metrics.DrillMetrics;
@@ -124,11 +125,19 @@ public class VectorAccessibleSerializable extends 
AbstractStreamSerializable {
     for (SerializedField metaData : fieldList) {
       final int dataLength = metaData.getBufferLength();
       final MaterializedField field = MaterializedField.create(metaData);
-      final DrillBuf buf = allocator.read(dataLength, input);
-      final ValueVector vector = TypeHelper.getNewVector(field, allocator);
-      vector.load(metaData, buf);
-      buf.release(); // Vector now owns the buffer
-      vectorList.add(vector);
+      DrillBuf buf = null;
+      try {
+        buf = allocator.read(dataLength, input);
+        final ValueVector vector = TypeHelper.getNewVector(field, allocator);
+        vector.load(metaData, buf);
+        buf.release(); // Vector now owns the buffer
+        vectorList.add(vector);
+      } catch (OutOfMemoryError oom) {
+        for (ValueVector valueVector : vectorList) {
+          valueVector.clear();
+        }
+        throw UserException.memoryError(oom).message("Allocator memory 
failed").build(logger);
+      }
     }
     container.addCollection(vectorList);
     container.buildSchema(svMode);

Reply via email to