EmmanuelNicolau-TomTom opened a new issue, #2240:
URL: https://github.com/apache/sedona/issues/2240

   ## Expected behaviour 
   
   A dataframe with a geometry column nested within an array can be written to Parquet and read back without issues.
   
   ## Actual behaviour
   
   ```
   Py4JJavaError: An error occurred while calling o232.collectToPython.
   : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 26.0 failed 1 times, most recent failure: Lost task 0.0 in stage 26.0 (TID 36) (17e1c21c3c4d executor driver): java.lang.IllegalArgumentException: Spark type: StructType(StructField(geometry,BinaryType,true)) doesn't match the type: StructType(StructField(geometry,org.apache.spark.sql.sedona_sql.UDT.GeometryUDT@1114c728,true)) in column vector
           at org.apache.spark.sql.execution.datasources.parquet.ParquetColumnVector.<init>(ParquetColumnVector.java:70)
           at org.apache.spark.sql.execution.datasources.parquet.ParquetColumnVector.<init>(ParquetColumnVector.java:117)
           at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:286)
           at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:306)
           at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(ParquetFileFormat.scala:293)
           at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:217)
           at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:279)
           at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:129)
           at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:593)
           at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
           at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
           at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
           at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
           at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
           at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
           at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
           at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
           at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
           at org.apache.spark.scheduler.Task.run(Task.scala:141)
           at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
           at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
           at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
           at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
           at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
           at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
           at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
           at java.base/java.lang.Thread.run(Thread.java:840)

   Driver stacktrace:
           at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
           at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
           at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
           at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
           at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
           at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
           at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
           at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
           at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
           at scala.Option.foreach(Option.scala:407)
           at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
           at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
           at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
           at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
           at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
           at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
           at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
           at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
           at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
           at org.apache.spark.SparkContext.runJob(SparkContext.scala:2463)
           at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1046)
           at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
           at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
           at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
           at org.apache.spark.rdd.RDD.collect(RDD.scala:1045)
           at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)
           at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:4160)
           at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4334)
           at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
           at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4332)
           at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
           at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
           at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
           at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
           at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
           at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4332)
           at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:4157)
           at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
           at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.base/java.lang.reflect.Method.invoke(Method.java:569)
           at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
           at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
           at py4j.Gateway.invoke(Gateway.java:282)
           at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
           at py4j.commands.CallCommand.execute(CallCommand.java:79)
           at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
           at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
           at java.base/java.lang.Thread.run(Thread.java:840)
   Caused by: java.lang.IllegalArgumentException: Spark type: StructType(StructField(geometry,BinaryType,true)) doesn't match the type: StructType(StructField(geometry,org.apache.spark.sql.sedona_sql.UDT.GeometryUDT@1114c728,true)) in column vector
           at org.apache.spark.sql.execution.datasources.parquet.ParquetColumnVector.<init>(ParquetColumnVector.java:70)
           at org.apache.spark.sql.execution.datasources.parquet.ParquetColumnVector.<init>(ParquetColumnVector.java:117)
           at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:286)
           at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:306)
           at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(ParquetFileFormat.scala:293)
           at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:217)
           at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:279)
           at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:129)
           at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:593)
           at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
           at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
           at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
           at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
           at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
           at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
           at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
           at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
           at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
           at org.apache.spark.scheduler.Task.run(Task.scala:141)
           at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
           at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
           at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
           at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
           at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
           at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
           at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
           ... 1 more
   ```
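
   The mismatch reported here is between the schema Spark uses on the read side (a plain `BinaryType` for the nested `geometry` field) and the schema Sedona recorded at write time (`GeometryUDT`). A minimal diagnostic sketch, assuming `/tmp/test` has been written by the reproduction script below: `printSchema()` only reads footer metadata, so it should show the inferred schema without triggering the vectorized-reader crash that `collect()` does.

   ```python
   # Hypothetical diagnostic (not from the original report): inspect the schema
   # Spark infers for the written file. Schema inspection does not scan row data,
   # so it should not hit the ParquetColumnVector type check that fails above.
   spark.read.parquet("/tmp/test").printSchema()
   ```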
   
   ## Steps to reproduce
   
   The following minimal script reproduces the issue:
   
   ```python
   # pip install apache-sedona==1.7.2 shapely pyproj pyspark==3.5.0
   
   from sedona.spark import SedonaContext
   from pyspark.sql.functions import array, struct, expr
   
   config = (
       SedonaContext.builder()
       .config(
           "spark.jars.packages",
           "org.apache.sedona:sedona-spark-3.5_2.12:1.7.2,"
           "org.datasyslab:geotools-wrapper:1.7.2-28.5",
       )
       .config(
           "spark.jars.repositories",
           "https://artifacts.unidata.ucar.edu/repository/unidata-all";,
       )
       .getOrCreate()
   )
   
   spark = SedonaContext.create(config)
   
   df = spark.createDataFrame([{'id': 'something'}]).withColumn(
       "nested", array(struct(expr("ST_Point(0, 0)").alias("geometry")))
   )
   df.write.mode("overwrite").parquet("/tmp/test")
   
   print(spark.read.parquet("/tmp/test").collect())
   ```
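
   The crash happens inside `VectorizedParquetRecordReader`, and Spark only takes the vectorized path for *nested* columns when `spark.sql.parquet.enableNestedColumnVectorizedReader` is enabled (its default appears to have changed to `true` around Spark 3.4, which may explain why PySpark 3.3 is unaffected). A possible mitigation sketch, untested against this bug:

   ```python
   # Assumption, not verified for this issue: fall back to the non-vectorized
   # Parquet reader for nested columns before reading the file back.
   spark.conf.set("spark.sql.parquet.enableNestedColumnVectorizedReader", "false")
   print(spark.read.parquet("/tmp/test").collect())
   ```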
   
   ## Notes
   
   * The issue only happens with this particular nested structure; e.g., nesting the geometry in a struct without the enclosing `array` works just fine (see the sketches after this list).
   * The issue only happens with PySpark 3.5; with PySpark 3.3 (and Python 3.10) it works fine.
   * Adding the geometry column at the root and writing with the GeoParquet format works fine.
   * On Databricks 15.4 a similar setup also works.
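
   For completeness, minimal sketches of the two working variants mentioned above, reusing the `spark` session from the reproduction script (paths are illustrative):

   ```python
   from pyspark.sql.functions import struct, expr

   # Works: geometry nested in a struct, with no enclosing array.
   df_struct = spark.createDataFrame([{'id': 'something'}]).withColumn(
       "nested", struct(expr("ST_Point(0, 0)").alias("geometry"))
   )
   df_struct.write.mode("overwrite").parquet("/tmp/test_struct")
   print(spark.read.parquet("/tmp/test_struct").collect())

   # Works: geometry at the root, written with Sedona's GeoParquet data source.
   df_root = spark.createDataFrame([{'id': 'something'}]).withColumn(
       "geometry", expr("ST_Point(0, 0)")
   )
   df_root.write.mode("overwrite").format("geoparquet").save("/tmp/test_geoparquet")
   print(spark.read.format("geoparquet").load("/tmp/test_geoparquet").collect())
   ```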
   
   ## Environment
   
   * Python 3.11.13
   * Java version = 17

