[
https://issues.apache.org/jira/browse/SPARK-52163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Igor Railean updated SPARK-52163:
---------------------------------
Summary: Spark3.4+ LogicalRDD stats estimation may throw
bigInt.reportOverflow (was: Spark3.4 LogicalRDD stats estimation may throw
bigInt.reportOverflow)
> Spark3.4+ LogicalRDD stats estimation may throw bigInt.reportOverflow
> ---------------------------------------------------------------------
>
> Key: SPARK-52163
> URL: https://issues.apache.org/jira/browse/SPARK-52163
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.4.0, 3.5.0, 4.0.0
> Reporter: Igor Railean
> Priority: Major
> Original Estimate: 24h
> Remaining Estimate: 24h
>
> Due to updates in the LogicalRDD stats calculation ([SPARK-39748[SQL][SS] Include
> the origin logical plan for LogicalRDD if it comes from DataFrame by
> HeartSaVioR · Pull Request #37161 ·
> apache/spark|https://github.com/apache/spark/pull/37161/files#diff-aa0d83f9a15ee1a818569d9cfee343dcd09de7e5e5a539719afcb039aee6aafa]),
> the LogicalRDD sizeInBytes estimate can explode and cause a BigInteger overflow.
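> Why it explodes: per the TraversableOnce.product frame in the trace below, the
> size-only estimator falls back to multiplying the children's sizeInBytes, so a
> self-join squares the estimate on every round, and after SPARK-39748 a
> checkpoint's LogicalRDD appears to carry that inflated estimate forward instead
> of resetting it. A minimal sketch of the recurrence (a model with a hypothetical
> estimate helper, not Spark source):
> {code:java}
> import scala.math.BigInt
>
> // Model (not Spark source) of the size-only estimator's fallback rule:
> // a node's sizeInBytes is the product of its children's sizeInBytes.
> def estimate(children: Seq[BigInt]): BigInt =
>   children.filter(_ > 0).product
>
> // A self-join feeds the same estimate in twice, so it squares each round.
> var size = BigInt(Long.MaxValue) // ~8 EiB, Spark's default upper bound
> for (i <- 1 to 5) {
>   size = estimate(Seq(size, size))
>   println(s"round $i: ~10^${size.toString.length - 1} B")
> }
> {code}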
> {code:java}
> Stack Trace:
> java.base/java.math.BigInteger.reportOverflow(BigInteger.java:1153),
> java.base/java.math.BigInteger.multiply(BigInteger.java:1658),
> java.base/java.math.BigInteger.multiply(BigInteger.java:1566),
> scala.math.BigInt.$times(BigInt.scala:196),
> scala.math.Numeric$BigIntIsIntegral.times(Numeric.scala:37),
> scala.math.Numeric$BigIntIsIntegral.times$(Numeric.scala:37),
> scala.math.Numeric$BigIntIsIntegral$.times(Numeric.scala:47),
> scala.math.Numeric$BigIntIsIntegral$.times(Numeric.scala:47),
> scala.collection.TraversableOnce.$anonfun$product$1(TraversableOnce.scala:264),
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196),
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194),
> scala.collection.Iterator.foreach(Iterator.scala:943),
> scala.collection.Iterator.foreach$(Iterator.scala:943),
> scala.collection.AbstractIterator.foreach(Iterator.scala:1431),
> scala.collection.IterableLike.foreach(IterableLike.scala:74),
> scala.collection.IterableLike.foreach$(IterableLike.scala:73),
> scala.collection.AbstractIterable.foreach(Iterable.scala:56),
> scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199),
> scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192),
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108),
> scala.collection.TraversableOnce.product(TraversableOnce.scala:264),
> scala.collection.TraversableOnce.product$(TraversableOnce.scala:264),
> scala.collection.AbstractTraversable.product(Traversable.scala:108),
> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.SizeInBytesOnlyStatsPlanVisitor$.default(SizeInBytesOnlyStatsPlanVisitor.scala:58),
> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.SizeInBytesOnlyStatsPlanVisitor$.visitJoin(SizeInBytesOnlyStatsPlanVisitor.scala:124),
> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.SizeInBytesOnlyStatsPlanVisitor$.visitJoin(SizeInBytesOnlyStatsPlanVisitor.scala:28),
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlanVisitor.visit(LogicalPlanVisitor.scala:35),
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlanVisitor.visit$(LogicalPlanVisitor.scala:25),
> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.SizeInBytesOnlyStatsPlanVisitor$.visit(SizeInBytesOnlyStatsPlanVisitor.scala:28),
> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats.$anonfun$stats$1(LogicalPlanStats.scala:37),
> scala.Option.getOrElse(Option.scala:189),
> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats.stats(LogicalPlanStats.scala:33),
> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats.stats$(LogicalPlanStats.scala:33),
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.stats(LogicalPlan.scala:31),
> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.SizeInBytesOnlyStatsPlanVisitor$.visitUnaryNode(SizeInBytesOnlyStatsPlanVisitor.scala:40),
> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.SizeInBytesOnlyStatsPlanVisitor$.visitProject(SizeInBytesOnlyStatsPlanVisitor.scala:149),
> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.SizeInBytesOnlyStatsPlanVisitor$.visitProject(SizeInBytesOnlyStatsPlanVisitor.scala:28),
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlanVisitor.visit(LogicalPlanVisitor.scala:38),
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlanVisitor.visit$(LogicalPlanVisitor.scala:25),
> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.SizeInBytesOnlyStatsPlanVisitor$.visit(SizeInBytesOnlyStatsPlanVisitor.scala:28),
> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats.$anonfun$stats$1(LogicalPlanStats.scala:37),
> scala.Option.getOrElse(Option.scala:189),
> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats.stats(LogicalPlanStats.scala:33),
> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats.stats$(LogicalPlanStats.scala:33),
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.stats(LogicalPlan.scala:31),
> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.SizeInBytesOnlyStatsPlanVisitor$.visitUnaryNode(SizeInBytesOnlyStatsPlanVisitor.scala:40),
> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.SizeInBytesOnlyStatsPlanVisitor$.visitAggregate(SizeInBytesOnlyStatsPlanVisitor.scala:67),
> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.SizeInBytesOnlyStatsPlanVisitor$.visitAggregate(SizeInBytesOnlyStatsPlanVisitor.scala:28),
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlanVisitor.visit(LogicalPlanVisitor.scala:26),
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlanVisitor.visit$(LogicalPlanVisitor.scala:25),
> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.SizeInBytesOnlyStatsPlanVisitor$.visit(SizeInBytesOnlyStatsPlanVisitor.scala:28),
> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats.$anonfun$stats$1(LogicalPlanStats.scala:37),
> scala.Option.getOrElse(Option.scala:189),
> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats.stats(LogicalPlanStats.scala:33),
> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats.stats$(LogicalPlanStats.scala:33),
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.stats(LogicalPlan.scala:31),
> org.apache.spark.sql.execution.adaptive.LogicalQueryStage.$anonfun$computeStats$3(LogicalQueryStage.scala:70),
> scala.Option.getOrElse(Option.scala:189),
> org.apache.spark.sql.execution.adaptive.LogicalQueryStage.computeStats(LogicalQueryStage.scala:70),
> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.SizeInBytesOnlyStatsPlanVisitor$.default(SizeInBytesOnlyStatsPlanVisitor.scala:56),
> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.SizeInBytesOnlyStatsPlanVisitor$.default(SizeInBytesOnlyStatsPlanVisitor.scala:28),
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlanVisitor.visit(LogicalPlanVisitor.scala:49),
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlanVisitor.visit$(LogicalPlanVisitor.scala:25),
> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.SizeInBytesOnlyStatsPlanVisitor$.visit(SizeInBytesOnlyStatsPlanVisitor.scala:28),
> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats.$anonfun$stats$1(LogicalPlanStats.scala:37),
> scala.Option.getOrElse(Option.scala:189),
> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats.stats(LogicalPlanStats.scala:33),
> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats.stats$(LogicalPlanStats.scala:33),
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.stats(LogicalPlan.scala:31),
> org.apache.spark.sql.execution.adaptive.LogicalQueryStage.maxRows(LogicalQueryStage.scala:73),
> org.apache.spark.sql.catalyst.plans.logical.Union.$anonfun$maxRows$3(basicLogicalOperators.scala:420),
> org.apache.spark.sql.catalyst.plans.logical.Union.$anonfun$maxRows$3$adapted(basicLogicalOperators.scala:419),
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62),
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55),
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49),
> org.apache.spark.sql.catalyst.plans.logical.Union.maxRows(basicLogicalOperators.scala:419),
> org.apache.spark.sql.catalyst.optimizer.OptimizeOneRowPlan$$anonfun$apply$2.applyOrElse(OptimizeOneRowPlan.scala:42),
> org.apache.spark.sql.catalyst.optimizer.OptimizeOneRowPlan$$anonfun$apply$2.applyOrElse(OptimizeOneRowPlan.scala:37),
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$2(TreeNode.scala:566),
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104),
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:566),
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformUpWithPruning(LogicalPlan.scala:31),
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformUpWithPruning(AnalysisHelper.scala:279),
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformUpWithPruning$(AnalysisHelper.scala:275),
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUpWithPruning(LogicalPlan.scala:31),
> org.apache.spark.sql.catalyst.optimizer.OptimizeOneRowPlan$.apply(OptimizeOneRowPlan.scala:37),
> org.apache.spark.sql.catalyst.optimizer.OptimizeOneRowPlan$.apply(OptimizeOneRowPlan.scala:35),
> org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222),
> scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60),
> scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68),
> scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:38),
> org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219),
> org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211),
> scala.collection.immutable.List.foreach(List.scala:431),
> org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211),
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.reOptimize(AdaptiveSparkPlanExec.scala:664),
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:318),
> org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827),
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:242),
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:387),
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.doExecute(AdaptiveSparkPlanExec.scala:372),
> org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:195),
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246),
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151),
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243),
> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:191),
> org.apache.spark.sql.Dataset.$anonfun$checkpoint$1(Dataset.scala:697),
> org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4206),
> org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526),
> org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4204),
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118),
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195),
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103),
> org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827),
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65),
> org.apache.spark.sql.Dataset.withAction(Dataset.scala:4204),
> org.apache.spark.sql.Dataset.checkpoint(Dataset.scala:696),
> org.apache.spark.sql.Dataset.checkpoint(Dataset.scala:659)
> {code}
> Minimal example:
> {code:java}
> import org.apache.spark.sql.functions.col
> import org.apache.spark.sql.{Row, SparkSession}
> import org.apache.spark.sql.types._
>
> // Define the schema
> val schema = StructType(Seq(
>   StructField("id", IntegerType, nullable = true),
>   StructField("name", StringType, nullable = true)
> ))
>
> // Create an empty RDD[Row] and an empty DataFrame from it
> val emptyRDD = spark.sparkContext.emptyRDD[Row]
> var emptyDF = spark.createDataFrame(emptyRDD, schema)
>
> sc.setCheckpointDir("dbfs:/tmp/rdd-checkpoints")
>
> try {
>   for (i <- 1 to 10) {
>     emptyDF = emptyDF.select(col("id"))
>     emptyDF = emptyDF.join(emptyDF, Seq("id"))
>     emptyDF = emptyDF.select(col("id"))
>     if (i % 2 == 0) {
>       println("Running explain cost")
>       emptyDF = emptyDF.checkpoint() // to truncate lineage
>       emptyDF.explain("cost")
>     }
>   }
> } catch {
>   case e: Exception =>
>     println(s"Exception caught during execution: ${e.getMessage}")
>     e.printStackTrace()
> }
> {code}
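> The snippet assumes a spark-shell session, where spark and sc are predefined;
> the dbfs:/ checkpoint path is Databricks-specific, and any reachable
> checkpoint directory should work.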
> You can quickly see the stats exploding:
> == Optimized Logical Plan ==
> LogicalRDD [id#104207], false, Statistics(sizeInBytes=7.12E+17879 B,
> ColumnStat: N/A)
> After ~30 iterations the number becomes so big that its magnitude no longer
> fits into BigInteger's Integer.MAX_VALUE-bounded internal array, and the job
> fails with the exception shown at the beginning.
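> For scale: with the estimate squaring each round, the exponent doubles, so a
> rough model (assuming java.math.BigInteger caps magnitudes near
> Integer.MAX_VALUE bits, which is what reportOverflow guards) predicts the
> failure point:
> {code:java}
> // Rough model: sizeInBytes starts near 2^63 and squares on every self-join,
> // so after n rounds it is about 2^(63 * 2^n). BigInteger rejects magnitudes
> // beyond roughly Integer.MAX_VALUE bits via reportOverflow.
> val maxBits = BigInt(Int.MaxValue)
> var bits = BigInt(63) // log2 of the initial ~8 EiB estimate
> var rounds = 0
> while (bits * 2 <= maxBits) { bits *= 2; rounds += 1 }
> println(s"squaring number ${rounds + 1} would overflow BigInteger")
> // prints a count in the mid-20s, consistent with the ~30 iterations observed
> {code}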
> In Spark 3.3.2, each iteration would instead yield:
> == Optimized Logical Plan ==
> LogicalRDD [id#2], false, Statistics(sizeInBytes=8.0 EiB)
>