This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 453f19f8cf [GLUTEN-9621][FLINK] Fix rowvector may not close caused memory leak (#9622)
453f19f8cf is described below
commit 453f19f8cfe2032120db4a10210615787d768122
Author: kevinyhzou <[email protected]>
AuthorDate: Fri May 16 16:25:05 2025 +0800
[GLUTEN-9621][FLINK] Fix rowvector may not close caused memory leak (#9622)
---
.../operators/GlutenSingleInputOperator.java | 45 ++++++++++-------
.../vectorized/FlinkRowToVLVectorConvertor.java | 59 +++++++++++++---------
2 files changed, 63 insertions(+), 41 deletions(-)
diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSingleInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSingleInputOperator.java
index 034768f36a..2cb69db163 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSingleInputOperator.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSingleInputOperator.java
@@ -106,25 +106,36 @@ public class GlutenSingleInputOperator extends TableStreamOperator<RowData>
@Override
public void processElement(StreamRecord<RowData> element) {
- final RowVector inRv = FlinkRowToVLVectorConvertor.fromRowData(
- element.getValue(),
- allocator,
- session,
- inputType);
- inputQueue.put(inRv);
- UpIterator.State state = task.advance();
- if (state == UpIterator.State.AVAILABLE) {
- RowVector outRv = task.get();
- List<RowData> rows = FlinkRowToVLVectorConvertor.toRowData(
- outRv,
- allocator,
- outputType);
- for (RowData row : rows) {
- output.collect(outElement.replace(row));
+ RowVector inRv = null;
+ RowVector outRv = null;
+ try {
+ inRv = FlinkRowToVLVectorConvertor.fromRowData(
+ element.getValue(),
+ allocator,
+ session,
+ inputType);
+ inputQueue.put(inRv);
+ UpIterator.State state = task.advance();
+ if (state == UpIterator.State.AVAILABLE) {
+ outRv = task.get();
+ List<RowData> rows = FlinkRowToVLVectorConvertor.toRowData(
+ outRv,
+ allocator,
+ outputType);
+ for (RowData row : rows) {
+ output.collect(outElement.replace(row));
+ }
}
- outRv.close();
+ } finally {
+ /// The RowVector should be closed in `finally`, to avoid it may
not be closed when exceptions rasied,
+ /// that lead to memory leak.
+ if (outRv != null) {
+ outRv.close();
+ }
+ if (inRv != null) {
+ inRv.close();
+ }
}
- inRv.close();
}
@Override
diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/FlinkRowToVLVectorConvertor.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/FlinkRowToVLVectorConvertor.java
index 8f75122776..06962e0449 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/FlinkRowToVLVectorConvertor.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/FlinkRowToVLVectorConvertor.java
@@ -131,33 +131,44 @@ public class FlinkRowToVLVectorConvertor {
BufferAllocator allocator,
RowType rowType) {
// TODO: support more types
- final BaseVector loadedVector = rowVector.loadedVector();
- final FieldVector fieldVector = Arrow.toArrowVector(
- allocator,
- loadedVector);
- List<RowData> rowDatas = new ArrayList<>(rowVector.getSize());
- for (int j = 0; j < rowVector.getSize(); j++) {
- List<Object> fieldValues = new ArrayList<>(rowType.size());
- for (int i = 0; i < rowType.size(); i++) {
- Type fieldType = rowType.getChildren().get(i);
- if (fieldType instanceof IntegerType) {
- fieldValues.add(i, ((IntVector)
fieldVector.getChildrenFromFields().get(i)).get(j));
- } else if (fieldType instanceof BigIntType) {
- fieldValues.add(i, ((BigIntVector)
fieldVector.getChildrenFromFields().get(i)).get(j));
- } else if (fieldType instanceof VarCharType) {
- fieldValues.add(
- i,
- BinaryStringData.fromBytes(
- ((VarCharVector)
fieldVector.getChildrenFromFields().get(i)).get(j)));
- } else {
- throw new RuntimeException("Unsupported field type: " +
fieldType);
+ BaseVector loadedVector = null;
+ FieldVector fieldVector = null;
+ try {
+ loadedVector = rowVector.loadedVector();
+ fieldVector = Arrow.toArrowVector(
+ allocator,
+ loadedVector);
+ List<RowData> rowDatas = new ArrayList<>(rowVector.getSize());
+ for (int j = 0; j < rowVector.getSize(); j++) {
+ List<Object> fieldValues = new ArrayList<>(rowType.size());
+ for (int i = 0; i < rowType.size(); i++) {
+ Type fieldType = rowType.getChildren().get(i);
+ if (fieldType instanceof IntegerType) {
+ fieldValues.add(i, ((IntVector)
fieldVector.getChildrenFromFields().get(i)).get(j));
+ } else if (fieldType instanceof BigIntType) {
+ fieldValues.add(i, ((BigIntVector)
fieldVector.getChildrenFromFields().get(i)).get(j));
+ } else if (fieldType instanceof VarCharType) {
+ fieldValues.add(
+ i,
+ BinaryStringData.fromBytes(
+ ((VarCharVector)
fieldVector.getChildrenFromFields().get(i)).get(j)));
+ } else {
+ throw new RuntimeException("Unsupported field type: "
+ fieldType);
+ }
}
+ rowDatas.add(GenericRowData.of(fieldValues.toArray()));
+ }
+ return rowDatas;
+ } finally {
+ /// The FieldVector/BaseVector should be closed in `finally`, to
avoid it may not be closed when exceptions rasied,
+ /// that lead to memory leak.
+ if (fieldVector != null) {
+ fieldVector.close();
+ }
+ if (loadedVector != null) {
+ loadedVector.close();
}
- rowDatas.add(GenericRowData.of(fieldValues.toArray()));
}
- fieldVector.close();
- loadedVector.close();
- return rowDatas;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]