This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new d39f5ab99f67 [SPARK-50235][SQL] Clean up ColumnVector resource after
processing all rows in ColumnarToRowExec
d39f5ab99f67 is described below
commit d39f5ab99f67ce959b4379ecc3d6e262c10146cf
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Wed Nov 6 17:12:11 2024 +0800
[SPARK-50235][SQL] Clean up ColumnVector resource after processing all rows
in ColumnarToRowExec
### What changes were proposed in this pull request?
This patch cleans up ColumnVector resources after processing all rows in
ColumnarToRowExec. It only covers the codegen implementation of
ColumnarToRowExec. The non-codegen path should be relatively rarely used, and
no good approach has been proposed for it yet, so it is left to a follow-up.
### Why are the changes needed?
Currently we only assign null to the ColumnarBatch object, which does not
release the resources held by the vectors in the batch. For OnHeapColumnVector,
the Java arrays may be collected automatically by the JVM, but for
OffHeapColumnVector, the allocated off-heap memory is leaked.
For custom ColumnVector implementations, such as Arrow-based ones, this can
also cause memory-safety issues if the underlying buffers are reused across
batches, because when ColumnarToRowExec begins to fill values for the next
batch, the arrays of the previous batch are still held.
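
The cleanup pattern can be sketched in isolation as follows. This is a minimal illustration rather than the Spark code itself: `SketchVector`, `ReadOnlyVector`, `ReusableWritableVector` and `SketchBatch` are hypothetical stand-ins for `ColumnVector`, a non-writable vector, `WritableColumnVector` and `ColumnarBatch`, and a boolean flag stands in for the real release of memory. Only the method name `closeIfNotWritable()` and its default/no-op split are taken from this patch.

```java
public class CloseIfNotWritableSketch {

  // Hypothetical stand-in for ColumnVector: close() releases the resource.
  abstract static class SketchVector implements AutoCloseable {
    boolean closed = false;

    @Override
    public void close() {
      closed = true; // a real vector would free its buffers here
    }

    // Default behaviour: a vector that is not writable is simply closed.
    public void closeIfNotWritable() {
      close();
    }
  }

  // Hypothetical non-writable vector, e.g. one wrapping an external buffer.
  static class ReadOnlyVector extends SketchVector {}

  // Hypothetical writable vector that the producer refills for every batch,
  // so it must survive until the final close().
  static class ReusableWritableVector extends SketchVector {
    @Override
    public void closeIfNotWritable() {
      // no-op: the vector is reused for the next batch
    }
  }

  // Hypothetical stand-in for ColumnarBatch.
  static class SketchBatch implements AutoCloseable {
    final SketchVector[] columns;

    SketchBatch(SketchVector... columns) {
      this.columns = columns;
    }

    public void closeIfNotWritable() {
      for (SketchVector c : columns) {
        c.closeIfNotWritable();
      }
    }

    @Override
    public void close() {
      for (SketchVector c : columns) {
        c.close();
      }
    }
  }

  public static void main(String[] args) {
    SketchVector readOnly = new ReadOnlyVector();
    SketchVector writable = new ReusableWritableVector();
    SketchBatch batch = new SketchBatch(readOnly, writable);

    // After all rows of a batch are consumed: release only what is not reused.
    batch.closeIfNotWritable();
    System.out.println("read-only closed: " + readOnly.closed);
    System.out.println("writable closed:  " + writable.closed);

    // Final cleanup once no more batches will be produced.
    batch.close();
    System.out.println("writable closed after close(): " + writable.closed);
  }
}
```

The split keeps reusable writable vectors alive between batches, while vectors that cannot be refilled are released as soon as their rows have been consumed.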
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing tests.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #48767 from viirya/close_if_not_writable.
Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
(cherry picked from commit 800faf0abfa368ad0a5ef1e0fa44b74dbaab724e)
Signed-off-by: Kent Yao <[email protected]>
---
.../java/org/apache/spark/sql/vectorized/ColumnVector.java | 12 ++++++++++++
.../java/org/apache/spark/sql/vectorized/ColumnarBatch.java | 10 ++++++++++
.../spark/sql/execution/vectorized/WritableColumnVector.java | 5 +++++
.../main/scala/org/apache/spark/sql/execution/Columnar.scala | 5 +++++
4 files changed, 32 insertions(+)
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
index a3c58ae02547..7dc2d3814429 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
@@ -67,6 +67,18 @@ public abstract class ColumnVector implements AutoCloseable {
@Override
public abstract void close();
+ /**
+ * Cleans up memory for this column vector if it's not writable. The column vector is not usable
+ * after this.
+ *
+ * If this is a writable column vector, it is a no-op.
+ */
+ public void closeIfNotWritable() {
+ // By default, we just call close() for all column vectors. If a column vector is writable, it
+ // should override this method and do nothing.
+ close();
+ }
+
/**
* Returns true if this column vector contains any null values.
*/
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
index 9e859e77644a..52e4115af336 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
@@ -45,6 +45,16 @@ public class ColumnarBatch implements AutoCloseable {
}
}
+ /**
+ * Called to close all the columns if they are not writable. This is used to clean up memory
+ * allocated during columnar processing.
+ */
+ public void closeIfNotWritable() {
+ for (ColumnVector c: columns) {
+ c.closeIfNotWritable();
+ }
+ }
+
/**
* Returns an iterator over the rows in this batch.
*/
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index a8e4aad60c22..0fde85fd454c 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -87,6 +87,11 @@ public abstract class WritableColumnVector extends ColumnVector {
dictionary = null;
}
+ @Override
+ public void closeIfNotWritable() {
+ // no-op
+ }
+
public void reserveAdditional(int additionalCapacity) {
reserve(elementsAppended + additionalCapacity);
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
index 3fec13a7f9ba..ea559efc45f1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
@@ -194,9 +194,14 @@ case class ColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition w
| $shouldStop
| }
| $idx = $numRows;
+ | $batch.closeIfNotWritable();
| $batch = null;
| $nextBatchFuncName();
|}
+ |// clean up resources
+ |if ($batch != null) {
+ | $batch.close();
+ |}
""".stripMargin
}