This is an automated email from the ASF dual-hosted git repository.

imbruced pushed a commit to branch arrow-worker
in repository https://gitbox.apache.org/repos/asf/sedona.git

commit 0770a6d3a73d54c0fde22c039331d881b04a061a
Author: pawelkocinski <[email protected]>
AuthorDate: Tue Dec 30 18:38:24 2025 +0100

    add code so far
---
 .../sql/execution/python/EvalPythonExec.scala      | 102 ---------------------
 .../execution/python/SedonaArrowPythonRunner.scala |  10 +-
 .../execution/python/SedonaBasePythonRunner.scala  |  12 +--
 .../execution/python/SedonaPythonArrowOutput.scala |  37 +++++++-
 .../org/apache/sedona/sql/TestBaseScala.scala      |   3 +-
 .../org/apache/spark/sql/udf/StrategySuite.scala   |   6 +-
 .../apache/spark/sql/udf/TestScalarPandasUDF.scala |   2 +-
 7 files changed, 52 insertions(+), 120 deletions(-)

diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
deleted file mode 100644
index 11cc8c121f..0000000000
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-package org.apache.spark.sql.execution.python
-
-import org.apache.sedona.common.geometrySerde.GeometrySerde
-import org.apache.sedona.sql.utils.GeometrySerializer
-import org.apache.spark.{ContextAwareIterator, SparkEnv, TaskContext}
-import org.apache.spark.api.python.ChainedPythonFunctions
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, JoinedRow, MutableProjection, PythonUDF, UnsafeProjection, UnsafeRow}
-import org.apache.spark.sql.execution.UnaryExecNode
-import org.apache.spark.sql.types.{DataType, StructField, StructType}
-import org.apache.spark.util.Utils
-
-import java.io.File
-import scala.collection.mutable.ArrayBuffer
-
-trait EvalPythonExec extends UnaryExecNode {
-  def udfs: Seq[PythonUDF]
-
-  def resultAttrs: Seq[Attribute]
-
-  override def output: Seq[Attribute] = child.output ++ resultAttrs
-
-  override def producedAttributes: AttributeSet = AttributeSet(resultAttrs)
-
-  private def collectFunctions(udf: PythonUDF): (ChainedPythonFunctions, Seq[Expression]) = {
-    udf.children match {
-      case Seq(u: PythonUDF) =>
-        val (chained, children) = collectFunctions(u)
-        (ChainedPythonFunctions(chained.funcs ++ Seq(udf.func)), children)
-      case children =>
-        // There should not be any other UDFs, or the children can't be evaluated directly.
-        assert(children.forall(!_.exists(_.isInstanceOf[PythonUDF])))
-        (ChainedPythonFunctions(Seq(udf.func)), udf.children)
-    }
-  }
-
-  protected def evaluate(
-                          funcs: Seq[ChainedPythonFunctions],
-                          argOffsets: Array[Array[Int]],
-                          iter: Iterator[InternalRow],
-                          schema: StructType,
-                          context: TaskContext): Iterator[InternalRow]
-
-  protected override def doExecute(): RDD[InternalRow] = {
-    val inputRDD = child.execute().map(_.copy())
-
-    inputRDD.mapPartitions { iter =>
-      val context = TaskContext.get()
-      val contextAwareIterator = new ContextAwareIterator(context, iter)
-
-      // The queue used to buffer input rows so we can drain it to
-      // combine input with output from Python.
-      val queue = HybridRowQueue(context.taskMemoryManager(),
-        new File(Utils.getLocalDir(SparkEnv.get.conf)), child.output.length)
-//      context.addTaskCompletionListener[Unit] { ctx =>
-//        queue.close()
-//      }
-
-      val (pyFuncs, inputs) = udfs.map(collectFunctions).unzip
-
-      // flatten all the arguments
-      val allInputs = new ArrayBuffer[Expression]
-      val dataTypes = new ArrayBuffer[DataType]
-      val argOffsets = inputs.map { input =>
-        input.map { e =>
-          if (allInputs.exists(_.semanticEquals(e))) {
-            allInputs.indexWhere(_.semanticEquals(e))
-          } else {
-            allInputs += e
-            dataTypes += e.dataType
-            allInputs.length - 1
-          }
-        }.toArray
-      }.toArray
-      val projection = MutableProjection.create(allInputs.toSeq, child.output)
-      projection.initialize(context.partitionId())
-      val schema = StructType(dataTypes.zipWithIndex.map { case (dt, i) =>
-        StructField(s"_$i", dt)
-      }.toArray)
-
-      // Add rows to queue to join later with the result.
-      val projectedRowIter = contextAwareIterator.map { inputRow =>
-        queue.add(inputRow.asInstanceOf[UnsafeRow])
-        val proj = projection(inputRow)
-        proj
-      }
-
-      val materializedResult = projectedRowIter.toSeq
-
-      val outputRowIterator = evaluate(
-        pyFuncs, argOffsets, materializedResult.toIterator, schema, context)
-
-      val joined = new JoinedRow
-      val resultProj = UnsafeProjection.create(output, output)
-
-      outputRowIterator.map { outputRow =>
-        resultProj(joined(queue.remove(), outputRow))
-      }
-    }
-  }
-}
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowPythonRunner.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowPythonRunner.scala
index e6f7f6ddf9..3bb93fe62e 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowPythonRunner.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowPythonRunner.scala
@@ -53,9 +53,9 @@ class SedonaArrowPythonRunner(
 
  override val simplifiedTraceback: Boolean = SQLConf.get.pysparkSimplifiedTraceback
 
-//  override val bufferSize: Int = SQLConf.get.pandasUDFBufferSize
-//  require(
-//    bufferSize >= 4,
-//    "Pandas execution requires more than 4 bytes. Please set higher buffer. 
" +
-//      s"Please change '${SQLConf.PANDAS_UDF_BUFFER_SIZE.key}'.")
+  override val bufferSize: Int = SQLConf.get.pandasUDFBufferSize
+  require(
+    bufferSize >= 4,
+    "Pandas execution requires more than 4 bytes. Please set higher buffer. " +
+      s"Please change '${SQLConf.PANDAS_UDF_BUFFER_SIZE.key}'.")
 }
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaBasePythonRunner.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaBasePythonRunner.scala
index d7af4fe771..f1b55e4d24 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaBasePythonRunner.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaBasePythonRunner.scala
@@ -108,13 +108,13 @@ private[spark] abstract class SedonaBasePythonRunner[IN, OUT](
     }
 
    envVars.put("SPARK_JOB_ARTIFACT_UUID", jobArtifactUUID.getOrElse("default"))
