[ 
https://issues.apache.org/jira/browse/SPARK-44079?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Max Gekk reassigned SPARK-44079:
--------------------------------

    Assignee: Jia Fan

> Json reader crashes when a different schema is present
> ------------------------------------------------------
>
>                 Key: SPARK-44079
>                 URL: https://issues.apache.org/jira/browse/SPARK-44079
>             Project: Spark
>          Issue Type: Bug
>          Components: python
>    Affects Versions: 3.4.0
>            Reporter: charlotte van der scheun
>            Assignee: Jia Fan
>            Priority: Major
>
> When using PySpark 3.4, we noticed that the JSON reader crashes when reading a 
> file that contains a corrupted record. In PySpark 3.3 this worked fine.
> {*}Code{*}:
> {code:java}
> from pyspark.sql.types import StructType, StructField, IntegerType, StringType
> import json
> data = """[{"a": "incorrect", "b": "correct"}]"""
> schema = StructType([StructField('a', IntegerType(), True), StructField('b', 
> StringType(), True), StructField('_corrupt_record', StringType(), True)])
> spark.read.option("mode", 
> "PERMISSIVE").option("multiline","true").schema(schema).json(spark.sparkContext.parallelize([data])).show(truncate=False){code}
> *Used packages:*
>  * Pyspark==3.4.0
>  * python==3.10.0
>  * delta-spark==2.4.0
>  
> spark_jars=(
>   "org.apache.spark:spark-avro_2.12:3.4.0"
>   ",io.delta:delta-core_2.12:2.4.0"
>   ",com.databricks:spark-xml_2.12:0.16.0"
> )
>  
> {*}Expected behaviour{*}:
> |a|b|_corrupt_record|
> |null|null|[{"a": "incorrect", "b": "correct"}]|
>  
> {*}Actual behaviour{*}:
> {code:java}
>  
> *** py4j.protocol.Py4JJavaError: An error occurred while calling 
> o104.showString.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 
> in stage 2.0 failed 1 times, most recent failure: Lost task 4.0 in stage 2.0 
> (TID 9) (charlottesmbp2.home executor driver): 
> java.lang.ArrayIndexOutOfBoundsException: Index 1 out of bounds for length 1
>         at 
> org.apache.spark.sql.catalyst.expressions.GenericInternalRow.genericGet(rows.scala:201)
>         at 
> org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getAs(rows.scala:35)
>         at 
> org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.get(rows.scala:37)
>         at 
> org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.get$(rows.scala:37)
>         at 
> org.apache.spark.sql.catalyst.expressions.GenericInternalRow.get(rows.scala:195)
>         at 
> org.apache.spark.sql.catalyst.util.FailureSafeParser.$anonfun$toResultRow$2(FailureSafeParser.scala:47)
>         at scala.Option.map(Option.scala:230)
>         at 
> org.apache.spark.sql.catalyst.util.FailureSafeParser.$anonfun$toResultRow$1(FailureSafeParser.scala:47)
>         at 
> org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:64)
>         at 
> org.apache.spark.sql.DataFrameReader.$anonfun$json$10(DataFrameReader.scala:431)
>         at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
>         at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
>  Source)
>         at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>         at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
>         at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
>         at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
>         at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
>         at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
>         at 
> org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
>         at org.apache.spark.scheduler.Task.run(Task.scala:139)
>         at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
>         at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
>         at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
>         at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
>         at java.base/java.lang.Thread.run(Thread.java:1589)
> Driver stacktrace:
>         at 
> org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
>         at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
>         at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
>         at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>         at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>         at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
>         at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
>         at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
>         at scala.Option.foreach(Option.scala:407)
>         at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
>         at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
>         at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
>         at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
>         at 
> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:2284)
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
>         at 
> org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
>         at 
> org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
>         at 
> org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
>         at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4177)
>         at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3161)
>         at 
> org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4167)
>         at 
> org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
>         at 
> org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4165)
>         at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
>         at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
>         at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
>         at 
> org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
>         at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
>         at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4165)
>         at org.apache.spark.sql.Dataset.head(Dataset.scala:3161)
>         at org.apache.spark.sql.Dataset.take(Dataset.scala:3382)
>         at org.apache.spark.sql.Dataset.getRows(Dataset.scala:284)
>         at org.apache.spark.sql.Dataset.showString(Dataset.scala:323)
>         at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:76)
>         at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
>         at java.base/java.lang.reflect.Method.invoke(Method.java:578)
>         at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>         at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
>         at py4j.Gateway.invoke(Gateway.java:282)
>         at 
> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>         at py4j.commands.CallCommand.execute(CallCommand.java:79)
>         at 
> py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
>         at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
>         at java.base/java.lang.Thread.run(Thread.java:1589)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 1 out of bounds 
> for length 1
>         at 
> org.apache.spark.sql.catalyst.expressions.GenericInternalRow.genericGet(rows.scala:201)
>         at 
> org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getAs(rows.scala:35)
>         at 
> org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.get(rows.scala:37)
>         at 
> org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.get$(rows.scala:37)
>         at 
> org.apache.spark.sql.catalyst.expressions.GenericInternalRow.get(rows.scala:195)
>         at 
> org.apache.spark.sql.catalyst.util.FailureSafeParser.$anonfun$toResultRow$2(FailureSafeParser.scala:47)
>         at scala.Option.map(Option.scala:230)
>         at 
> org.apache.spark.sql.catalyst.util.FailureSafeParser.$anonfun$toResultRow$1(FailureSafeParser.scala:47)
>         at 
> org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:64)
>         at 
> org.apache.spark.sql.DataFrameReader.$anonfun$json$10(DataFrameReader.scala:431)
>         at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
>         at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
>  Source)
>         at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>         at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
>         at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
>         at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
>         at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
>         at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
>         at 
> org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
>         at org.apache.spark.scheduler.Task.run(Task.scala:139)
>         at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
>         at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
>         at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
>         at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
>         ... 1 more {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to