This is an automated email from the ASF dual-hosted git repository.

kontinuation pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new a03f8a4ad [SEDONA-630] Fix the ENCODER_NOT_FOUND error for ST_Union_Aggr outputEncoder (#1545)
a03f8a4ad is described below

commit a03f8a4adb231d36d8eb3f97f28ebf0e5d0f4edb
Author: Feng Zhang <[email protected]>
AuthorDate: Thu Aug 8 19:18:57 2024 -0700

    [SEDONA-630] Fix the ENCODER_NOT_FOUND error for ST_Union_Aggr outputEncoder (#1545)
    
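    Always construct the encoders on the driver. If the encoders are constructed
    on an executor, Spark complains that it doesn't know how to encode Geometry.
    To achieve this, ST_Union_Aggr now builds both encoders once, as vals
    initialized when the aggregator is instantiated, and bufferEncoder /
    outputEncoder return those instances instead of creating new ones on each call.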
---
 .../spark/sql/sedona_sql/expressions/AggregateFunctions.scala | 11 ++++++-----
 .../org/apache/sedona/sql/aggregateFunctionTestScala.scala    |  3 ---
 2 files changed, 6 insertions(+), 8 deletions(-)

diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/AggregateFunctions.scala b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/AggregateFunctions.scala
index 0579967dc..59a310e40 100644
--- a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/AggregateFunctions.scala
+++ b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/AggregateFunctions.scala
@@ -55,8 +55,10 @@ trait TraitSTAggregateExec {
 }
 
 class ST_Union_Aggr(bufferSize: Int = 1000)
-    extends Aggregator[Geometry, ListBuffer[Geometry], Geometry]
-    with Serializable {
+    extends Aggregator[Geometry, ListBuffer[Geometry], Geometry] {
+
+  val serde = ExpressionEncoder[Geometry]()
+  val bufferSerde = ExpressionEncoder[ListBuffer[Geometry]]()
 
   override def reduce(buffer: ListBuffer[Geometry], input: Geometry): 
ListBuffer[Geometry] = {
     buffer += input
@@ -86,10 +88,9 @@ class ST_Union_Aggr(bufferSize: Int = 1000)
     OverlayNGRobust.union(reduction.asJava)
   }
 
-  def bufferEncoder: ExpressionEncoder[ListBuffer[Geometry]] =
-    ExpressionEncoder[ListBuffer[Geometry]]()
+  def bufferEncoder: ExpressionEncoder[ListBuffer[Geometry]] = bufferSerde
 
-  def outputEncoder: ExpressionEncoder[Geometry] = 
ExpressionEncoder[Geometry]()
+  def outputEncoder: ExpressionEncoder[Geometry] = serde
 
   override def zero: ListBuffer[Geometry] = ListBuffer.empty
 }
diff --git a/spark/common/src/test/scala/org/apache/sedona/sql/aggregateFunctionTestScala.scala b/spark/common/src/test/scala/org/apache/sedona/sql/aggregateFunctionTestScala.scala
index 25de5c8ea..9608cbcbe 100644
--- a/spark/common/src/test/scala/org/apache/sedona/sql/aggregateFunctionTestScala.scala
+++ b/spark/common/src/test/scala/org/apache/sedona/sql/aggregateFunctionTestScala.scala
@@ -19,10 +19,7 @@
 package org.apache.sedona.sql
 
 import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.expressions.javalang.typed
-import org.apache.spark.sql.sedona_sql.expressions.ST_Union_Aggr
 import org.locationtech.jts.geom.{Coordinate, Geometry, GeometryFactory, 
Polygon}
-import org.locationtech.jts.io.WKTReader
 
 import scala.util.Random
 

Reply via email to