This is an automated email from the ASF dual-hosted git repository.
kontinuation pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git
The following commit(s) were added to refs/heads/master by this push:
new a03f8a4ad [SEDONA-630] Fix the ENCODER_NOT_FOUND error for ST_Union_Aggr outputEncoder (#1545)
a03f8a4ad is described below
commit a03f8a4adb231d36d8eb3f97f28ebf0e5d0f4edb
Author: Feng Zhang <[email protected]>
AuthorDate: Thu Aug 8 19:18:57 2024 -0700
[SEDONA-630] Fix the ENCODER_NOT_FOUND error for ST_Union_Aggr outputEncoder (#1545)
Always construct the encoders on the driver. If the encoders are constructed
on an executor, Spark will complain that it doesn't know how to encode
Geometry.
---
.../spark/sql/sedona_sql/expressions/AggregateFunctions.scala | 11 ++++++-----
.../org/apache/sedona/sql/aggregateFunctionTestScala.scala | 3 ---
2 files changed, 6 insertions(+), 8 deletions(-)
diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/AggregateFunctions.scala b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/AggregateFunctions.scala
index 0579967dc..59a310e40 100644
--- a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/AggregateFunctions.scala
+++ b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/AggregateFunctions.scala
@@ -55,8 +55,10 @@ trait TraitSTAggregateExec {
}
class ST_Union_Aggr(bufferSize: Int = 1000)
- extends Aggregator[Geometry, ListBuffer[Geometry], Geometry]
- with Serializable {
+ extends Aggregator[Geometry, ListBuffer[Geometry], Geometry] {
+
+ val serde = ExpressionEncoder[Geometry]()
+ val bufferSerde = ExpressionEncoder[ListBuffer[Geometry]]()
override def reduce(buffer: ListBuffer[Geometry], input: Geometry): ListBuffer[Geometry] = {
buffer += input
@@ -86,10 +88,9 @@ class ST_Union_Aggr(bufferSize: Int = 1000)
OverlayNGRobust.union(reduction.asJava)
}
- def bufferEncoder: ExpressionEncoder[ListBuffer[Geometry]] =
- ExpressionEncoder[ListBuffer[Geometry]]()
+ def bufferEncoder: ExpressionEncoder[ListBuffer[Geometry]] = bufferSerde
- def outputEncoder: ExpressionEncoder[Geometry] = ExpressionEncoder[Geometry]()
+ def outputEncoder: ExpressionEncoder[Geometry] = serde
override def zero: ListBuffer[Geometry] = ListBuffer.empty
}
diff --git a/spark/common/src/test/scala/org/apache/sedona/sql/aggregateFunctionTestScala.scala b/spark/common/src/test/scala/org/apache/sedona/sql/aggregateFunctionTestScala.scala
index 25de5c8ea..9608cbcbe 100644
--- a/spark/common/src/test/scala/org/apache/sedona/sql/aggregateFunctionTestScala.scala
+++ b/spark/common/src/test/scala/org/apache/sedona/sql/aggregateFunctionTestScala.scala
@@ -19,10 +19,7 @@
package org.apache.sedona.sql
import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.expressions.javalang.typed
-import org.apache.spark.sql.sedona_sql.expressions.ST_Union_Aggr
import org.locationtech.jts.geom.{Coordinate, Geometry, GeometryFactory, Polygon}
-import org.locationtech.jts.io.WKTReader
import scala.util.Random