This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7b974ca75896 [SPARK-50463][SQL] Fix `ConstantColumnVector` with Columnar to Row conversion
7b974ca75896 is described below
commit 7b974ca758961668a26a1d0c60c91614dac38742
Author: Richard Chen <[email protected]>
AuthorDate: Tue Dec 3 08:36:59 2024 -0800
[SPARK-50463][SQL] Fix `ConstantColumnVector` with Columnar to Row conversion
### What changes were proposed in this pull request?
https://github.com/apache/spark/commit/800faf0abfa368ad0a5ef1e0fa44b74dbaab724e
frees column vector resources between batches during columnar-to-row conversion.
However, like `WritableColumnVector`, `ConstantColumnVector` should not free its
resources between batches, because the same backing data is reused across batches.
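To make the intended lifecycle concrete, here is a minimal Scala sketch (hypothetical `VectorLike` types, not the actual Spark classes): the default `closeIfFreeable` frees a vector between batches, while a constant vector overrides it as a no-op because its backing data must survive until `close()` at the end of execution.

```scala
// Simplified model of the close semantics in this change; the real classes
// live in org.apache.spark.sql.vectorized and sql.execution.vectorized.
trait VectorLike extends AutoCloseable {
  // By default, a vector's memory may be reclaimed between batches.
  def closeIfFreeable(): Unit = close()
}

// The constant vector's single value backs every batch, so freeing it
// between batches would corrupt later reads; only close() releases it.
final class ConstantVectorLike(value: String) extends VectorLike {
  private var data: String = value
  override def closeIfFreeable(): Unit = ()      // no-op between batches
  override def close(): Unit = { data = null }   // freed once, at the end
}
```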
### Why are the changes needed?
Without this change, a `ConstantColumnVector` holding string values, for example,
will fail when used with columnar-to-row conversion; for instance, when reading a
Parquet table partitioned by a string column across multiple batches.
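As a concrete failure scenario, here is a hedged Scala repro sketch (assuming a running `SparkSession` named `spark` and a hypothetical scratch path `/tmp/partitioned`; the config key is the one behind `SQLConf.PARQUET_VECTORIZED_READER_BATCH_SIZE`, also used by the test below). A batch size of 1 forces each partition's constant vector to be read across multiple batches:

```scala
// Force one-row batches so a single partition's ConstantColumnVector is
// reused across several batches; before this fix, its string data was
// freed after the first batch.
spark.conf.set("spark.sql.parquet.columnarReaderBatchSize", "1")

spark.range(0, 5)
  .selectExpr("concat(cast(id % 2 as string), 'a') as partCol", "id")
  .write.mode("overwrite").partitionBy("partCol").parquet("/tmp/partitioned")

// Selecting only the partition column exercises columnar-to-row conversion
// over the constant vector.
spark.read.parquet("/tmp/partitioned").selectExpr("partCol").collect()
```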
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added a unit test that failed before this change and now passes.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #49021 from richardc-db/col_to_row_const_col_vec_fix.
Authored-by: Richard Chen <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../apache/spark/sql/vectorized/ColumnVector.java | 12 ++++++------
.../apache/spark/sql/vectorized/ColumnarBatch.java | 8 ++++----
.../execution/vectorized/ConstantColumnVector.java | 5 +++++
.../execution/vectorized/WritableColumnVector.java | 2 +-
.../org/apache/spark/sql/execution/Columnar.scala | 2 +-
.../datasources/parquet/ParquetQuerySuite.scala | 20 ++++++++++++++++++++
6 files changed, 37 insertions(+), 12 deletions(-)
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
index bfb1833b731a..54b62c00283f 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
@@ -69,14 +69,14 @@ public abstract class ColumnVector implements AutoCloseable {
public abstract void close();
/**
- * Cleans up memory for this column vector if it's not writable. The column vector is not usable
- * after this.
+ * Cleans up memory for this column vector if its resources are freeable between batches.
+ * The column vector is not usable after this.
*
- * If this is a writable column vector, it is a no-op.
+ * If this is a writable column vector or constant column vector, it is a no-op.
*/
- public void closeIfNotWritable() {
- // By default, we just call close() for all column vectors. If a column vector is writable, it
- // should override this method and do nothing.
+ public void closeIfFreeable() {
+ // By default, we just call close() for all column vectors. If a column vector is writable or
+ // constant, it should override this method and do nothing.
close();
}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
index 52e4115af336..7ef570a21229 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
@@ -46,12 +46,12 @@ public class ColumnarBatch implements AutoCloseable {
}
/**
- * Called to close all the columns if they are not writable. This is used to clean up memory
- * allocated during columnar processing.
+ * Called to close all the columns if their resources are freeable between batches.
+ * This is used to clean up memory allocated during columnar processing.
*/
- public void closeIfNotWritable() {
+ public void closeIfFreeable() {
for (ColumnVector c: columns) {
- c.closeIfNotWritable();
+ c.closeIfFreeable();
}
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ConstantColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ConstantColumnVector.java
index 8b24973ad3d8..cd2a82169885 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ConstantColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ConstantColumnVector.java
@@ -77,6 +77,11 @@ public class ConstantColumnVector extends ColumnVector {
}
}
+ public void closeIfFreeable() {
+ // no-op: `ConstantColumnVector`s reuse the data backing their values across multiple batches
+ // and are freed at the end of execution in `close`.
+ }
+
@Override
public void close() {
stringData = null;
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index 696e20525cda..fc465e73006b 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -97,7 +97,7 @@ public abstract class WritableColumnVector extends ColumnVector {
}
@Override
- public void closeIfNotWritable() {
+ public void closeIfFreeable() {
// no-op
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
index 64163da50e13..a67648f24b4c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
@@ -194,7 +194,7 @@ case class ColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition w
| $shouldStop
| }
| $idx = $numRows;
- | $batch.closeIfNotWritable();
+ | $batch.closeIfFreeable();
| $batch = null;
| $nextBatchFuncName();
|}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 22a02447e720..bba71f1c48de 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -473,6 +473,26 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
}
}
+ test("SPARK-50463: Partition values can be read over multiple batches") {
+ withTempDir { dir =>
+ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_BATCH_SIZE.key -> "1") {
+ val path = dir.getAbsolutePath
+ spark.range(0, 5)
+ .selectExpr("concat(cast(id % 2 as string), 'a') as partCol", "id")
+ .write
+ .format("parquet")
+ .mode("overwrite")
+ .partitionBy("partCol").save(path)
+ val df = spark.read.format("parquet").load(path).selectExpr("partCol")
+ val expected = spark.range(0, 5)
+ .selectExpr("concat(cast(id % 2 as string), 'a') as partCol")
+ .collect()
+
+ checkAnswer(df, expected)
+ }
+ }
+ }
+
test("SPARK-10301 requested schema clipping - same schema") {
withTempPath { dir =>
val path = dir.getCanonicalPath
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]