[ https://issues.apache.org/jira/browse/SPARK-35287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17338291#comment-17338291 ]
Hyukjin Kwon commented on SPARK-35287:
--------------------------------------

cc [~aokolnychyi] FYI

> RemoveRedundantProjects removes non-redundant projects
> ------------------------------------------------------
>
>                 Key: SPARK-35287
>                 URL: https://issues.apache.org/jira/browse/SPARK-35287
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.1
>            Reporter: Chungmin
>            Priority: Major
>
> RemoveRedundantProjects erroneously removes non-redundant projects that are required to convert rows coming from DataSourceV2ScanExec to UnsafeRow. There is code for this case, but it only looks at the direct child of the project. The bug occurs when DataSourceV2ScanExec is not a child of the project but a more distant descendant. The method {{isRedundant}} in {{RemoveRedundantProjects}} should be updated to account for descendants as well (see the sketch after the stack trace below).
> The original scenario requires Iceberg to reproduce the issue. In theory it should be possible to reproduce the bug with Spark SQL alone, and someone more familiar with Spark SQL internals should be able to construct such a scenario. The following is my reproduction scenario (Spark 3.1.1, Iceberg 0.11.1):
> {code:java}
> import scala.collection.JavaConverters._
>
> import org.apache.iceberg.{PartitionSpec, TableProperties}
> import org.apache.iceberg.hadoop.HadoopTables
> import org.apache.iceberg.spark.SparkSchemaUtil
> import org.apache.spark.sql.{DataFrame, QueryTest, SparkSession}
> import org.apache.spark.sql.internal.SQLConf
>
> class RemoveRedundantProjectsTest extends QueryTest {
>   override val spark: SparkSession = SparkSession
>     .builder()
>     .master("local[4]")
>     .config("spark.driver.bindAddress", "127.0.0.1")
>     .appName(suiteName)
>     .getOrCreate()
>
>   test("RemoveRedundantProjects removes non-redundant projects") {
>     withSQLConf(
>       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
>       SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
>       SQLConf.REMOVE_REDUNDANT_PROJECTS_ENABLED.key -> "true") {
>       withTempDir { dir =>
>         val path = dir.getCanonicalPath
>         // Create an unpartitioned Iceberg table backed by the temp dir and write 3 rows.
>         val data = spark.range(3).toDF
>         val table = new HadoopTables().create(
>           SparkSchemaUtil.convert(data.schema),
>           PartitionSpec.unpartitioned(),
>           Map(TableProperties.WRITE_NEW_DATA_LOCATION -> path).asJava,
>           path)
>         data.write.format("iceberg").mode("overwrite").save(path)
>         table.refresh()
>
>         // Self-join on a V2 (Iceberg) scan; the sort before the sort-merge join requires UnsafeRow input.
>         val df = spark.read.format("iceberg").load(path)
>         val dfX = df.as("x")
>         val dfY = df.as("y")
>         val join = dfX.filter(dfX("id") > 0).join(dfY, "id")
>         join.explain("extended")
>         assert(join.count() == 2)
>       }
>     }
>   }
> }
> {code}
> Stack trace:
> {noformat}
> [info] - RemoveRedundantProjects removes non-redundant projects *** FAILED ***
> [info]   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 4) (xeroxms100.northamerica.corp.microsoft.com executor driver): java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to org.apache.spark.sql.catalyst.expressions.UnsafeRow
> [info]   at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:226)
> [info]   at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
> [info]   at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
> [info]   at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
> [info]   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> [info]   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> [info]   at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> [info]   at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
> [info]   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> [info]   at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> [info]   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> [info]   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> [info]   at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> [info]   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> [info]   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> [info]   at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> [info]   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> [info]   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> [info]   at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> [info]   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> [info]   at org.apache.spark.scheduler.Task.run(Task.scala:131)
> [info]   at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
> [info]   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
> [info]   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
> [info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [info]   at java.lang.Thread.run(Thread.java:748)
> [info]
> [info] Driver stacktrace:
> [info]   at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2253)
> [info]   at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2202)
> [info]   at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2201)
> [info]   at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
> [info]   at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
> [info]   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
> [info]   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2201)
> [info]   at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
> [info]   at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
> [info]   at scala.Option.foreach(Option.scala:407)
> [info]   ...
> [info]   Cause: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to org.apache.spark.sql.catalyst.expressions.UnsafeRow
> [info]   at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:226)
> [info]   at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
> [info]   at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
> [info]   at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
> [info]   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> [info]   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> [info]   at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> [info]   at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
> [info]   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> [info]   at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> [info]   ...
> {noformat}
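
The description above suggests that {{isRedundant}} should consider all descendants of the project rather than only its direct child. Below is a minimal sketch of that idea, not the actual Spark patch: it assumes the Spark 3.1-era internal APIs {{ProjectExec}}, {{DataSourceV2ScanExecBase}}, and {{TreeNode.find}}, and the object name {{DescendantScanCheck}} and helper {{containsV2Scan}} are hypothetical.

{code:java}
// Hypothetical sketch only, not the actual Spark fix: check the whole subtree
// under a project for a DataSourceV2 scan instead of only the direct child.
import org.apache.spark.sql.execution.{ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase

object DescendantScanCheck {
  // A V2 scan anywhere below the project may emit GenericInternalRow, so a
  // project sitting above it is what produces UnsafeRow and must be kept.
  def containsV2Scan(plan: SparkPlan): Boolean =
    plan.find(_.isInstanceOf[DataSourceV2ScanExecBase]).isDefined

  // Descendant-aware shape of the redundancy check; the real isRedundant in
  // RemoveRedundantProjects has additional conditions that are omitted here.
  def isRedundant(project: ProjectExec, child: SparkPlan): Boolean =
    !containsV2Scan(child) && project.output == child.output
}
{code}

A more careful version would presumably stop the traversal at operators that already guarantee UnsafeRow output (for example, exchanges), so that projects above a shuffle remain removable; the sketch only shows the descendant-aware part the report asks for.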