This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b974ca75896 [SPARK-50463][SQL] Fix `ConstantColumnVector` with Columnar to Row conversion
7b974ca75896 is described below

commit 7b974ca758961668a26a1d0c60c91614dac38742
Author: Richard Chen <[email protected]>
AuthorDate: Tue Dec 3 08:36:59 2024 -0800

    [SPARK-50463][SQL] Fix `ConstantColumnVector` with Columnar to Row conversion
    
    ### What changes were proposed in this pull request?
    
    
    https://github.com/apache/spark/commit/800faf0abfa368ad0a5ef1e0fa44b74dbaab724e frees column vector resources between batches in columnar-to-row conversion. However, like `WritableColumnVector`, `ConstantColumnVector` should not free resources between batches, because the same data is reused across every batch.
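
    For illustration, the cleanup contract can be sketched as follows. This is a minimal, self-contained sketch rather than the actual Spark sources (the real classes carry many more methods):

    ```java
    // Condensed sketch of the per-batch cleanup contract.
    abstract class ColumnVector implements AutoCloseable {
      @Override
      public abstract void close();

      // Per-batch cleanup hook: by default, release everything between batches.
      // Vectors whose data outlives a single batch override this with a no-op
      // and release memory only in close().
      public void closeIfFreeable() {
        close();
      }
    }

    // Constant vectors serve the same backing data to every batch, so the
    // per-batch hook must not release it.
    class ConstantColumnVector extends ColumnVector {
      private Object data = "partition value";

      @Override
      public void closeIfFreeable() {
        // no-op: the backing data is reused by the next batch
      }

      @Override
      public void close() {
        data = null; // freed once, at the end of execution
      }
    }
    ```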
    
    ### Why are the changes needed?
    
    Without this change, `ConstantColumnVector`s with string values, for example, will fail when used with columnar-to-row conversion. One such case is reading a Parquet table partitioned by a string column when the scan produces multiple batches.
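
    A hypothetical reproduction, mirroring the regression test added below (`spark` is assumed to be an existing `SparkSession`, the path is arbitrary, and the config key is the one behind `SQLConf.PARQUET_VECTORIZED_READER_BATCH_SIZE` used by the new test):

    ```java
    // Batch size 1 forces the partition value's ConstantColumnVector to be
    // carried across many batches during columnar-to-row conversion.
    spark.conf().set("spark.sql.parquet.columnarReaderBatchSize", "1");
    spark.range(0, 5)
        .selectExpr("concat(cast(id % 2 as string), 'a') as partCol", "id")
        .write().format("parquet").mode("overwrite")
        .partitionBy("partCol").save("/tmp/spark-50463-repro");
    // Before this fix, collecting the string partition column failed here.
    spark.read().parquet("/tmp/spark-50463-repro").select("partCol").collectAsList();
    ```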
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Added a unit test that failed before this change and now passes.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    no
    
    Closes #49021 from richardc-db/col_to_row_const_col_vec_fix.
    
    Authored-by: Richard Chen <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../apache/spark/sql/vectorized/ColumnVector.java    | 12 ++++++------
 .../apache/spark/sql/vectorized/ColumnarBatch.java   |  8 ++++----
 .../execution/vectorized/ConstantColumnVector.java   |  5 +++++
 .../execution/vectorized/WritableColumnVector.java   |  2 +-
 .../org/apache/spark/sql/execution/Columnar.scala    |  2 +-
 .../datasources/parquet/ParquetQuerySuite.scala      | 20 ++++++++++++++++++++
 6 files changed, 37 insertions(+), 12 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
index bfb1833b731a..54b62c00283f 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
@@ -69,14 +69,14 @@ public abstract class ColumnVector implements AutoCloseable {
   public abstract void close();
 
   /**
-   * Cleans up memory for this column vector if it's not writable. The column vector is not usable
-   * after this.
+   * Cleans up memory for this column vector if its resources are freeable between batches.
+   * The column vector is not usable after this.
    *
-   * If this is a writable column vector, it is a no-op.
+   * If this is a writable column vector or constant column vector, it is a no-op.
    */
-  public void closeIfNotWritable() {
-    // By default, we just call close() for all column vectors. If a column vector is writable, it
-    // should override this method and do nothing.
+  public void closeIfFreeable() {
+    // By default, we just call close() for all column vectors. If a column vector is writable or
+    // constant, it should override this method and do nothing.
     close();
   }
 
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
index 52e4115af336..7ef570a21229 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
@@ -46,12 +46,12 @@ public class ColumnarBatch implements AutoCloseable {
   }
 
   /**
-   * Called to close all the columns if they are not writable. This is used to clean up memory
-   * allocated during columnar processing.
+   * Called to close all the columns if their resources are freeable between batches.
+   * This is used to clean up memory allocated during columnar processing.
    */
-  public void closeIfNotWritable() {
+  public void closeIfFreeable() {
     for (ColumnVector c: columns) {
-      c.closeIfNotWritable();
+      c.closeIfFreeable();
     }
   }
 
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ConstantColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ConstantColumnVector.java
index 8b24973ad3d8..cd2a82169885 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ConstantColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ConstantColumnVector.java
@@ -77,6 +77,11 @@ public class ConstantColumnVector extends ColumnVector {
     }
   }
 
+  public void closeIfFreeable() {
+    // no-op: `ConstantColumnVector`s reuse the data backing their values across multiple batches
+    // and are freed at the end of execution in `close`.
+  }
+
   @Override
   public void close() {
     stringData = null;
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index 696e20525cda..fc465e73006b 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -97,7 +97,7 @@ public abstract class WritableColumnVector extends ColumnVector {
   }
 
   @Override
-  public void closeIfNotWritable() {
+  public void closeIfFreeable() {
     // no-op
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
index 64163da50e13..a67648f24b4c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
@@ -194,7 +194,7 @@ case class ColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition w
        |    $shouldStop
        |  }
        |  $idx = $numRows;
-       |  $batch.closeIfNotWritable();
+       |  $batch.closeIfFreeable();
        |  $batch = null;
        |  $nextBatchFuncName();
        |}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 22a02447e720..bba71f1c48de 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -473,6 +473,26 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
     }
   }
 
+  test("SPARK-50463: Partition values can be read over multiple batches") {
+    withTempDir { dir =>
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_BATCH_SIZE.key -> "1") {
+        val path = dir.getAbsolutePath
+        spark.range(0, 5)
+          .selectExpr("concat(cast(id % 2 as string), 'a') as partCol", "id")
+          .write
+          .format("parquet")
+          .mode("overwrite")
+          .partitionBy("partCol").save(path)
+        val df = spark.read.format("parquet").load(path).selectExpr("partCol")
+        val expected = spark.range(0, 5)
+          .selectExpr("concat(cast(id % 2 as string), 'a') as partCol")
+          .collect()
+
+        checkAnswer(df, expected)
+      }
+    }
+  }
+
   test("SPARK-10301 requested schema clipping - same schema") {
     withTempPath { dir =>
       val path = dir.getCanonicalPath


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
