[ https://issues.apache.org/jira/browse/SPARK-35287?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Wenchen Fan resolved SPARK-35287.
---------------------------------
Fix Version/s: 3.1.2
               3.2.0
Resolution: Fixed
Issue resolved by pull request 32606
[https://github.com/apache/spark/pull/32606]
> RemoveRedundantProjects removes non-redundant projects
> ------------------------------------------------------
>
>                 Key: SPARK-35287
>                 URL: https://issues.apache.org/jira/browse/SPARK-35287
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.1
>            Reporter: Chungmin
>            Priority: Major
>             Fix For: 3.2.0, 3.1.2
>
>
> RemoveRedundantProjects erroneously removes non-redundant projects that are
> required to convert rows coming from DataSourceV2ScanExec to UnsafeRow. The
> rule has code handling this case, but it only inspects the project's
> immediate child. The bug occurs when DataSourceV2ScanExec is not a child of
> the project but a more distant descendant. The method {{isRedundant}} in
> {{RemoveRedundantProjects}} should be updated to account for descendants
> too, as sketched below.
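> A minimal sketch of the descendant-aware check described above. The helper
> name and the surrounding structure are assumptions for illustration; this is
> not the actual patch from the linked pull request:
> {code:java}
> // Hypothetical sketch: keep a project whenever any descendant, not just the
> // immediate child, is a DataSourceV2 scan, since its output rows may not be
> // UnsafeRow. The rest of the rule's logic is elided.
> import org.apache.spark.sql.execution.{ProjectExec, SparkPlan}
> import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase
>
> private def hasV2ScanDescendant(plan: SparkPlan): Boolean =
>   plan.collectFirst { case s: DataSourceV2ScanExecBase => s }.isDefined
>
> private def isRedundant(project: ProjectExec, child: SparkPlan): Boolean = {
>   // A project above a V2 scan may be the only operator converting rows to
>   // UnsafeRow, so keep it even when its output matches the child's output.
>   if (hasV2ScanDescendant(child)) false
>   else project.output == child.output // placeholder for the existing checks
> }
> {code}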
> The original scenario requires Iceberg to reproduce the issue. In theory, it
> should be possible to reproduce the bug with Spark SQL alone, and someone
> more familiar with Spark SQL internals should be able to construct such a
> scenario. The following is my reproduction scenario (Spark 3.1.1, Iceberg
> 0.11.1):
> {code:java}
> import scala.collection.JavaConverters._
>
> import org.apache.iceberg.{PartitionSpec, TableProperties}
> import org.apache.iceberg.hadoop.HadoopTables
> import org.apache.iceberg.spark.SparkSchemaUtil
> import org.apache.spark.sql.{DataFrame, QueryTest, SparkSession}
> import org.apache.spark.sql.internal.SQLConf
>
> class RemoveRedundantProjectsTest extends QueryTest {
>   override val spark: SparkSession = SparkSession
>     .builder()
>     .master("local[4]")
>     .config("spark.driver.bindAddress", "127.0.0.1")
>     .appName(suiteName)
>     .getOrCreate()
>
>   test("RemoveRedundantProjects removes non-redundant projects") {
>     withSQLConf(
>         SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
>         SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
>         SQLConf.REMOVE_REDUNDANT_PROJECTS_ENABLED.key -> "true") {
>       withTempDir { dir =>
>         val path = dir.getCanonicalPath
>         val data = spark.range(3).toDF
>         val table = new HadoopTables().create(
>           SparkSchemaUtil.convert(data.schema),
>           PartitionSpec.unpartitioned(),
>           Map(TableProperties.WRITE_NEW_DATA_LOCATION -> path).asJava,
>           path)
>         data.write.format("iceberg").mode("overwrite").save(path)
>         table.refresh()
>         val df = spark.read.format("iceberg").load(path)
>         val dfX = df.as("x")
>         val dfY = df.as("y")
>         val join = dfX.filter(dfX("id") > 0).join(dfY, "id")
>         join.explain("extended")
>         assert(join.count() == 2)
>       }
>     }
>   }
> }
> {code}
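> As an optional diagnostic (not part of the original scenario), the executed
> plan can be inspected to see whether any {{ProjectExec}} survived above the
> scan:
> {code:java}
> // Assumed diagnostic, not from the report: count the ProjectExec nodes left
> // in the physical plan produced for the join.
> import org.apache.spark.sql.execution.ProjectExec
>
> val projects = join.queryExecution.executedPlan.collect { case p: ProjectExec => p }
> println(s"ProjectExec nodes in the executed plan: ${projects.size}")
> {code}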
> Stack trace:
> {noformat}
> [info] - RemoveRedundantProjects removes non-redundant projects *** FAILED ***
> [info]   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 4) (xeroxms100.northamerica.corp.microsoft.com executor driver): java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to org.apache.spark.sql.catalyst.expressions.UnsafeRow
> [info]   at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:226)
> [info]   at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
> [info]   at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
> [info]   at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
> [info]   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> [info]   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> [info]   at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> [info]   at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
> [info]   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> [info]   at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> [info]   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> [info]   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> [info]   at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> [info]   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> [info]   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> [info]   at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> [info]   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> [info]   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> [info]   at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> [info]   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> [info]   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> [info]   at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> [info]   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> [info]   at org.apache.spark.scheduler.Task.run(Task.scala:131)
> [info]   at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
> [info]   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
> [info]   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
> [info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [info]   at java.lang.Thread.run(Thread.java:748)
> [info]
> [info] Driver stacktrace:
> [info]   at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2253)
> [info]   at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2202)
> [info]   at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2201)
> [info]   at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
> [info]   at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
> [info]   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
> [info]   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2201)
> [info]   at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
> [info]   at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
> [info]   at scala.Option.foreach(Option.scala:407)
> [info]   ...
> [info] Cause: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to org.apache.spark.sql.catalyst.expressions.UnsafeRow
> [info]   at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:226)
> [info]   at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
> [info]   at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
> [info]   at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
> [info]   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> [info]   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> [info]   at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> [info]   at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
> [info]   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> [info]   at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> [info]   ...
> {noformat}
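> The trace matches the description above: with the project removed, the
> GenericInternalRow-producing scan feeds SortExec directly, and
> UnsafeExternalRowSorter fails when it casts the incoming rows to UnsafeRow.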