This is an automated email from the ASF dual-hosted git repository.

mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new c0e36767d chore: Improve plan comet transformation log (#2564)
c0e36767d is described below

commit c0e36767d9580713366a126d990e495346f3f11e
Author: Zhen Wang <[email protected]>
AuthorDate: Mon Oct 13 23:06:44 2025 +0800

    chore: Improve plan comet transformation log (#2564)
    
    * chore: Make COMET_EXPLAIN_TRANSFORMATIONS behavior consistent
    
    * refer to spark rule log
    
    * fix
---
 spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala  | 8 ++++++--
 spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala  | 9 ++++++---
 .../org/apache/comet/rules/EliminateRedundantTransitions.scala   | 9 ++++++---
 3 files changed, 18 insertions(+), 8 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
index f572417bd..df5c82d02 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{Final, Partial}
 import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, RangePartitioning, RoundRobinPartitioning, SinglePartition}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.util.sideBySide
 import org.apache.spark.sql.comet._
 import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometNativeShuffle, CometShuffleExchangeExec, CometShuffleManager}
 import org.apache.spark.sql.execution._
@@ -613,8 +614,11 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
 
   override def apply(plan: SparkPlan): SparkPlan = {
     val newPlan = _apply(plan)
-    if (showTransformations) {
-      logInfo(s"\nINPUT: $plan\nOUTPUT: $newPlan")
+    if (showTransformations && !newPlan.fastEquals(plan)) {
+      logInfo(s"""
+           |=== Applying Rule $ruleName ===
+           |${sideBySide(plan.treeString, newPlan.treeString).mkString("\n")}
+           |""".stripMargin)
     }
     newPlan
   }
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index 950d0e9d3..1b38b8518 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -30,7 +30,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, GenericInternalRow, PlanExpression}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, MetadataColumnHelper}
+import org.apache.spark.sql.catalyst.util.{sideBySide, ArrayBasedMapData, GenericArrayData, MetadataColumnHelper}
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefaultValues
 import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec}
 import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
@@ -60,8 +60,11 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
 
   override def apply(plan: SparkPlan): SparkPlan = {
     val newPlan = _apply(plan)
-    if (showTransformations) {
-      logInfo(s"\nINPUT: $plan\nOUTPUT: $newPlan")
+    if (showTransformations && !newPlan.fastEquals(plan)) {
+      logInfo(s"""
+           |=== Applying Rule $ruleName ===
+           |${sideBySide(plan.treeString, newPlan.treeString).mkString("\n")}
+           |""".stripMargin)
     }
     newPlan
   }
diff --git a/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala b/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala
index a1a96d321..7c92b07bc 100644
--- a/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala
@@ -21,6 +21,7 @@ package org.apache.comet.rules
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.util.sideBySide
 import org.apache.spark.sql.comet.{CometCollectLimitExec, CometColumnarToRowExec, CometPlan, CometSparkToColumnarExec}
 import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometShuffleExchangeExec}
 import org.apache.spark.sql.execution.{ColumnarToRowExec, RowToColumnarExec, SparkPlan}
@@ -56,9 +57,11 @@ case class EliminateRedundantTransitions(session: SparkSession) extends Rule[Spa
 
   override def apply(plan: SparkPlan): SparkPlan = {
     val newPlan = _apply(plan)
-    if (showTransformations) {
-      // scalastyle:off println
-      System.err.println(s"EliminateRedundantTransitions:\nINPUT: $plan\nOUTPUT: $newPlan")
+    if (showTransformations && !newPlan.fastEquals(plan)) {
+      logInfo(s"""
+           |=== Applying Rule $ruleName ===
+           |${sideBySide(plan.treeString, newPlan.treeString).mkString("\n")}
+           |""".stripMargin)
     }
     newPlan
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to