[ https://issues.apache.org/jira/browse/SPARK-35287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17338291#comment-17338291 ]

Hyukjin Kwon commented on SPARK-35287:
--------------------------------------

cc [~aokolnychyi] FYI

> RemoveRedundantProjects removes non-redundant projects
> ------------------------------------------------------
>
>                 Key: SPARK-35287
>                 URL: https://issues.apache.org/jira/browse/SPARK-35287
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.1
>            Reporter: Chungmin
>            Priority: Major
>
> RemoveRedundantProjects erroneously removes non-redundant projects that are 
> required to convert rows coming from DataSourceV2ScanExec to UnsafeRow. There 
> is code that handles this case, but it only inspects the direct child of the 
> project. The bug occurs when DataSourceV2ScanExec is not a child of the 
> project but a more distant descendant. The method {{isRedundant}} in 
> {{RemoveRedundantProjects}} should be updated to account for descendants too.
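> A rough sketch of the idea (the class name and helper below are illustrative, 
> not the actual Spark source; a real fix would also need to stop at operators 
> that already produce UnsafeRows):
> {code:java}
> import org.apache.spark.sql.execution.SparkPlan
> import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase
> // Instead of matching only on the project's direct child, walk the subtree
> // looking for a DataSourceV2 scan, whose output rows are not guaranteed to
> // be UnsafeRows.
> def containsV2Scan(plan: SparkPlan): Boolean = plan match {
>   case _: DataSourceV2ScanExecBase => true
>   case _ => plan.children.exists(containsV2Scan)
> }
> {code}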
> My original scenario requires Iceberg to reproduce the issue. In theory, the 
> bug should be reproducible with Spark SQL alone, and someone more familiar 
> with Spark SQL internals should be able to construct such a case. The 
> following is my reproduction (Spark 3.1.1, Iceberg 0.11.1): 
> {code:java}
> import scala.collection.JavaConverters._
> import org.apache.iceberg.{PartitionSpec, TableProperties}
> import org.apache.iceberg.hadoop.HadoopTables
> import org.apache.iceberg.spark.SparkSchemaUtil
> import org.apache.spark.sql.{DataFrame, QueryTest, SparkSession}
> import org.apache.spark.sql.internal.SQLConf
> class RemoveRedundantProjectsTest extends QueryTest {
>   override val spark: SparkSession = SparkSession
>     .builder()
>     .master("local[4]")
>     .config("spark.driver.bindAddress", "127.0.0.1")
>     .appName(suiteName)
>     .getOrCreate()
>   test("RemoveRedundantProjects removes non-redundant projects") {
>     // Disable broadcast joins and whole-stage codegen so the join runs as a
>     // sort-merge join on the interpreted path, where the missing UnsafeRow
>     // conversion surfaces in SortExec; the last flag is the rule under test.
>     withSQLConf(
>       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
>       SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
>       SQLConf.REMOVE_REDUNDANT_PROJECTS_ENABLED.key -> "true") {
>       withTempDir { dir =>
>         val path = dir.getCanonicalPath
>         val data = spark.range(3).toDF
>         val table = new HadoopTables().create(
>           SparkSchemaUtil.convert(data.schema),
>           PartitionSpec.unpartitioned(),
>           Map(TableProperties.WRITE_NEW_DATA_LOCATION -> path).asJava,
>           path)
>         data.write.format("iceberg").mode("overwrite").save(path)
>         table.refresh()
>         val df = spark.read.format("iceberg").load(path)
>         val dfX = df.as("x")
>         val dfY = df.as("y")
>         // The filter leaves a Filter node between the project above it and
>         // the V2 scan, so the scan is a descendant rather than the direct
>         // child of that project.
>         val join = dfX.filter(dfX("id") > 0).join(dfY, "id")
>         join.explain("extended")
>         assert(join.count() == 2)
>       }
>     }
>   }
> }
> {code}
> Stack trace:
> {noformat}
> [info] - RemoveRedundantProjects removes non-redundant projects *** FAILED ***
> [info]   org.apache.spark.SparkException: Job aborted due to stage failure: 
> Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in 
> stage 1.0 (TID 4) (xeroxms100.northamerica.corp.microsoft.com executor 
> driver): java.lang.ClassCastException: 
> org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast 
> to org.apache.spark.sql.catalyst.expressions.UnsafeRow
> [info]  at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:226)
> [info]  at 
> org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
> [info]  at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
> [info]  at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
> [info]  at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> [info]  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> [info]  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> [info]  at 
> org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
> [info]  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> [info]  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> [info]  at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> [info]  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> [info]  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> [info]  at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> [info]  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> [info]  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> [info]  at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> [info]  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> [info]  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> [info]  at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> [info]  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> [info]  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> [info]  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> [info]  at org.apache.spark.scheduler.Task.run(Task.scala:131)
> [info]  at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
> [info]  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
> [info]  at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
> [info]  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [info]  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [info]  at java.lang.Thread.run(Thread.java:748)
> [info]
> [info] Driver stacktrace:
> [info]   at 
> org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2253)
> [info]   at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2202)
> [info]   at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2201)
> [info]   at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
> [info]   at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
> [info]   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
> [info]   at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2201)
> [info]   at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
> [info]   at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
> [info]   at scala.Option.foreach(Option.scala:407)
> [info]   ...
> [info]   Cause: java.lang.ClassCastException: 
> org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast 
> to org.apache.spark.sql.catalyst.expressions.UnsafeRow
> [info]   at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:226)
> [info]   at 
> org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
> [info]   at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
> [info]   at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
> [info]   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> [info]   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> [info]   at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> [info]   at 
> org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
> [info]   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> [info]   at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> [info]   ...
> {noformat}
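> Not part of the repro, but one way to see whether the rule dropped the 
> project is to inspect the executed plan of the {{join}} DataFrame above, 
> e.g.:
> {code:java}
> import org.apache.spark.sql.execution.ProjectExec
> // Count ProjectExec nodes in the executed plan to check whether
> // RemoveRedundantProjects removed the project above the V2 scan.
> val projects = join.queryExecution.executedPlan.collect { case p: ProjectExec => p }
> println(s"ProjectExec nodes in executed plan: ${projects.size}")
> {code}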


