This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 917baeaa91a1 [SPARK-55328][SQL][PYTHON] Reuse PythonArrowInput.codec 
in GroupedPythonArrowInput
917baeaa91a1 is described below

commit 917baeaa91a107a842bb52b710e88e6987b3081e
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Tue Feb 3 15:19:59 2026 +0900

    [SPARK-55328][SQL][PYTHON] Reuse PythonArrowInput.codec in 
GroupedPythonArrowInput
    
    ### What changes were proposed in this pull request?
    Reuse PythonArrowInput.codec in GroupedPythonArrowInput
    
    ### Why are the changes needed?
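    Code cleanup: GroupedPythonArrowInput duplicated the Arrow compression codec selection in its own helper (createUnloaderForGroup); it now reuses the existing PythonArrowInput.codec instead.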
    
    ### Does this PR introduce _any_ user-facing change?
    no, minor internal change
    
    ### How was this patch tested?
    Existing CI.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #54109 from zhengruifeng/simplify_codec.
    
    Authored-by: Ruifeng Zheng <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../sql/execution/python/PythonArrowInput.scala     | 21 +--------------------
 1 file changed, 1 insertion(+), 20 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowInput.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowInput.scala
index a659cb599b2a..58a48b1815e1 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowInput.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowInput.scala
@@ -266,25 +266,6 @@ private[python] object BatchedPythonArrowInput {
  */
 private[python] trait GroupedPythonArrowInput { self: 
RowInputArrowPythonRunner =>
 
-  // Helper method to create VectorUnloader with compression for grouped 
operations
-  private def createUnloaderForGroup(root: VectorSchemaRoot): VectorUnloader = 
{
-    val codec = SQLConf.get.arrowCompressionCodec match {
-      case "none" => NoCompressionCodec.INSTANCE
-      case "zstd" =>
-        val compressionLevel = SQLConf.get.arrowZstdCompressionLevel
-        val factory = CompressionCodec.Factory.INSTANCE
-        val codecType = new 
ZstdCompressionCodec(compressionLevel).getCodecType()
-        factory.createCodec(codecType)
-      case "lz4" =>
-        val factory = CompressionCodec.Factory.INSTANCE
-        val codecType = new Lz4CompressionCodec().getCodecType()
-        factory.createCodec(codecType)
-      case other =>
-        throw SparkException.internalError(
-          s"Unsupported Arrow compression codec: $other. Supported values: 
none, zstd, lz4")
-    }
-    new VectorUnloader(root, true, codec, true)
-  }
   protected override def newWriter(
       env: SparkEnv,
       worker: PythonWorker,
@@ -309,7 +290,7 @@ private[python] trait GroupedPythonArrowInput { self: 
RowInputArrowPythonRunner
               schema, timeZoneId, pythonExec,
               errorOnDuplicatedFieldNames, largeVarTypes, dataOut, context)
             // Set the unloader with compression after creating the writer
-            writer.unloader = createUnloaderForGroup(writer.root)
+            writer.unloader = new VectorUnloader(writer.root, true, 
self.codec, true)
             nextBatchStart = inputIterator.next()
           }
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to