Kristin Cowalcijk created SEDONA-186:
----------------------------------------
Summary: `collect` result of a spatial join query with SELECT * fails with serde error
Key: SEDONA-186
URL: https://issues.apache.org/jira/browse/SEDONA-186
Project: Apache Sedona
Issue Type: Bug
Affects Versions: 1.2.1
Reporter: Kristin Cowalcijk
h2. The Problem
The SQL query that reproduced this bug was:
{code:scala}
scala> spark.sql("SELECT * FROM osm_all_nodes_geom osm INNER JOIN ms_buildings_geom msb ON ST_Contains(msb.geom, osm.geom)").collect
22/11/01 18:41:08 WARN JoinQuery: UseIndex is true, but no index exists. Will build index on the fly.
java.lang.RuntimeException: Error while decoding: org.locationtech.jts.io.ParseException: Unknown WKB type 184
createexternalrow(input[0, bigint, true], input[1, binary, true],
newInstance(class org.apache.spark.sql.sedona_sql.UDT.GeometryUDT).deserialize,
input[3, string, true].toString, input[4, string, true].toString, input[5,
binary, true], newInstance(class
org.apache.spark.sql.sedona_sql.UDT.GeometryUDT).deserialize,
StructField(id,LongType,true), StructField(wkb,BinaryType,true),
StructField(geom,GeometryUDT,true), StructField(location,StringType,true),
StructField(quad_key,StringType,true), StructField(wkb,BinaryType,true),
StructField(geom,GeometryUDT,true))
at org.apache.spark.sql.errors.QueryExecutionErrors$.expressionDecodingError(QueryExecutionErrors.scala:1047)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:184)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:172)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3715)
at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2971)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
at org.apache.spark.sql.Dataset.collect(Dataset.scala:2971)
... 47 elided
Caused by: org.locationtech.jts.io.ParseException: Unknown WKB type 184
at org.locationtech.jts.io.WKBReader.readGeometry(WKBReader.java:282)
at org.locationtech.jts.io.WKBReader.read(WKBReader.java:191)
at org.locationtech.jts.io.WKBReader.read(WKBReader.java:159)
at org.apache.sedona.sql.utils.GeometrySerializer$.deserialize(GeometrySerializer.scala:49)
at org.apache.spark.sql.sedona_sql.UDT.GeometryUDT.deserialize(GeometryUDT.scala:42)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.createExternalRow_0_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:181)
... 65 more
{code}
Here, {{osm_all_nodes_geom}} is a DataFrame with 3 columns:
{code:scala}
scala> spark.sql("SELECT * FROM osm_all_nodes_geom").printSchema
root
|-- id: long (nullable = true)
|-- wkb: binary (nullable = true)
|-- geom: geometry (nullable = true)
{code}
{{ms_buildings_geom}} is a DataFrame with 4 columns:
{code:scala}
scala> spark.sql("SELECT * FROM ms_buildings_geom").printSchema
root
|-- location: string (nullable = true)
|-- quad_key: string (nullable = true)
|-- wkb: binary (nullable = true)
|-- geom: geometry (nullable = true)
{code}
h2. Other findings
# This problem only reproduces when running spatial join queries with
{{SELECT *}}. If we select specific columns, the problem goes away.
# {{.show}} runs perfectly fine, while {{collect}} and {{takeAsList}} run into
this problem.
# This problem cannot be reproduced with a broadcast join, such as
{code:scala}
spark.sql("SELECT /*+ BROADCAST(msb) */ * FROM osm_all_nodes_geom osm INNER JOIN ms_buildings_geom msb ON ST_Contains(msb.geom, osm.geom)").collect
{code}
h2. Cause of this problem
When the join query optimizer replaces a join node with a {{RangeJoinExec}} or
{{DistanceJoinExec}} node, the left and right sub-plans of the join may get
swapped. When that happens, the output of {{RangeJoinExec}} or
{{DistanceJoinExec}} becomes {{rightOutput ++ leftOutput}} instead of
{{leftOutput ++ rightOutput}}. The columns of each result row then no longer
line up with the column types the deserializer expects, so decoding the result
rows of the join query fails.
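The mismatch can be illustrated with a small, Spark-free sketch. It is only a model of the ordering problem, not Sedona's actual code: {{Column}}, {{SwappedJoinOutputSketch}} and {{mismatches}} are hypothetical names, and column types are plain strings taken from the schemas above. It compares the declared order ({{leftOutput ++ rightOutput}}) with the order the rows would have after a swap ({{rightOutput ++ leftOutput}}) and lists the positions where the types disagree.
{code:scala}
// Hypothetical, simplified model of the column ordering issue.
// These are NOT Spark or Sedona classes; types are plain strings.
object SwappedJoinOutputSketch {
  final case class Column(name: String, dataType: String)

  // Left side of the join: osm_all_nodes_geom
  val leftOutput: Seq[Column] = Seq(
    Column("id", "bigint"),
    Column("wkb", "binary"),
    Column("geom", "geometry"))

  // Right side of the join: ms_buildings_geom
  val rightOutput: Seq[Column] = Seq(
    Column("location", "string"),
    Column("quad_key", "string"),
    Column("wkb", "binary"),
    Column("geom", "geometry"))

  // Order the row decoder expects (matches the schema in the error message).
  val declared: Seq[Column] = leftOutput ++ rightOutput

  // Order the rows would have if the two sides were swapped.
  val produced: Seq[Column] = rightOutput ++ leftOutput

  // Positions where the declared type differs from the produced type,
  // as (index, expected type, actual type).
  def mismatches(
      declared: Seq[Column],
      produced: Seq[Column]): Seq[(Int, String, String)] =
    declared.zip(produced).zipWithIndex.collect {
      case ((d, p), i) if d.dataType != p.dataType =>
        (i, d.dataType, p.dataType)
    }

  def main(args: Array[String]): Unit =
    mismatches(declared, produced).foreach { case (i, expected, actual) =>
      println(s"column $i: decoder expects $expected, row holds $actual")
    }
}
{code}
In this model the first five positions disagree, including position 2, where the decoder expects a geometry. That is consistent with the WKB parse error above, although the sketch does not attempt to reproduce the exact bytes Spark reads.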