[ https://issues.apache.org/jira/browse/SPARK-35287?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chungmin updated SPARK-35287:
-----------------------------
    Description: 
RemoveRedundantProjects erroneously removes non-redundant projects that are 
required to convert rows coming from DataSourceV2ScanExec to UnsafeRow. There 
is code handling this case, but it only looks at the project's immediate child. 
The bug occurs when DataSourceV2ScanExec is not a child of the project, but a 
descendant. The method {{isRedundant}} in {{RemoveRedundantProjects}} should be 
updated to account for descendants as well.
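
For illustration, here is a rough sketch of the kind of descendant-aware check 
I mean (illustrative only, not the actual {{RemoveRedundantProjects}} source; it 
borrows {{DataSourceV2ScanExecBase}}, {{supportsColumnar}} and {{TreeNode.find}} 
from Spark 3.1, while the surrounding structure is hypothetical):

{code:java}
import org.apache.spark.sql.execution.{ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase

// Illustrative sketch only, not the actual RemoveRedundantProjects code.
object RemoveRedundantProjectsSketch {

  // Instead of matching only the immediate child, search the whole subtree for
  // a row-based DSv2 scan. (A real fix would presumably stop the search at
  // operators that already produce UnsafeRow, e.g. exchanges.)
  def hasRowBasedV2Scan(plan: SparkPlan): Boolean = {
    plan.find {
      case scan: DataSourceV2ScanExecBase => !scan.supportsColumnar
      case _ => false
    }.isDefined
  }

  def isRedundant(project: ProjectExec, child: SparkPlan): Boolean = {
    if (hasRowBasedV2Scan(child)) {
      // Keep the project: it is the node that converts the scan's rows to
      // UnsafeRow for downstream operators such as SortExec.
      false
    } else {
      // ... existing redundancy checks on output attributes, ordering, etc.,
      // simplified here to a bare output comparison ...
      project.output == child.output
    }
  }
}
{code}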

The original scenario requires Iceberg to reproduce the issue. In theory, it 
should be possible to reproduce the bug with Spark SQL alone, and someone more 
familiar with Spark SQL internals should be able to construct such a scenario. 
The following is my reproduction scenario (Spark 3.1.1, Iceberg 0.11.1): 

{code:java}
import scala.collection.JavaConverters._

import org.apache.iceberg.{PartitionSpec, TableProperties}
import org.apache.iceberg.hadoop.HadoopTables
import org.apache.iceberg.spark.SparkSchemaUtil
import org.apache.spark.sql.{DataFrame, QueryTest, SparkSession}
import org.apache.spark.sql.internal.SQLConf

class RemoveRedundantProjectsTest extends QueryTest {
  override val spark: SparkSession = SparkSession
    .builder()
    .master("local[4]")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .appName(suiteName)
    .getOrCreate()
  test("RemoveRedundantProjects removes non-redundant projects") {
    withSQLConf(
      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
      SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
      SQLConf.REMOVE_REDUNDANT_PROJECTS_ENABLED.key -> "true") {
      withTempDir { dir =>
        val path = dir.getCanonicalPath
        val data: DataFrame = spark.range(3).toDF
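        // Create an unpartitioned Iceberg table at the temp path and write the
        // three rows into it.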
        val table = new HadoopTables().create(
          SparkSchemaUtil.convert(data.schema),
          PartitionSpec.unpartitioned(),
          Map(TableProperties.WRITE_NEW_DATA_LOCATION -> path).asJava,
          path)
        data.write.format("iceberg").mode("overwrite").save(path)
        table.refresh()

        val df = spark.read.format("iceberg").load(path)
        val dfX = df.as("x")
        val dfY = df.as("y")
        val join = dfX.filter(dfX("id") > 0).join(dfY, "id")
        join.explain("extended")
        assert(join.count() == 2)
      }
    }
  }
}
{code}

Stack trace:

{noformat}
[info] - RemoveRedundantProjects removes non-redundant projects *** FAILED ***
[info]   org.apache.spark.SparkException: Job aborted due to stage failure: 
Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 
1.0 (TID 4) (xeroxms100.northamerica.corp.microsoft.com executor driver): 
java.lang.ClassCastException: 
org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to 
org.apache.spark.sql.catalyst.expressions.UnsafeRow
[info]  at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:226)
[info]  at 
org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
[info]  at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
[info]  at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
[info]  at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
[info]  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
[info]  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
[info]  at 
org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
[info]  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
[info]  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
[info]  at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
[info]  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
[info]  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
[info]  at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
[info]  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
[info]  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
[info]  at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
[info]  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
[info]  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
[info]  at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
[info]  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
[info]  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
[info]  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
[info]  at org.apache.spark.scheduler.Task.run(Task.scala:131)
[info]  at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
[info]  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
[info]  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
[info]  at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[info]  at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[info]  at java.lang.Thread.run(Thread.java:748)
[info]
[info] Driver stacktrace:
[info]   at 
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2253)
[info]   at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2202)
[info]   at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2201)
[info]   at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
[info]   at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
[info]   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
[info]   at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2201)
[info]   at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
[info]   at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
[info]   at scala.Option.foreach(Option.scala:407)
[info]   ...
[info]   Cause: java.lang.ClassCastException: 
org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to 
org.apache.spark.sql.catalyst.expressions.UnsafeRow
[info]   at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:226)
[info]   at 
org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
[info]   at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
[info]   at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
[info]   at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
[info]   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
[info]   at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
[info]   at 
org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
[info]   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
[info]   at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
[info]   ...
{noformat}


> RemoveRedundantProjects removes non-redundant projects
> ------------------------------------------------------
>
>                 Key: SPARK-35287
>                 URL: https://issues.apache.org/jira/browse/SPARK-35287
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.1
>            Reporter: Chungmin
>            Priority: Major
>


