Ahmad Humayun created SPARK-54196:
-------------------------------------

             Summary: Planner Crashes with Internal Error
                 Key: SPARK-54196
                 URL: https://issues.apache.org/jira/browse/SPARK-54196
             Project: Spark
          Issue Type: Bug
          Components: Optimizer
    Affects Versions: 3.5.0
         Environment: Scala 2.13.14
            Reporter: Ahmad Humayun


The following is a minimized, standalone program that causes the planner to 
crash with an internal error during physical planning.


{code:scala}
import org.apache.spark.sql.{SparkSession, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

object MinimalG13291 {
  case class ComplexObject(field1: Int, field2: Int)

  def main(args: Array[String]): Unit = {
    // Initialize Spark
    val spark = SparkSession.builder()
      .appName("MinimalG13291")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    import spark.implicits._

    // Create minimal test data for tpcds.promotion
    val promotionData = Seq(
      (1, "Channel1", "Demo1", "TV1"),
      (2, "Channel2", "Demo2", "TV2"),
      (3, "Channel3", "Demo3", "TV3"),
      (4, "Channel4", "Demo4", "TV4"),
      (5, "Channel5", "Demo5", "TV5")
    )

    val promotionDF = promotionData.toDF("p_promo_id", "p_channel_catalog",
      "p_channel_demo", "p_channel_tv")

    // Create minimal test data for tpcds.time_dim
    val timeDimData = Seq(
      (1, "AM", "breakfast", true),
      (2, "PM", "lunch", false),
      (3, "AM", "dinner", true),
      (4, "PM", "snack", false),
      (5, "AM", "breakfast", true),
      (1, "PM", "lunch", true),
      (2, "AM", "dinner", false),
      (3, "PM", "snack", true)
    )

    val timeDimDF = timeDimData.toDF("t_time_sk", "t_am_pm", "t_meal_time",
      "t_shift")

    // Register as temporary views
    promotionDF.createOrReplaceTempView("promotion")
    timeDimDF.createOrReplaceTempView("time_dim")

    // Create the database and persist the tables
    spark.sql("CREATE DATABASE IF NOT EXISTS tpcds")
    promotionDF.write.mode("overwrite").saveAsTable("tpcds.promotion")
    timeDimDF.write.mode("overwrite").saveAsTable("tpcds.time_dim")

    // Define a nondeterministic UDF that returns a struct
    val preloadedUDF = udf((s: Any) => {
      val r = scala.util.Random.nextInt()
      ComplexObject(r, r)
    }).asNondeterministic()

    val promotionTable = spark.table("tpcds.promotion")
    val timeDimTable = spark.table("tpcds.time_dim")
    val promotionWithOffset = promotionTable.offset(13)
    val filteredTimeDim =
      timeDimTable.filter(col("time_dim.t_meal_time").cast("boolean") === lit(-43))
    val outerJoinResult = filteredTimeDim.join(promotionWithOffset,
      col("promotion.p_promo_id") === col("time_dim.t_am_pm"), "outer")
    val limitedResult = outerJoinResult.limit(61)
    val sink = limitedResult.select(col("promotion.p_channel_tv"),
      preloadedUDF(col("promotion.p_channel_demo")))

    // Explaining the plan triggers physical planning, which crashes
    sink.explain(true)

    spark.stop()
  }
}
{code}
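
The crash appears to hinge on the {{offset(13)}} below the outer join combining 
with the {{limit(61)}} above it: the {{(61 + 13)}} in the assertion below is 
exactly that sum. A further-minimized sketch of just that shape, using the 
in-memory DataFrames directly (an assumption, not verified to still reproduce 
the crash without the saved tables and the UDF projection):

{code:scala}
// Hypothetical smaller repro (unverified): a limit over an outer join whose
// right side carries an offset. Reuses promotionDF/timeDimDF from above.
timeDimDF
  .filter(col("t_meal_time").cast("boolean") === lit(-43))
  .join(promotionDF.offset(13), col("p_promo_id") === col("t_am_pm"), "outer")
  .limit(61)
  .explain(true)
{code}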

The following is the error along with the full stack trace:


{code}
Exception in thread "main" org.apache.spark.SparkException: [INTERNAL_ERROR] The Spark SQL phase planning failed with an internal error. You hit a bug in Spark or the Spark plugins you use. Please, report this bug to the corresponding communities or vendors, and provide the full stack trace.
    at org.apache.spark.SparkException$.internalError(SparkException.scala:107)
    at org.apache.spark.sql.execution.QueryExecution$.toInternalError(QueryExecution.scala:536)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:548)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:219)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:218)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:171)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:164)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:186)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:219)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:219)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:218)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:186)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:179)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$writePlans$5(QueryExecution.scala:305)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:682)
    at org.apache.spark.sql.execution.QueryExecution.writePlans(QueryExecution.scala:305)
    at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:320)
    at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:274)
    at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:252)
    at org.apache.spark.sql.Dataset.$anonfun$explain$1(Dataset.scala:593)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.Dataset.explain(Dataset.scala:593)
    at org.apache.spark.sql.Dataset.explain(Dataset.scala:606)
    at bugs.MinimalG13291$.main(MinimalG13291.scala:70)
    at bugs.MinimalG13291.main(MinimalG13291.scala)
Caused by: java.lang.AssertionError: assertion failed: No plan for LocalLimit (61 + 13)
+- Project [p_channel_demo#54, p_channel_tv#55]
   +- Relation spark_catalog.tpcds.promotion[p_promo_id#52,p_channel_catalog#53,p_channel_demo#54,p_channel_tv#55] parquet

    at scala.Predef$.assert(Predef.scala:279)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
    at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:70)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
    at scala.collection.IterableOnceOps.foldLeft(IterableOnce.scala:646)
    at scala.collection.IterableOnceOps.foldLeft$(IterableOnce.scala:642)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1293)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
    at scala.collection.Iterator$$anon$10.nextCur(Iterator.scala:587)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:601)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
    at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:70)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
    at scala.collection.IterableOnceOps.foldLeft(IterableOnce.scala:646)
    at scala.collection.IterableOnceOps.foldLeft$(IterableOnce.scala:642)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1293)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
    at scala.collection.Iterator$$anon$10.nextCur(Iterator.scala:587)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:601)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
    at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:70)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
    at scala.collection.IterableOnceOps.foldLeft(IterableOnce.scala:646)
    at scala.collection.IterableOnceOps.foldLeft$(IterableOnce.scala:642)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1293)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
    at scala.collection.Iterator$$anon$10.nextCur(Iterator.scala:587)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:601)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
    at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:70)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
    at scala.collection.IterableOnceOps.foldLeft(IterableOnce.scala:646)
    at scala.collection.IterableOnceOps.foldLeft$(IterableOnce.scala:642)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1293)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
    at scala.collection.Iterator$$anon$10.nextCur(Iterator.scala:587)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:601)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
    at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:70)
    at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:496)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:171)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:219)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
    ... 27 more
{code}
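
The assertion suggests the optimizer left a {{LocalLimit}} whose limit 
expression is the unfolded sum {{(61 + 13)}}, i.e. the user-supplied limit plus 
the offset, rather than an integer literal; the physical planning strategies 
appear to match only literal limits, so no strategy applies and 
{{QueryPlanner.plan}} asserts. If the rewrite introducing the sum is the 
{{LimitPushDown}} optimizer rule, excluding that rule should sidestep the 
crash. A sketch of that workaround, assuming {{LimitPushDown}} is indeed the 
rule at fault (untested):

{code:scala}
// Untested workaround sketch: exclude the optimizer rule suspected of
// emitting the non-literal limit expression. The rule name exists in
// Spark 3.5; whether it is the culprit here is an assumption.
val spark = SparkSession.builder()
  .appName("MinimalG13291")
  .master("local[*]")
  .config("spark.sql.optimizer.excludedRules",
    "org.apache.spark.sql.catalyst.optimizer.LimitPushDown")
  .getOrCreate()
{code}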




