This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 453f19f8cf [GLUTEN-9621][FLINK] Fix rowvector may not close caused 
memory leak (#9622)
453f19f8cf is described below

commit 453f19f8cfe2032120db4a10210615787d768122
Author: kevinyhzou <[email protected]>
AuthorDate: Fri May 16 16:25:05 2025 +0800

    [GLUTEN-9621][FLINK] Fix rowvector may not close caused memory leak (#9622)
---
 .../operators/GlutenSingleInputOperator.java       | 45 ++++++++++-------
 .../vectorized/FlinkRowToVLVectorConvertor.java    | 59 +++++++++++++---------
 2 files changed, 63 insertions(+), 41 deletions(-)

diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSingleInputOperator.java
 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSingleInputOperator.java
index 034768f36a..2cb69db163 100644
--- 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSingleInputOperator.java
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSingleInputOperator.java
@@ -106,25 +106,36 @@ public class GlutenSingleInputOperator extends 
TableStreamOperator<RowData>
 
     @Override
     public void processElement(StreamRecord<RowData> element) {
-        final RowVector inRv = FlinkRowToVLVectorConvertor.fromRowData(
-            element.getValue(),
-            allocator,
-            session,
-            inputType);
-        inputQueue.put(inRv);
-        UpIterator.State state = task.advance();
-        if (state == UpIterator.State.AVAILABLE) {
-            RowVector outRv = task.get();
-            List<RowData> rows = FlinkRowToVLVectorConvertor.toRowData(
-                    outRv,
-                    allocator,
-                    outputType);
-            for (RowData row : rows) {
-                output.collect(outElement.replace(row));
+        RowVector inRv = null;
+        RowVector outRv = null;
+        try {
+            inRv = FlinkRowToVLVectorConvertor.fromRowData(
+                element.getValue(),
+                allocator,
+                session,
+                inputType);
+            inputQueue.put(inRv);
+            UpIterator.State state = task.advance();
+            if (state == UpIterator.State.AVAILABLE) {
+                outRv = task.get();
+                List<RowData> rows = FlinkRowToVLVectorConvertor.toRowData(
+                        outRv,
+                        allocator,
+                        outputType);
+                for (RowData row : rows) {
+                    output.collect(outElement.replace(row));
+                }
             }
-            outRv.close();
+        } finally {
+            /// The RowVector should be closed in `finally`, to avoid it may 
not be closed when exceptions rasied,
+            /// that lead to memory leak.
+            if (outRv != null) {
+                outRv.close();
+            }
+            if (inRv != null) {
+                inRv.close();
+           }
         }
-        inRv.close();
     }
 
     @Override
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/FlinkRowToVLVectorConvertor.java
 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/FlinkRowToVLVectorConvertor.java
index 8f75122776..06962e0449 100644
--- 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/FlinkRowToVLVectorConvertor.java
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/FlinkRowToVLVectorConvertor.java
@@ -131,33 +131,44 @@ public class FlinkRowToVLVectorConvertor {
             BufferAllocator allocator,
             RowType rowType) {
         // TODO: support more types
-        final BaseVector loadedVector = rowVector.loadedVector();
-        final FieldVector fieldVector = Arrow.toArrowVector(
-                allocator,
-                loadedVector);
-        List<RowData> rowDatas = new ArrayList<>(rowVector.getSize());
-        for (int j = 0; j < rowVector.getSize(); j++) {
-            List<Object> fieldValues = new ArrayList<>(rowType.size());
-            for (int i = 0; i < rowType.size(); i++) {
-                Type fieldType = rowType.getChildren().get(i);
-                if (fieldType instanceof IntegerType) {
-                    fieldValues.add(i, ((IntVector) 
fieldVector.getChildrenFromFields().get(i)).get(j));
-                } else if (fieldType instanceof BigIntType) {
-                    fieldValues.add(i, ((BigIntVector) 
fieldVector.getChildrenFromFields().get(i)).get(j));
-                } else if (fieldType instanceof VarCharType) {
-                    fieldValues.add(
-                            i,
-                            BinaryStringData.fromBytes(
-                                    ((VarCharVector) 
fieldVector.getChildrenFromFields().get(i)).get(j)));
-                } else {
-                    throw new RuntimeException("Unsupported field type: " + 
fieldType);
+        BaseVector loadedVector = null;
+        FieldVector fieldVector = null;
+        try {
+            loadedVector = rowVector.loadedVector();
+            fieldVector = Arrow.toArrowVector(
+                    allocator,
+                    loadedVector);
+            List<RowData> rowDatas = new ArrayList<>(rowVector.getSize());
+            for (int j = 0; j < rowVector.getSize(); j++) {
+                List<Object> fieldValues = new ArrayList<>(rowType.size());
+                for (int i = 0; i < rowType.size(); i++) {
+                    Type fieldType = rowType.getChildren().get(i);
+                    if (fieldType instanceof IntegerType) {
+                        fieldValues.add(i, ((IntVector) 
fieldVector.getChildrenFromFields().get(i)).get(j));
+                    } else if (fieldType instanceof BigIntType) {
+                        fieldValues.add(i, ((BigIntVector) 
fieldVector.getChildrenFromFields().get(i)).get(j));
+                    } else if (fieldType instanceof VarCharType) {
+                        fieldValues.add(
+                                i,
+                                BinaryStringData.fromBytes(
+                                        ((VarCharVector) 
fieldVector.getChildrenFromFields().get(i)).get(j)));
+                    } else {
+                        throw new RuntimeException("Unsupported field type: " 
+ fieldType);
+                    }
                 }
+                rowDatas.add(GenericRowData.of(fieldValues.toArray()));
+            }
+            return rowDatas;
+        } finally {
+            /// The FieldVector/BaseVector should be closed in `finally`, to 
avoid it may not be closed when exceptions rasied,
+            /// that lead to memory leak.
+            if (fieldVector != null) {
+                fieldVector.close();
+            }
+            if (loadedVector != null) {
+                loadedVector.close();
             }
-            rowDatas.add(GenericRowData.of(fieldValues.toArray()));
         }
-        fieldVector.close();
-        loadedVector.close();
-        return rowDatas;
     }
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to