Ahmad Humayun created SPARK-54196:
-------------------------------------
Summary: Planner Crashes with Internal Error
Key: SPARK-54196
URL: https://issues.apache.org/jira/browse/SPARK-54196
Project: Spark
Issue Type: Bug
Components: Optimizer
Affects Versions: 3.5.0
Environment: Scala 2.13.14
Reporter: Ahmad Humayun
The following is a minimized, standalone program that causes the planner to
crash with an internal error.
{code:scala}
import org.apache.spark.sql.{SparkSession, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

object MinimalG13291 {
  case class ComplexObject(field1: Int, field2: Int)

  def main(args: Array[String]): Unit = {
    // Initialize Spark
    val spark = SparkSession.builder()
      .appName("MinimalG13291")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse")
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    import spark.implicits._

    // Create minimal test data for tpcds.promotion
    val promotionData = Seq(
      (1, "Channel1", "Demo1", "TV1"),
      (2, "Channel2", "Demo2", "TV2"),
      (3, "Channel3", "Demo3", "TV3"),
      (4, "Channel4", "Demo4", "TV4"),
      (5, "Channel5", "Demo5", "TV5")
    )
    val promotionDF = promotionData.toDF(
      "p_promo_id", "p_channel_catalog", "p_channel_demo", "p_channel_tv")

    // Create minimal test data for tpcds.time_dim
    val timeDimData = Seq(
      (1, "AM", "breakfast", true),
      (2, "PM", "lunch", false),
      (3, "AM", "dinner", true),
      (4, "PM", "snack", false),
      (5, "AM", "breakfast", true),
      (1, "PM", "lunch", true),
      (2, "AM", "dinner", false),
      (3, "PM", "snack", true)
    )
    val timeDimDF = timeDimData.toDF(
      "t_time_sk", "t_am_pm", "t_meal_time", "t_shift")

    // Register as temporary views
    promotionDF.createOrReplaceTempView("promotion")
    timeDimDF.createOrReplaceTempView("time_dim")

    // Create the database and tables
    spark.sql("CREATE DATABASE IF NOT EXISTS tpcds")
    promotionDF.write.mode("overwrite").saveAsTable("tpcds.promotion")
    timeDimDF.write.mode("overwrite").saveAsTable("tpcds.time_dim")

    // Define a nondeterministic UDF
    val preloadedUDF = udf((s: Any) => {
      val r = scala.util.Random.nextInt()
      ComplexObject(r, r)
    }).asNondeterministic()

    val promotionTable = spark.table("tpcds.promotion")
    val timeDimTable = spark.table("tpcds.time_dim")
    val promotionWithOffset = promotionTable.offset(13)
    val filteredTimeDim = timeDimTable.filter(
      col("time_dim.t_meal_time").cast("boolean") === lit(-43))
    val outerJoinResult = filteredTimeDim.join(
      promotionWithOffset,
      col("promotion.p_promo_id") === col("time_dim.t_am_pm"),
      "outer")
    val limitedResult = outerJoinResult.limit(61)
    val sink = limitedResult.select(
      col("promotion.p_channel_tv"),
      preloadedUDF(col("promotion.p_channel_demo")))
    sink.explain(true)

    spark.stop()
  }
}
{code}
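Based on the unplannable node in the trace below ({{LocalLimit (61 + 13)}} sitting over the promotion side of the join), the trigger appears to be {{limit()}} applied on top of an outer join whose one side carries an {{offset()}}. A stripped-down sketch along those lines (an untested assumption on my part, not verified against this exact build) may reproduce the same assertion:

{code:scala}
// Hedged minimal sketch: LIMIT over an outer join whose right side has an
// OFFSET. The optimizer appears to push the limit into the offset side as
// LocalLimit(limit + offset), matching the "(61 + 13)" node in the trace.
val left  = spark.range(10).toDF("a")
val right = spark.range(10).toDF("b").offset(3)
left.join(right, col("a") === col("b"), "outer")
  .limit(5)
  .explain(true) // expected to fail the same way if this hypothesis holds
{code}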
The following is the error along with the full stack trace:
{code}
Exception in thread "main" org.apache.spark.SparkException: [INTERNAL_ERROR] The Spark SQL phase planning failed with an internal error. You hit a bug in Spark or the Spark plugins you use. Please, report this bug to the corresponding communities or vendors, and provide the full stack trace.
at org.apache.spark.SparkException$.internalError(SparkException.scala:107)
at org.apache.spark.sql.execution.QueryExecution$.toInternalError(QueryExecution.scala:536)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:548)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:219)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:218)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:171)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:164)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:186)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:219)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:219)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:218)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:186)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:179)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$writePlans$5(QueryExecution.scala:305)
at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:682)
at org.apache.spark.sql.execution.QueryExecution.writePlans(QueryExecution.scala:305)
at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:320)
at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:274)
at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:252)
at org.apache.spark.sql.Dataset.$anonfun$explain$1(Dataset.scala:593)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.Dataset.explain(Dataset.scala:593)
at org.apache.spark.sql.Dataset.explain(Dataset.scala:606)
at bugs.MinimalG13291$.main(MinimalG13291.scala:70)
at bugs.MinimalG13291.main(MinimalG13291.scala)
Caused by: java.lang.AssertionError: assertion failed: No plan for LocalLimit (61 + 13)
+- Project [p_channel_demo#54, p_channel_tv#55]
   +- Relation spark_catalog.tpcds.promotion[p_promo_id#52,p_channel_catalog#53,p_channel_demo#54,p_channel_tv#55] parquet

at scala.Predef$.assert(Predef.scala:279)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:70)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
at scala.collection.IterableOnceOps.foldLeft(IterableOnce.scala:646)
at scala.collection.IterableOnceOps.foldLeft$(IterableOnce.scala:642)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1293)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
at scala.collection.Iterator$$anon$10.nextCur(Iterator.scala:587)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:601)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:70)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
at scala.collection.IterableOnceOps.foldLeft(IterableOnce.scala:646)
at scala.collection.IterableOnceOps.foldLeft$(IterableOnce.scala:642)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1293)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
at scala.collection.Iterator$$anon$10.nextCur(Iterator.scala:587)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:601)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:70)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
at scala.collection.IterableOnceOps.foldLeft(IterableOnce.scala:646)
at scala.collection.IterableOnceOps.foldLeft$(IterableOnce.scala:642)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1293)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
at scala.collection.Iterator$$anon$10.nextCur(Iterator.scala:587)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:601)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:70)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
at scala.collection.IterableOnceOps.foldLeft(IterableOnce.scala:646)
at scala.collection.IterableOnceOps.foldLeft$(IterableOnce.scala:642)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1293)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
at scala.collection.Iterator$$anon$10.nextCur(Iterator.scala:587)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:601)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:70)
at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:496)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:171)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:219)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
... 27 more
{code}
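One more observation for whoever picks this up: the limit expression in the failing node is the unevaluated sum {{(61 + 13)}} rather than the folded literal {{74}}. My guess (an assumption drawn from the trace, not from reading the optimizer sources) is that a post-constant-folding rewrite builds the combined limit as an {{Add}} expression, and the limit planning strategies only match integer literals, so every strategy declines the node and the assertion in {{QueryPlanner.plan}} fires. A tiny illustration of the distinction, using public Catalyst classes:

{code:scala}
import org.apache.spark.sql.catalyst.expressions.{Add, Literal}

// What the trace shows on the node: an unevaluated Add, printed as "(61 + 13)".
val unfolded = Add(Literal(61), Literal(13))
// What the planner presumably expects: a plain integer literal.
val folded = Literal(74)

// The Add is foldable, so a constant-folding pass could have reduced it;
// apparently no such pass runs after the rewrite that produced this node.
println(unfolded.foldable) // true
{code}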