-//
-//    val (worker: Socket, pid: Option[Int]) = {
-//      WorkerContext.createPythonWorker(pythonExec, envVars.asScala.toMap)
-//    }
 
-    val (worker: Socket, pid: Option[Int]) = env.createPythonWorker(
-      pythonExec, envVars.asScala.toMap)
+    val (worker: Socket, pid: Option[Int]) = {
+      WorkerContext.createPythonWorker(pythonExec, envVars.asScala.toMap)
+    }
+
+//    val (worker: Socket, pid: Option[Int]) = env.createPythonWorker(
+//      pythonExec, envVars.asScala.toMap)
 
 //    println("Sedona worker port: " + worker.getPort())
    // Whether is the worker released into idle pool or closed. When any codes try to release or
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala
index bf32ab2764..20c4859eb2 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala
@@ -230,6 +230,8 @@ private[python] trait SedonaPythonArrowOutput[OUT <: AnyRef] { self: BasePythonR
       private var root: VectorSchemaRoot = _
       private var schema: StructType = _
       private var vectors: Array[ColumnVector] = _
+      private var eos = false
+      private var nextObj: OUT = _
 
       context.addTaskCompletionListener[Unit] { _ =>
         if (reader != null) {
@@ -240,11 +242,40 @@ private[python] trait SedonaPythonArrowOutput[OUT <: AnyRef] { self: BasePythonR
 
       private var batchLoaded = true
 
+      def handleEndOfDataSectionSedona (): Unit = {
+        if (stream.readInt() == SpecialLengths.END_OF_STREAM) {
+
+        }
+
+        eos = true
+      }
+
       protected override def handleEndOfDataSection(): Unit = {
         handleMetadataAfterExec(stream)
-        super.handleEndOfDataSection()
+        handleEndOfDataSectionSedona()
+      }
+
+      override def hasNext: Boolean = nextObj != null || {
+        if (!eos) {
+          nextObj = read()
+          hasNext
+        } else {
+          false
+        }
+      }
+
+      override def next(): OUT = {
+        if (hasNext) {
+          val obj = nextObj
+          nextObj = null.asInstanceOf[OUT]
+          obj
+        } else {
+          Iterator.empty.next()
+        }
       }
 
+
+
       protected override def read(): OUT = {
         if (writerThread.exception.isDefined) {
           throw writerThread.exception.get
@@ -268,7 +299,9 @@ private[python] trait SedonaPythonArrowOutput[OUT <: AnyRef] { self: BasePythonR
               read()
             }
           } else {
-            stream.readInt() match {
+            val specialSign = stream.readInt()
+
+            specialSign match {
               case SpecialLengths.START_ARROW_STREAM =>
                 reader = new ArrowStreamReader(stream, allocator)
                 root = reader.getVectorSchemaRoot()
diff --git a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
index 4eb48e4ca7..32d2f06fc8 100644
--- a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
@@ -38,9 +38,10 @@ trait TestBaseScala extends FunSpec with BeforeAndAfterAll {
 
   val keyParserExtension = "spark.sedona.enableParserExtension"
   val warehouseLocation = System.getProperty("user.dir") + "/target/"
+//  4425302.491982245
   val sparkSession = SedonaContext
     .builder()
-    .master("local[1]")
+    .master("local[*]")
     .appName("sedonasqlScalaTest")
     .config("spark.sql.warehouse.dir", warehouseLocation)
     // We need to be explicit about broadcasting in tests.
diff --git a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
index fa2c4ef5b0..cdde195413 100644
--- a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
@@ -94,9 +94,8 @@ class StrategySuite extends TestBaseScala with Matchers {
 //      .save("/Users/pawelkocinski/Desktop/projects/sedona-production/apache-sedona-book/data/warehouse/buildings_2")
     val df = spark.read
       .format("geoparquet")
-      .load("/Users/pawelkocinski/Desktop/projects/sedona-production/apache-sedona-book/data/warehouse/buildings_2")
+      .load("/Users/pawelkocinski/Desktop/projects/sedona-production/apache-sedona-book/data/warehouse/buildings")
       .select("geometry")
-      .limit(1000)
 
     df.cache()
     df.count()
@@ -119,7 +118,8 @@ class StrategySuite extends TestBaseScala with Matchers {
 //        nonGeometryVectorizedUDF(col("id")).alias("id_increased"),
       )
 
-    dfVectorized.show()
+//    dfVectorized.show()
+    dfVectorized.selectExpr("ST_X(ST_Centroid(geom)) AS x").selectExpr("sum(x)").show()
//    dfVectorized.selectExpr("ST_X(ST_Centroid(geom)) AS x").selectExpr("sum(x)").show()
//    val processingContext = df.queryExecution.explainString(mode = ExplainMode.fromString("extended"))
 
diff --git a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala
index b63237cae6..e941adcff4 100644
--- a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala
@@ -237,6 +237,6 @@ object ScalarUDF {
       accumulator = null),
     dataType = GeometryUDT,
     pythonEvalType = UDF.PythonEvalType.SQL_SCALAR_SEDONA_DB_UDF,
-    udfDeterministic = false)
+    udfDeterministic = true)
 
 }