This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 917baeaa91a1 [SPARK-55328][SQL][PYTHON] Reuse PythonArrowInput.codec
in GroupedPythonArrowInput
917baeaa91a1 is described below
commit 917baeaa91a107a842bb52b710e88e6987b3081e
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Tue Feb 3 15:19:59 2026 +0900
[SPARK-55328][SQL][PYTHON] Reuse PythonArrowInput.codec in
GroupedPythonArrowInput
### What changes were proposed in this pull request?
Reuse PythonArrowInput.codec in GroupedPythonArrowInput
### Why are the changes needed?
Code cleanup.
### Does this PR introduce _any_ user-facing change?
no, minor internal change
### How was this patch tested?
ci
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #54109 from zhengruifeng/simplify_codec.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../sql/execution/python/PythonArrowInput.scala | 21 +--------------------
1 file changed, 1 insertion(+), 20 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowInput.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowInput.scala
index a659cb599b2a..58a48b1815e1 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowInput.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowInput.scala
@@ -266,25 +266,6 @@ private[python] object BatchedPythonArrowInput {
*/
private[python] trait GroupedPythonArrowInput { self:
RowInputArrowPythonRunner =>
- // Helper method to create VectorUnloader with compression for grouped
operations
- private def createUnloaderForGroup(root: VectorSchemaRoot): VectorUnloader =
{
- val codec = SQLConf.get.arrowCompressionCodec match {
- case "none" => NoCompressionCodec.INSTANCE
- case "zstd" =>
- val compressionLevel = SQLConf.get.arrowZstdCompressionLevel
- val factory = CompressionCodec.Factory.INSTANCE
- val codecType = new
ZstdCompressionCodec(compressionLevel).getCodecType()
- factory.createCodec(codecType)
- case "lz4" =>
- val factory = CompressionCodec.Factory.INSTANCE
- val codecType = new Lz4CompressionCodec().getCodecType()
- factory.createCodec(codecType)
- case other =>
- throw SparkException.internalError(
- s"Unsupported Arrow compression codec: $other. Supported values:
none, zstd, lz4")
- }
- new VectorUnloader(root, true, codec, true)
- }
protected override def newWriter(
env: SparkEnv,
worker: PythonWorker,
@@ -309,7 +290,7 @@ private[python] trait GroupedPythonArrowInput { self:
RowInputArrowPythonRunner
schema, timeZoneId, pythonExec,
errorOnDuplicatedFieldNames, largeVarTypes, dataOut, context)
// Set the unloader with compression after creating the writer
- writer.unloader = createUnloaderForGroup(writer.root)
+ writer.unloader = new VectorUnloader(writer.root, true,
self.codec, true)
nextBatchStart = inputIterator.next()
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]