This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new d39f5ab99f67 [SPARK-50235][SQL] Clean up ColumnVector resource after 
processing all rows in ColumnarToRowExec
d39f5ab99f67 is described below

commit d39f5ab99f67ce959b4379ecc3d6e262c10146cf
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Wed Nov 6 17:12:11 2024 +0800

    [SPARK-50235][SQL] Clean up ColumnVector resource after processing all rows 
in ColumnarToRowExec
    
    ### What changes were proposed in this pull request?
    
    This patch cleans up ColumnVector resources after processing all rows in 
ColumnarToRowExec. This patch only focuses on the codegen implementation of 
ColumnarToRowExec. The non-codegen path should be relatively rare to use, and 
currently no good way has been proposed for it, so it is left to a follow-up.
    
    ### Why are the changes needed?
    
    Currently we only assign null to the ColumnarBatch object, but that doesn't 
release the resources held by the vectors in the batch. For OnHeapColumnVector, 
the Java arrays may be automatically collected by the JVM, but for 
OffHeapColumnVector, the allocated off-heap memory will be leaked.
    
    For custom ColumnVector implementations such as Arrow-based ones, it can 
also cause memory-safety issues if the underlying buffers are reused across 
batches, because when ColumnarToRowExec begins to fill values for the next 
batch, the arrays in the previous batch are still held.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #48767 from viirya/close_if_not_writable.
    
    Authored-by: Liang-Chi Hsieh <[email protected]>
    Signed-off-by: Kent Yao <[email protected]>
    (cherry picked from commit 800faf0abfa368ad0a5ef1e0fa44b74dbaab724e)
    Signed-off-by: Kent Yao <[email protected]>
---
 .../java/org/apache/spark/sql/vectorized/ColumnVector.java   | 12 ++++++++++++
 .../java/org/apache/spark/sql/vectorized/ColumnarBatch.java  | 10 ++++++++++
 .../spark/sql/execution/vectorized/WritableColumnVector.java |  5 +++++
 .../main/scala/org/apache/spark/sql/execution/Columnar.scala |  5 +++++
 4 files changed, 32 insertions(+)

diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java 
b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
index a3c58ae02547..7dc2d3814429 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
@@ -67,6 +67,18 @@ public abstract class ColumnVector implements AutoCloseable {
   @Override
   public abstract void close();
 
+  /**
+   * Cleans up memory for this column vector if it's not writable. The column 
vector is not usable
+   * after this.
+   *
+   * If this is a writable column vector, it is a no-op.
+   */
+  public void closeIfNotWritable() {
+    // By default, we just call close() for all column vectors. If a column 
vector is writable, it
+    // should override this method and do nothing.
+    close();
+  }
+
   /**
    * Returns true if this column vector contains any null values.
    */
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java 
b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
index 9e859e77644a..52e4115af336 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
@@ -45,6 +45,16 @@ public class ColumnarBatch implements AutoCloseable {
     }
   }
 
+  /**
+   * Called to close all the columns if they are not writable. This is used to 
clean up memory
+   * allocated during columnar processing.
+   */
+  public void closeIfNotWritable() {
+    for (ColumnVector c: columns) {
+      c.closeIfNotWritable();
+    }
+  }
+
   /**
    * Returns an iterator over the rows in this batch.
    */
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index a8e4aad60c22..0fde85fd454c 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -87,6 +87,11 @@ public abstract class WritableColumnVector extends 
ColumnVector {
     dictionary = null;
   }
 
+  @Override
+  public void closeIfNotWritable() {
+    // no-op
+  }
+
   public void reserveAdditional(int additionalCapacity) {
     reserve(elementsAppended + additionalCapacity);
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
index 3fec13a7f9ba..ea559efc45f1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
@@ -194,9 +194,14 @@ case class ColumnarToRowExec(child: SparkPlan) extends 
ColumnarToRowTransition w
        |    $shouldStop
        |  }
        |  $idx = $numRows;
+       |  $batch.closeIfNotWritable();
        |  $batch = null;
        |  $nextBatchFuncName();
        |}
+       |// clean up resources
+       |if ($batch != null) {
+       |  $batch.close();
+       |}
      """.stripMargin
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to