This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 6d01f6a  feat: Add extended explain info to Comet plan (#255)
6d01f6a is described below

commit 6d01f6aca690fd98c9fdea1058cbbe11b2c868ff
Author: Parth Chandra <[email protected]>
AuthorDate: Mon Apr 22 12:51:40 2024 -0700

    feat: Add extended explain info to Comet plan (#255)
    
    * feat: Add extended explain info to Comet plan
    
    Requires Spark 4.0.0 for the explain info to be visible in Spark UI.
    (see: https://issues.apache.org/jira/browse/SPARK-47289)
    
    * spotless apply
    
    * Address review comments
    
    * fix ci
    
    * Add one more explanation
    
    * address review comments
    
    * fix formatting after rebase
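    
    For context, SPARK-47289 exposes a single hook trait,
    org.apache.spark.sql.ExtendedExplainGenerator, with a title and a
    generateExtendedInfo(plan) method. A minimal provider sketch (for
    illustration only; the collectReasons helper is hypothetical, not the
    Comet implementation added in this commit):
    
        import org.apache.spark.sql.ExtendedExplainGenerator
        import org.apache.spark.sql.execution.SparkPlan
    
        class MyExplainInfo extends ExtendedExplainGenerator {
          // Shown as the section title in the extended explain output.
          override def title: String = "My Extension"
    
          // Walk the plan tree and emit one line per recorded reason.
          override def generateExtendedInfo(plan: SparkPlan): String =
            plan.collect { case node => collectReasons(node) }
              .flatten.distinct.mkString("\n")
    
          // Hypothetical helper: wherever per-node reasons are stored.
          private def collectReasons(node: SparkPlan): Seq[String] = Seq.empty
        }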
---
 dev/ensure-jars-have-correct-contents.sh           |   2 +
 pom.xml                                            |   3 +
 .../inspections/CometTPCDSQueriesList-results.txt  | 838 +++++++++++++++++++++
 spark/inspections/CometTPCHQueriesList-results.txt | 142 ++++
 .../apache/comet/CometSparkSessionExtensions.scala | 223 +++++-
 .../org/apache/comet/ExtendedExplainInfo.scala     |  88 +++
 .../org/apache/comet/serde/QueryPlanSerde.scala    | 571 ++++++++++----
 .../shims/ShimCometSparkSessionExtensions.scala    |  17 +-
 .../spark/sql/ExtendedExplainGenerator.scala       |  34 +
 .../org/apache/comet/CometExpressionSuite.scala    |  51 ++
 .../org/apache/spark/sql/CometTPCQueryBase.scala   |   3 +
 .../apache/spark/sql/CometTPCQueryListBase.scala   |  18 +-
 .../scala/org/apache/spark/sql/CometTestBase.scala |  32 +-
 13 files changed, 1860 insertions(+), 162 deletions(-)
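
To surface the new output, the ExtendedExplainInfo provider added below is
registered with Spark; a minimal usage sketch, assuming the Spark 4.0 conf
key from SPARK-47289 and that formatted explain output picks up registered
providers:

    spark.conf.set(
      "spark.sql.extendedExplainProviders",
      "org.apache.comet.ExtendedExplainInfo")
    spark.sql("SELECT * FROM t WHERE x > 1").explain("formatted")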

diff --git a/dev/ensure-jars-have-correct-contents.sh b/dev/ensure-jars-have-correct-contents.sh
index 5543093..1f97d2d 100755
--- a/dev/ensure-jars-have-correct-contents.sh
+++ b/dev/ensure-jars-have-correct-contents.sh
@@ -78,6 +78,8 @@ allowed_expr+="|^org/apache/spark/shuffle/sort/CometShuffleExternalSorter.*$"
 allowed_expr+="|^org/apache/spark/shuffle/sort/RowPartition.class$"
 allowed_expr+="|^org/apache/spark/shuffle/comet/.*$"
 allowed_expr+="|^org/apache/spark/sql/$"
+# allow ExtendedExplainGenerator trait since it may not be available in older Spark versions
+allowed_expr+="|^org/apache/spark/sql/ExtendedExplainGenerator.*$"
 allowed_expr+="|^org/apache/spark/CometPlugin.class$"
 allowed_expr+="|^org/apache/spark/CometDriverPlugin.*$"
 allowed_expr+="|^org/apache/spark/CometTaskMemoryManager.class$"
diff --git a/pom.xml b/pom.xml
index 1e1bf7b..dd0ccc6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -932,6 +932,9 @@ under the License.
                         <ignoreClass>javax.annotation.meta.TypeQualifierValidator</ignoreClass>
                         <!-- this class is not properly excluded from comet-spark right now -->
                         <ignoreClass>org.apache.parquet.filter2.predicate.SparkFilterApi</ignoreClass>
+                        <!-- we explicitly include a duplicate to allow older versions of Spark -->
+                        <!-- this can be removed once we no longer support spark 3.x  -->
+                        <ignoreClass>org.apache.spark.sql.ExtendedExplainGenerator</ignoreClass>
                       </ignoreClasses>
                     </dependency>
                     <dependency>
diff --git a/spark/inspections/CometTPCDSQueriesList-results.txt b/spark/inspections/CometTPCDSQueriesList-results.txt
new file mode 100644
index 0000000..13f99a1
--- /dev/null
+++ b/spark/inspections/CometTPCDSQueriesList-results.txt
@@ -0,0 +1,838 @@
+Query: q1. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q1: ExplainInfo:
+ObjectHashAggregate is not supported
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+xxhash64 is not supported
+SortMergeJoin disabled because not all child plans are native
+
+Query: q2. Comet Exec: Enabled (CometFilter, CometProject, CometUnion)
+Query: q2: ExplainInfo:
+ObjectHashAggregate is not supported
+xxhash64 is not supported
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+SortMergeJoin disabled because not all child plans are native
+Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning
+
+Query: q3. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q3: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q4. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q4: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+SortMergeJoin disabled because not all child plans are native
+
+Query: q5. Comet Exec: Enabled (CometFilter, CometProject, CometUnion)
+Query: q5: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Union disabled because not all child plans are native
+
+Query: q6. Comet Exec: Enabled (CometHashAggregate, CometFilter, CometProject)
+Query: q6: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q7. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q7: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q8. Comet Exec: Enabled (CometFilter, CometProject, CometSort)
+Query: q8: ExplainInfo:
+ObjectHashAggregate is not supported
+getstructfield is not supported
+xxhash64 is not supported
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+SortMergeJoin disabled because not all child plans are native
+
+Query: q9. Comet Exec: Enabled (CometFilter)
+Query: q9: ExplainInfo:
+named_struct is not supported
+getstructfield is not supported
+
+Query: q10. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q10: ExplainInfo:
+ObjectHashAggregate is not supported
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+xxhash64 is not supported
+SortMergeJoin disabled because not all child plans are native
+
+Query: q11. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q11: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+SortMergeJoin disabled because not all child plans are native
+
+Query: q12. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q12: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Window is not supported
+
+Query: q13. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q13: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q14a. Comet Exec: Enabled (CometFilter, CometProject, CometSort)
+Query: q14a: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+SortMergeJoin disabled because not all child plans are native
+ObjectHashAggregate is not supported
+xxhash64 is not supported
+Union disabled because not all child plans are native
+
+Query: q14b. Comet Exec: Enabled (CometFilter, CometProject, CometSort)
+Query: q14b: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+SortMergeJoin disabled because not all child plans are native
+ObjectHashAggregate is not supported
+xxhash64 is not supported
+Union disabled because not all child plans are native
+
+Query: q15. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q15: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q16. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q16: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q17. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q17: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q18. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q18: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q19. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q19: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q20. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q20: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Window is not supported
+
+Query: q21. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q21: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q22. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q22: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q23a. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q23a: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+ObjectHashAggregate is not supported
+xxhash64 is not supported
+SortMergeJoin disabled because not all child plans are native
+Union disabled because not all child plans are native
+
+Query: q23b. Comet Exec: Enabled (CometFilter, CometProject, CometSort)
+Query: q23b: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+ObjectHashAggregate is not supported
+xxhash64 is not supported
+SortMergeJoin disabled because not all child plans are native
+Union disabled because not all child plans are native
+
+Query: q24a. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q24a: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q24b. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q24b: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q25. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q25: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q26. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q26: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q27. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q27: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q28. Comet Exec: Enabled (CometHashAggregate, CometFilter, CometProject)
+Query: q28: ExplainInfo:
+Unsupported aggregation mode PartialMerge
+BroadcastExchange is not supported
+
+Query: q29. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q29: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q30. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q30: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+ObjectHashAggregate is not supported
+xxhash64 is not supported
+SortMergeJoin disabled because not all child plans are native
+
+Query: q31. Comet Exec: Enabled (CometFilter)
+Query: q31: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+SortMergeJoin disabled because not all child plans are native
+Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning
+
+Query: q32. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q32: ExplainInfo:
+ObjectHashAggregate is not supported
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+xxhash64 is not supported
+SortMergeJoin disabled because not all child plans are native
+
+Query: q33. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q33: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Union disabled because not all child plans are native
+
+Query: q34. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q34: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning
+
+Query: q35. Comet Exec: Enabled (CometFilter, CometProject, CometSort)
+Query: q35: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+SortMergeJoin disabled because not all child plans are native
+
+Query: q36. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q36: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Window is not supported
+
+Query: q37. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q37: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q38. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q38: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+SortMergeJoin disabled because not all child plans are native
+
+Query: q39a. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q39a: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+SortMergeJoin disabled because not all child plans are native
+Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning
+
+Query: q39b. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q39b: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+SortMergeJoin disabled because not all child plans are native
+Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning
+
+Query: q40. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q40: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q41. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q41: ExplainInfo:
+ObjectHashAggregate is not supported
+xxhash64 is not supported
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q42. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q42: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q43. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q43: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q44. Comet Exec: Enabled (CometHashAggregate, CometFilter, CometProject, CometSort)
+Query: q44: ExplainInfo:
+Window is not supported
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q45. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q45: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q46. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q46: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q47. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q47: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Window is not supported
+SortMergeJoin disabled because not all child plans are native
+
+Query: q48. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q48: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q49. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q49: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Window is not supported
+Union disabled because not all child plans are native
+
+Query: q50. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q50: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q51. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q51: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Window is not supported
+SortMergeJoin disabled because not all child plans are native
+
+Query: q52. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q52: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q53. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q53: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Window is not supported
+
+Query: q54. Comet Exec: Enabled (CometFilter, CometProject, CometUnion)
+Query: q54: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+ObjectHashAggregate is not supported
+xxhash64 is not supported
+SortMergeJoin disabled because not all child plans are native
+
+Query: q55. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q55: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q56. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q56: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Union disabled because not all child plans are native
+
+Query: q57. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q57: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Window is not supported
+SortMergeJoin disabled because not all child plans are native
+
+Query: q58. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q58: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+SortMergeJoin disabled because not all child plans are native
+
+Query: q59. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q59: ExplainInfo:
+ObjectHashAggregate is not supported
+xxhash64 is not supported
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+SortMergeJoin disabled because not all child plans are native
+
+Query: q60. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q60: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Union disabled because not all child plans are native
+
+Query: q61. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q61: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q62. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q62: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q63. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q63: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Window is not supported
+
+Query: q64. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q64: ExplainInfo:
+BroadcastExchange is not supported
+ObjectHashAggregate is not supported
+BroadcastHashJoin disabled because not all child plans are native
+xxhash64 is not supported
+SortMergeJoin disabled because not all child plans are native
+Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning
+
+Query: q65. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q65: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+SortMergeJoin disabled because not all child plans are native
+
+Query: q66. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q66: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Union disabled because not all child plans are native
+
+Query: q67. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q67: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Window is not supported
+
+Query: q68. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q68: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q69. Comet Exec: Enabled (CometFilter, CometProject, CometSort)
+Query: q69: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+SortMergeJoin disabled because not all child plans are native
+
+Query: q70. Comet Exec: Enabled (CometFilter, CometProject, CometSort)
+Query: q70: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Window is not supported
+SortMergeJoin disabled because not all child plans are native
+
+Query: q71. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q71: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Union disabled because not all child plans are native
+Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning
+
+Query: q72. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q72: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q73. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q73: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning
+
+Query: q74. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q74: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+SortMergeJoin disabled because not all child plans are native
+
+Query: q75. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q75: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Union disabled because not all child plans are native
+SortMergeJoin disabled because not all child plans are native
+
+Query: q76. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q76: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Union disabled because not all child plans are native
+
+Query: q77. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q77: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+SortMergeJoin disabled because not all child plans are native
+Union disabled because not all child plans are native
+
+Query: q78. Comet Exec: Enabled (CometFilter)
+Query: q78: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+SortMergeJoin disabled because not all child plans are native
+
+Query: q79. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q79: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q80. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q80: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Union disabled because not all child plans are native
+
+Query: q81. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q81: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+ObjectHashAggregate is not supported
+xxhash64 is not supported
+SortMergeJoin disabled because not all child plans are native
+
+Query: q82. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q82: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q83. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q83: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+SortMergeJoin disabled because not all child plans are native
+
+Query: q84. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q84: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q85. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q85: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q86. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q86: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Window is not supported
+
+Query: q87. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q87: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+SortMergeJoin disabled because not all child plans are native
+
+Query: q88. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q88: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q89. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q89: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Window is not supported
+
+Query: q90. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q90: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q91. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q91: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning
+
+Query: q92. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q92: ExplainInfo:
+ObjectHashAggregate is not supported
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+xxhash64 is not supported
+SortMergeJoin disabled because not all child plans are native
+
+Query: q93. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q93: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q94. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q94: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q95. Comet Exec: Enabled (CometFilter, CometProject, CometSort)
+Query: q95: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+SortMergeJoin disabled because not all child plans are native
+
+Query: q96. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q96: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q97. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q97: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+SortMergeJoin disabled because not all child plans are native
+
+Query: q98. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q98: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Window is not supported
+Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning
+
+Query: q99. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q99: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q5a-v2.7. Comet Exec: Enabled (CometFilter, CometProject, CometUnion)
+Query: q5a-v2.7: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Union disabled because not all child plans are native
+
+Query: q6-v2.7. Comet Exec: Enabled (CometHashAggregate, CometFilter, CometProject)
+Query: q6-v2.7: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q10a-v2.7. Comet Exec: Enabled (CometFilter, CometProject, CometSort)
+Query: q10a-v2.7: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Union disabled because not all child plans are native
+SortMergeJoin disabled because not all child plans are native
+
+Query: q11-v2.7. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q11-v2.7: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+SortMergeJoin disabled because not all child plans are native
+
+Query: q12-v2.7. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q12-v2.7: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Window is not supported
+
+Query: q14-v2.7. Comet Exec: Enabled (CometFilter, CometProject, CometSort)
+Query: q14-v2.7: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+SortMergeJoin disabled because not all child plans are native
+ObjectHashAggregate is not supported
+xxhash64 is not supported
+Union disabled because not all child plans are native
+
+Query: q14a-v2.7. Comet Exec: Enabled (CometFilter, CometProject, CometSort)
+Query: q14a-v2.7: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+SortMergeJoin disabled because not all child plans are native
+ObjectHashAggregate is not supported
+xxhash64 is not supported
+Union disabled because not all child plans are native
+
+Query: q18a-v2.7. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q18a-v2.7: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Union disabled because not all child plans are native
+
+Query: q20-v2.7. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q20-v2.7: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Window is not supported
+
+Query: q22-v2.7. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q22-v2.7: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Native Broadcast is not enabled
+
+Query: q22a-v2.7. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q22a-v2.7: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Union disabled because not all child plans are native
+
+Query: q24-v2.7. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q24-v2.7: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning
+
+Query: q27a-v2.7. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q27a-v2.7: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Union disabled because not all child plans are native
+
+Query: q34-v2.7. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q34-v2.7: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning
+
+Query: q35-v2.7. Comet Exec: Enabled (CometFilter, CometProject, CometSort)
+Query: q35-v2.7: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+SortMergeJoin disabled because not all child plans are native
+
+Query: q35a-v2.7. Comet Exec: Enabled (CometFilter, CometProject, CometSort)
+Query: q35a-v2.7: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Union disabled because not all child plans are native
+SortMergeJoin disabled because not all child plans are native
+
+Query: q36a-v2.7. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q36a-v2.7: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Union disabled because not all child plans are native
+Window is not supported
+
+Query: q47-v2.7. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q47-v2.7: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Window is not supported
+SortMergeJoin disabled because not all child plans are native
+
+Query: q49-v2.7. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q49-v2.7: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Window is not supported
+Union disabled because not all child plans are native
+
+Query: q51a-v2.7. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q51a-v2.7: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Window is not supported
+SortMergeJoin disabled because not all child plans are native
+
+Query: q57-v2.7. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q57-v2.7: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Window is not supported
+SortMergeJoin disabled because not all child plans are native
+
+Query: q64-v2.7. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q64-v2.7: ExplainInfo:
+BroadcastExchange is not supported
+ObjectHashAggregate is not supported
+BroadcastHashJoin disabled because not all child plans are native
+xxhash64 is not supported
+SortMergeJoin disabled because not all child plans are native
+Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning
+
+Query: q67a-v2.7. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q67a-v2.7: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Union disabled because not all child plans are native
+Window is not supported
+
+Query: q70a-v2.7. Comet Exec: Enabled (CometFilter, CometProject, CometSort)
+Query: q70a-v2.7: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Window is not supported
+SortMergeJoin disabled because not all child plans are native
+Union disabled because not all child plans are native
+
+Query: q72-v2.7. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q72-v2.7: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q74-v2.7. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q74-v2.7: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+SortMergeJoin disabled because not all child plans are native
+
+Query: q75-v2.7. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q75-v2.7: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Union disabled because not all child plans are native
+SortMergeJoin disabled because not all child plans are native
+
+Query: q77a-v2.7. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q77a-v2.7: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+SortMergeJoin disabled because not all child plans are native
+Union disabled because not all child plans are native
+
+Query: q78-v2.7. Comet Exec: Enabled (CometFilter)
+Query: q78-v2.7: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+SortMergeJoin disabled because not all child plans are native
+
+Query: q80a-v2.7. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q80a-v2.7: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Union disabled because not all child plans are native
+
+Query: q86a-v2.7. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q86a-v2.7: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Union disabled because not all child plans are native
+Window is not supported
+
+Query: q98-v2.7. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q98-v2.7: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Window is not supported
+Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning
+
diff --git a/spark/inspections/CometTPCHQueriesList-results.txt b/spark/inspections/CometTPCHQueriesList-results.txt
new file mode 100644
index 0000000..b51286d
--- /dev/null
+++ b/spark/inspections/CometTPCHQueriesList-results.txt
@@ -0,0 +1,142 @@
+Query: q1 TPCH Snappy. Comet Exec: Enabled (CometHashAggregate, CometFilter, CometProject)
+Query: q1 TPCH Snappy: ExplainInfo:
+Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning
+
+Query: q2 TPCH Snappy. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q2 TPCH Snappy: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+ObjectHashAggregate is not supported
+SortMergeJoin disabled because not all child plans are native
+xxhash64 is not supported
+
+Query: q3 TPCH Snappy. Comet Exec: Enabled (CometFilter, CometProject, CometSort)
+Query: q3 TPCH Snappy: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+SortMergeJoin disabled because not all child plans are native
+
+Query: q4 TPCH Snappy. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q4 TPCH Snappy: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning
+
+Query: q5 TPCH Snappy. Comet Exec: Enabled (CometFilter, CometProject, CometSort)
+Query: q5 TPCH Snappy: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+ObjectHashAggregate is not supported
+SortMergeJoin disabled because not all child plans are native
+xxhash64 is not supported
+Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning
+
+Query: q6 TPCH Snappy. Comet Exec: Enabled (CometHashAggregate, CometFilter, CometProject)
+Query: q6 TPCH Snappy: ExplainInfo:
+
+
+Query: q7 TPCH Snappy. Comet Exec: Enabled (CometFilter)
+Query: q7 TPCH Snappy: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning
+
+Query: q8 TPCH Snappy. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q8 TPCH Snappy: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning
+
+Query: q9 TPCH Snappy. Comet Exec: Enabled (CometFilter, CometProject, CometSort)
+Query: q9 TPCH Snappy: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+SortMergeJoin disabled because not all child plans are native
+Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning
+
+Query: q10 TPCH Snappy. Comet Exec: Enabled (CometFilter, CometProject, CometSort)
+Query: q10 TPCH Snappy: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+SortMergeJoin disabled because not all child plans are native
+
+Query: q11 TPCH Snappy. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q11 TPCH Snappy: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning
+
+Query: q12 TPCH Snappy. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q12 TPCH Snappy: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning
+
+Query: q13 TPCH Snappy. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q13 TPCH Snappy: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning
+
+Query: q14 TPCH Snappy. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q14 TPCH Snappy: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q15 TPCH Snappy. Comet Exec: Enabled (CometHashAggregate, CometFilter, CometProject)
+Query: q15 TPCH Snappy: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning
+
+Query: q16 TPCH Snappy. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q16 TPCH Snappy: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning
+
+Query: q17 TPCH Snappy. Comet Exec: Enabled (CometHashAggregate, CometFilter, CometProject, CometSort)
+Query: q17 TPCH Snappy: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+SortMergeJoin disabled because not all child plans are native
+
+Query: q18 TPCH Snappy. Comet Exec: Enabled (CometHashAggregate, CometFilter, CometProject)
+Query: q18 TPCH Snappy: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+SortMergeJoin disabled because not all child plans are native
+
+Query: q19 TPCH Snappy. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q19 TPCH Snappy: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q20 TPCH Snappy. Comet Exec: Enabled (CometFilter, CometProject)
+Query: q20 TPCH Snappy: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+ObjectHashAggregate is not supported
+SortMergeJoin disabled because not all child plans are native
+xxhash64 is not supported
+Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning
+
+Query: q21 TPCH Snappy. Comet Exec: Enabled (CometFilter, CometProject, CometSort)
+Query: q21 TPCH Snappy: ExplainInfo:
+ObjectHashAggregate is not supported
+Sort merge join with a join condition is not supported
+xxhash64 is not supported
+SortMergeJoin disabled because not all child plans are native
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+
+Query: q22 TPCH Snappy. Comet Exec: Enabled (CometFilter)
+Query: q22 TPCH Snappy: ExplainInfo:
+BroadcastExchange is not supported
+BroadcastHashJoin disabled because not all child plans are native
+Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning
+
+Query: q1 TPCH Extended Snappy. Comet Exec: Enabled (CometHashAggregate, CometFilter, CometProject)
+Query: q1 TPCH Extended Snappy: ExplainInfo:
+
+
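
The CometSparkSessionExtensions changes below attach a reason to each plan
node that Comet cannot convert, via the new withInfo and createMessage
helpers. A sketch of how such helpers can be shaped with Catalyst's
TreeNodeTag (illustrative assumptions: the tag name and exact signatures
here are not the committed Comet code):

    import org.apache.spark.sql.catalyst.trees.TreeNodeTag
    import org.apache.spark.sql.execution.SparkPlan

    object ExplainInfoSketch {
      // Hypothetical tag under which reasons are stored on a plan node.
      val INFO_TAG: TreeNodeTag[Set[String]] = TreeNodeTag[Set[String]]("explainInfo")

      // Record a non-empty reason on a node, merging with prior reasons.
      def withInfo[T <: SparkPlan](node: T, info: String): T = {
        if (info.nonEmpty) {
          val existing = node.getTagValue(INFO_TAG).getOrElse(Set.empty[String])
          node.setTagValue(INFO_TAG, existing + info)
        }
        node
      }

      // Produce a message only when the failing condition holds, so call
      // sites can flatten Seq(info1, info2, ...) as the diff below does.
      def createMessage(condition: Boolean, message: => String): Option[String] =
        if (condition) Some(message) else None
    }
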
diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index 12ec3b9..fc0d133 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -24,10 +24,11 @@ import java.nio.ByteOrder
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.ByteUnit
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.SparkSessionExtensions
+import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.expressions.aggregate.{Final, Partial}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.comet._
 import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometNativeShuffle}
 import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
@@ -45,7 +46,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 import org.apache.comet.CometConf._
-import org.apache.comet.CometSparkSessionExtensions.{isANSIEnabled, isCometBroadCastForceEnabled, isCometColumnarShuffleEnabled, isCometEnabled, isCometExecEnabled, isCometOperatorEnabled, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSchemaSupported, shouldApplyRowToColumnar}
+import org.apache.comet.CometSparkSessionExtensions.{createMessage, isANSIEnabled, isCometBroadCastForceEnabled, isCometColumnarShuffleEnabled, isCometEnabled, isCometExecEnabled, isCometOperatorEnabled, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSchemaSupported, shouldApplyRowToColumnar, withInfo}
 import org.apache.comet.parquet.{CometParquetScan, SupportsComet}
 import org.apache.comet.serde.OperatorOuterClass.Operator
 import org.apache.comet.serde.QueryPlanSerde
@@ -75,8 +76,14 @@ class CometSparkSessionExtensions
 
   case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
     override def apply(plan: SparkPlan): SparkPlan = {
-      if (!isCometEnabled(conf) || !isCometScanEnabled(conf)) plan
-      else {
+      if (!isCometEnabled(conf) || !isCometScanEnabled(conf)) {
+        if (!isCometEnabled(conf)) {
+          withInfo(plan, "Comet is not enabled")
+        } else if (!isCometScanEnabled(conf)) {
+          withInfo(plan, "Comet Scan is not enabled")
+        }
+        plan
+      } else {
         plan.transform {
           // data source V2
           case scanExec: BatchScanExec
@@ -91,6 +98,27 @@ class CometSparkSessionExtensions
               scanExec.copy(scan = cometScan),
               runtimeFilters = scanExec.runtimeFilters)
 
+          // unsupported parquet data source V2
+          case scanExec: BatchScanExec if scanExec.scan.isInstanceOf[ParquetScan] =>
+            val requiredSchema = scanExec.scan.asInstanceOf[ParquetScan].readDataSchema
+            val info1 = createMessage(
+              !isSchemaSupported(requiredSchema),
+              s"Schema $requiredSchema is not supported")
+            val readPartitionSchema = scanExec.scan.asInstanceOf[ParquetScan].readPartitionSchema
+            val info2 = createMessage(
+              !isSchemaSupported(readPartitionSchema),
+              s"Schema $readPartitionSchema is not supported")
+            // Comet does not support pushedAggregate
+            val info3 = createMessage(
+              getPushedAggregate(scanExec.scan.asInstanceOf[ParquetScan]).isDefined,
+              "Comet does not support pushed aggregate")
+            withInfo(scanExec, Seq(info1, info2, info3).flatten.mkString("\n"))
+            scanExec
+
+          case scanExec: BatchScanExec if !scanExec.scan.isInstanceOf[ParquetScan] =>
+            withInfo(scanExec, "Comet Scan only supports Parquet")
+            scanExec
+
           // iceberg scan
           case scanExec: BatchScanExec =>
             if (isSchemaSupported(scanExec.scan.readSchema())) {
@@ -103,22 +131,24 @@ class CometSparkSessionExtensions
                     scanExec.clone().asInstanceOf[BatchScanExec],
                     runtimeFilters = scanExec.runtimeFilters)
                 case _ =>
-                  logInfo(
-                    "Comet extension is not enabled for " +
-                      s"${scanExec.scan.getClass.getSimpleName}: not enabled on data source side")
+                  val msg = "Comet extension is not enabled for " +
+                    s"${scanExec.scan.getClass.getSimpleName}: not enabled on data source side"
+                  logInfo(msg)
+                  withInfo(scanExec, msg)
                   scanExec
               }
             } else {
-              logInfo(
-                "Comet extension is not enabled for " +
-                  s"${scanExec.scan.getClass.getSimpleName}: Schema not supported")
+              val msg = "Comet extension is not enabled for " +
+                s"${scanExec.scan.getClass.getSimpleName}: Schema not supported"
+              logInfo(msg)
+              withInfo(scanExec, msg)
               scanExec
             }
 
           // data source V1
           case scanExec @ FileSourceScanExec(
                 HadoopFsRelation(_, partitionSchema, _, _, _: ParquetFileFormat, _),
-                _: Seq[_],
+                _: Seq[AttributeReference],
                 requiredSchema,
                 _,
                 _,
@@ -128,6 +158,26 @@ class CometSparkSessionExtensions
                 _) if isSchemaSupported(requiredSchema) && isSchemaSupported(partitionSchema) =>
             logInfo("Comet extension enabled for v1 Scan")
             CometScanExec(scanExec, session)
+
+          // data source v1 not supported case
+          case scanExec @ FileSourceScanExec(
+                HadoopFsRelation(_, partitionSchema, _, _, _: ParquetFileFormat, _),
+                _: Seq[AttributeReference],
+                requiredSchema,
+                _,
+                _,
+                _,
+                _,
+                _,
+                _) =>
+            val info1 = createMessage(
+              !isSchemaSupported(requiredSchema),
+              s"Schema $requiredSchema is not supported")
+            val info2 = createMessage(
+              !isSchemaSupported(partitionSchema),
+              s"Partition schema $partitionSchema is not supported")
+            withInfo(scanExec, Seq(info1, info2).flatten.mkString(","))
+            scanExec
         }
       }
     }
@@ -138,7 +188,7 @@ class CometSparkSessionExtensions
       plan.transformUp {
         case s: ShuffleExchangeExec
             if isCometPlan(s.child) && !isCometColumnarShuffleEnabled(conf) &&
-              QueryPlanSerde.supportPartitioning(s.child.output, s.outputPartitioning) =>
+              QueryPlanSerde.supportPartitioning(s.child.output, s.outputPartitioning)._1 =>
           logInfo("Comet extension enabled for Native Shuffle")
 
           // Switch to use Decimal128 regardless of precision, since Arrow native execution
@@ -151,7 +201,7 @@ class CometSparkSessionExtensions
         case s: ShuffleExchangeExec
             if (!s.child.supportsColumnar || isCometPlan(
               s.child)) && isCometColumnarShuffleEnabled(conf) &&
-              QueryPlanSerde.supportPartitioningTypes(s.child.output) &&
+              QueryPlanSerde.supportPartitioningTypes(s.child.output)._1 &&
               !isShuffleOperator(s.child) =>
           logInfo("Comet extension enabled for JVM Columnar Shuffle")
           CometShuffleExchangeExec(s, shuffleType = CometColumnarShuffle)
@@ -380,6 +430,14 @@ class CometSparkSessionExtensions
               op
           }
 
+        case op: ShuffledHashJoinExec if !isCometOperatorEnabled(conf, "hash_join") =>
+          withInfo(op, "ShuffleHashJoin is not enabled")
+          op
+
+        case op: ShuffledHashJoinExec if !op.children.forall(isCometNative(_)) =>
+          withInfo(op, "ShuffleHashJoin disabled because not all child plans are native")
+          op
+
         case op: BroadcastHashJoinExec
             if isCometOperatorEnabled(conf, "broadcast_hash_join") &&
               op.children.forall(isCometNative(_)) =>
@@ -401,6 +459,10 @@ class CometSparkSessionExtensions
               op
           }
 
+        case op: BroadcastHashJoinExec if !isCometOperatorEnabled(conf, "broadcast_hash_join") =>
+          withInfo(op, "BroadcastHashJoin is not enabled")
+          op
+
         case op: SortMergeJoinExec
             if isCometOperatorEnabled(conf, "sort_merge_join") &&
               op.children.forall(isCometNative(_)) =>
@@ -420,6 +482,13 @@ class CometSparkSessionExtensions
             case None =>
               op
           }
+        case op: SortMergeJoinExec if !isCometOperatorEnabled(conf, "sort_merge_join") =>
+          withInfo(op, "SortMergeJoin is not enabled")
+          op
+
+        case op: SortMergeJoinExec if !op.children.forall(isCometNative(_)) =>
+          withInfo(op, "SortMergeJoin disabled because not all child plans are native")
+          op
 
         case c @ CoalesceExec(numPartitions, child)
             if isCometOperatorEnabled(conf, "coalesce")
@@ -432,6 +501,14 @@ class CometSparkSessionExtensions
               c
           }
 
+        case c @ CoalesceExec(_, _) if !isCometOperatorEnabled(conf, "coalesce") =>
+          withInfo(c, "Coalesce is not enabled")
+          c
+
+        case op: CoalesceExec if !op.children.forall(isCometNative(_)) =>
+          withInfo(op, "Coalesce disabled because not all child plans are native")
+          op
+
         case s: TakeOrderedAndProjectExec
             if isCometNative(s.child) && isCometOperatorEnabled(conf, "takeOrderedAndProjectExec")
               && isCometShuffleEnabled(conf) &&
@@ -445,6 +522,16 @@ class CometSparkSessionExtensions
               s
           }
 
+        case s: TakeOrderedAndProjectExec =>
+          val info1 = createMessage(
+            !isCometOperatorEnabled(conf, "takeOrderedAndProjectExec"),
+            "TakeOrderedAndProject is not enabled")
+          val info2 = createMessage(
+            !isCometShuffleEnabled(conf),
+            "TakeOrderedAndProject requires shuffle to be enabled")
+          withInfo(s, Seq(info1, info2).flatten.mkString(","))
+          s
+
         case u: UnionExec
             if isCometOperatorEnabled(conf, "union") &&
               u.children.forall(isCometNative) =>
@@ -456,6 +543,14 @@ class CometSparkSessionExtensions
               u
           }
 
+        case u: UnionExec if !isCometOperatorEnabled(conf, "union") =>
+          withInfo(u, "Union is not enabled")
+          u
+
+        case op: UnionExec if !op.children.forall(isCometNative(_)) =>
+          withInfo(op, "Union disabled because not all child plans are native")
+          op
+
         // For AQE broadcast stage on a Comet broadcast exchange
         case s @ BroadcastQueryStageExec(_, _: CometBroadcastExchangeExec, _) 
=>
           val newOp = transform1(s)
@@ -488,12 +583,24 @@ class CometSparkSessionExtensions
             if (isCometNative(newPlan) || isCometBroadCastForceEnabled(conf)) {
               newPlan
             } else {
+              if (!isCometOperatorEnabled(
+                  conf,
+                  "broadcastExchangeExec") && 
!isCometBroadCastForceEnabled(conf)) {
+                withInfo(plan, "Native Broadcast is not enabled")
+              }
               plan
             }
           } else {
             plan
           }
 
+        // This case must be checked only after the previous case, which looks
+        // for a child BroadcastExchange, has been applied; otherwise that
+        // transform never gets applied.
+        case op: BroadcastHashJoinExec if 
!op.children.forall(isCometNative(_)) =>
+          withInfo(op, "BroadcastHashJoin disabled because not all child plans 
are native")
+          op
+
         // For AQE shuffle stage on a Comet shuffle exchange
         case s @ ShuffleQueryStageExec(_, _: CometShuffleExchangeExec, _) =>
           val newOp = transform1(s)
@@ -523,7 +630,7 @@ class CometSparkSessionExtensions
         case s: ShuffleExchangeExec
             if isCometShuffleEnabled(conf) &&
               !isCometColumnarShuffleEnabled(conf) &&
-              QueryPlanSerde.supportPartitioning(s.child.output, 
s.outputPartitioning) =>
+              QueryPlanSerde.supportPartitioning(s.child.output, 
s.outputPartitioning)._1 =>
           logInfo("Comet extension enabled for Native Shuffle")
 
           val newOp = transform1(s)
@@ -544,7 +651,7 @@ class CometSparkSessionExtensions
         // convert it to CometColumnarShuffle,
         case s: ShuffleExchangeExec
             if isCometShuffleEnabled(conf) && 
isCometColumnarShuffleEnabled(conf) &&
-              QueryPlanSerde.supportPartitioningTypes(s.child.output) &&
+              QueryPlanSerde.supportPartitioningTypes(s.child.output)._1 &&
               !isShuffleOperator(s.child) =>
           logInfo("Comet extension enabled for JVM Columnar Shuffle")
 
@@ -562,9 +669,32 @@ class CometSparkSessionExtensions
               s
           }
 
+        case s: ShuffleExchangeExec =>
+          val isShuffleEnabled = isCometShuffleEnabled(conf)
+          val msg1 = createMessage(!isShuffleEnabled, "Native shuffle is not 
enabled")
+          val columnarShuffleEnabled = isCometColumnarShuffleEnabled(conf)
+          val msg2 = createMessage(
+            isShuffleEnabled && !columnarShuffleEnabled && !QueryPlanSerde
+              .supportPartitioning(s.child.output, s.outputPartitioning)
+              ._1,
+            "Shuffle: " +
+              s"${QueryPlanSerde.supportPartitioning(s.child.output, 
s.outputPartitioning)._2}")
+          val msg3 = createMessage(
+            isShuffleEnabled && columnarShuffleEnabled && !QueryPlanSerde
+              .supportPartitioningTypes(s.child.output)
+              ._1,
+            s"Columnar shuffle: 
${QueryPlanSerde.supportPartitioningTypes(s.child.output)._2}")
+          withInfo(s, Seq(msg1, msg2, msg3).flatten.mkString(","))
+          s
+
         case op =>
           // An operator that is not supported by Comet
-          op
+          op match {
+            case _: CometExec | _: CometBroadcastExchangeExec | _: 
CometShuffleExchangeExec => op
+            case o =>
+              withInfo(o, s"${o.nodeName} is not supported")
+              o
+          }
       }
     }
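
For reference, the fallback cases above compose their reasons as optional
messages joined with commas. A minimal sketch of the pattern (the boolean
conditions are hard-coded here purely for illustration):

    // both conditions "fail", so both messages are kept
    val info1 = createMessage(true, "TakeOrderedAndProject is not enabled")
    val info2 = createMessage(
      true,
      "TakeOrderedAndProject requires shuffle to be enabled")
    Seq(info1, info2).flatten.mkString(",")
    // == "TakeOrderedAndProject is not enabled," +
    //    "TakeOrderedAndProject requires shuffle to be enabled"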
 
@@ -853,4 +983,63 @@ object CometSparkSessionExtensions extends Logging {
       ByteUnit.MiB.toBytes(shuffleMemorySize)
     }
   }
+
+  /**
+   * Attaches explain information to a TreeNode, rolling up the corresponding 
information tags
+   * from any child nodes
+   *
+   * @param node
+   *   The node to attach the explain information to. Typically a SparkPlan
+   * @param info
+   *   Information text. Optional, may be null or empty. If not provided, then 
only information
+   *   from child nodes will be included.
+   * @param exprs
+   *   Child nodes. Information attached to these nodes will be included in
+   *   the information attached to this node
+   * @tparam T
+   *   The type of the TreeNode. Typically SparkPlan, AggregateExpression, or 
Expression
+   * @return
+   *   The node with information (if any) attached
+   */
+  def withInfo[T <: TreeNode[_]](node: T, info: String, exprs: T*): T = {
+    val exprInfo = exprs
+      .flatMap(_.getTagValue(CometExplainInfo.EXTENSION_INFO))
+      .mkString("\n")
+    if (info != null && info.nonEmpty && exprInfo.nonEmpty) {
+      node.setTagValue(CometExplainInfo.EXTENSION_INFO, Seq(exprInfo, 
info).mkString("\n"))
+    } else if (exprInfo.nonEmpty) {
+      node.setTagValue(CometExplainInfo.EXTENSION_INFO, exprInfo)
+    } else if (info != null && info.nonEmpty) {
+      node.setTagValue(CometExplainInfo.EXTENSION_INFO, info)
+    }
+    node
+  }
+
+  /**
+   * Attaches explain information to a TreeNode, rolling up the corresponding 
information tags
+   * from any child nodes
+   *
+   * @param node
+   *   The node to attach the explain information to. Typically a SparkPlan
+   * @param exprs
+   *   Child nodes. Information attached to these nodes will be included in
+   *   the information attached to this node
+   * @tparam T
+   *   The type of the TreeNode. Typically SparkPlan, AggregateExpression, or 
Expression
+   * @return
+   *   The node with information (if any) attached
+   */
+  def withInfo[T <: TreeNode[_]](node: T, exprs: T*): T = {
+    withInfo(node, "", exprs: _*)
+  }
+
+  // Helper to reduce boilerplate
+  def createMessage(condition: Boolean, message: => String): Option[String] = {
+    if (condition) {
+      Some(message)
+    } else {
+      None
+    }
+  }
 }
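
The helpers above are typically used together when a rule declines to convert
an operator: createMessage builds an optional reason, and withInfo attaches it
to the node while rolling up anything already tagged on the children. A
minimal sketch (plan and child are placeholder SparkPlan values assumed to be
in scope, along with conf: SQLConf):

    val reason = createMessage(
      !isCometOperatorEnabled(conf, "union"),
      "Union is not enabled")
    withInfo(plan, reason.getOrElse(""), child)
    // plan's CometExplainInfo.EXTENSION_INFO tag now holds child's rolled-up
    // info (if any), followed by "Union is not enabled" if the condition held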
diff --git a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala 
b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
new file mode 100644
index 0000000..8d27501
--- /dev/null
+++ b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.ExtendedExplainGenerator
+import org.apache.spark.sql.catalyst.trees.{TreeNode, TreeNodeTag}
+import org.apache.spark.sql.execution.{InputAdapter, SparkPlan, 
WholeStageCodegenExec}
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, 
QueryStageExec}
+
+class ExtendedExplainInfo extends ExtendedExplainGenerator {
+
+  override def title: String = "Comet"
+
+  override def generateExtendedInfo(plan: SparkPlan): String = {
+    val info = extensionInfo(plan)
+    info.distinct.mkString("\n").trim
+  }
+
+  private def getActualPlan(node: TreeNode[_]): TreeNode[_] = {
+    node match {
+      case p: AdaptiveSparkPlanExec => getActualPlan(p.executedPlan)
+      case p: InputAdapter => getActualPlan(p.child)
+      case p: QueryStageExec => getActualPlan(p.plan)
+      case p: WholeStageCodegenExec => getActualPlan(p.child)
+      case p => p
+    }
+  }
+
+  private def extensionInfo(node: TreeNode[_]): mutable.Seq[String] = {
+    var info = mutable.Seq[String]()
+    val sorted = sortup(node)
+    sorted.foreach { p =>
+      val all: Array[String] =
+        
getActualPlan(p).getTagValue(CometExplainInfo.EXTENSION_INFO).getOrElse("").split("\n")
+      for (s <- all) {
+        info = info :+ s
+      }
+    }
+    info.filter(!_.contentEquals("\n"))
+  }
+
+  // get all plan nodes via breadth-first traversal, then return the reversed
+  // list so leaf nodes are first
+  private def sortup(node: TreeNode[_]): mutable.Queue[TreeNode[_]] = {
+    val ordered = new mutable.Queue[TreeNode[_]]()
+    val traversed = mutable.Queue[TreeNode[_]](getActualPlan(node))
+    while (traversed.nonEmpty) {
+      val s = traversed.dequeue()
+      ordered += s
+      if (s.innerChildren.nonEmpty) {
+        s.innerChildren.foreach {
+          case c @ (_: TreeNode[_]) => traversed.enqueue(getActualPlan(c))
+          case _ =>
+        }
+      }
+      if (s.children.nonEmpty) {
+        s.children.foreach {
+          case c @ (_: TreeNode[_]) => traversed.enqueue(getActualPlan(c))
+          case _ =>
+        }
+      }
+    }
+    ordered.reverse
+  }
+}
+
+object CometExplainInfo {
+  val EXTENSION_INFO = new TreeNodeTag[String]("CometExtensionInfo")
+}
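
A usage sketch for the generator above, assuming a Spark version that supports
the ExtendedExplainGenerator hook (SPARK-47289) and a DataFrame df:

    // register the provider so EXPLAIN output and the Spark UI include the
    // "Comet" section (configuration key per SPARK-47289)
    spark.conf.set(
      "spark.sql.extendedExplainProviders",
      "org.apache.comet.ExtendedExplainInfo")

    // or collect the rolled-up fallback reasons programmatically
    val info = new org.apache.comet.ExtendedExplainInfo()
    println(info.generateExtendedInfo(df.queryExecution.executedPlan))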
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala 
b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index cb767d5..0bc2c1d 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
-import org.apache.comet.CometSparkSessionExtensions.{isCometOperatorEnabled, 
isCometScan, isSpark32, isSpark34Plus}
+import org.apache.comet.CometSparkSessionExtensions.{isCometOperatorEnabled, 
isCometScan, isSpark32, isSpark34Plus, withInfo}
 import org.apache.comet.serde.ExprOuterClass.{AggExpr, DataType => 
ProtoDataType, Expr, ScalarFunc}
 import org.apache.comet.serde.ExprOuterClass.DataType.{DataTypeInfo, 
DecimalInfo, ListInfo, MapInfo, StructInfo}
 import org.apache.comet.serde.OperatorOuterClass.{AggregateMode => 
CometAggregateMode, JoinType, Operator}
@@ -52,7 +52,7 @@ import org.apache.comet.shims.ShimQueryPlanSerde
  */
 object QueryPlanSerde extends Logging with ShimQueryPlanSerde {
   def emitWarning(reason: String): Unit = {
-    logWarning(s"Comet native execution: $reason")
+    logWarning(s"Comet native execution is disabled due to: $reason")
   }
 
   def supportedDataType(dt: DataType): Boolean = dt match {
@@ -218,6 +218,11 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
               .setSum(sumBuilder)
               .build())
         } else {
+          if (dataType.isEmpty) {
+            withInfo(aggExpr, s"datatype ${s.dataType} is not supported", 
child)
+          } else {
+            withInfo(aggExpr, child)
+          }
           None
         }
       case s @ Average(child, _) if avgDataTypeSupported(s.dataType) && 
isLegacyMode(s) =>
@@ -249,7 +254,11 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
               .newBuilder()
               .setAvg(builder)
               .build())
+        } else if (dataType.isEmpty) {
+          withInfo(aggExpr, s"datatype ${s.dataType} is not supported", child)
+          None
         } else {
+          withInfo(aggExpr, child)
           None
         }
       case Count(children) =>
@@ -265,6 +274,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
               .setCount(countBuilder)
               .build())
         } else {
+          withInfo(aggExpr, children: _*)
           None
         }
       case min @ Min(child) if minMaxDataTypeSupported(min.dataType) =>
@@ -281,7 +291,11 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
               .newBuilder()
               .setMin(minBuilder)
               .build())
+        } else if (dataType.isEmpty) {
+          withInfo(aggExpr, s"datatype ${min.dataType} is not supported", 
child)
+          None
         } else {
+          withInfo(aggExpr, child)
           None
         }
       case max @ Max(child) if minMaxDataTypeSupported(max.dataType) =>
@@ -298,7 +312,11 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
               .newBuilder()
               .setMax(maxBuilder)
               .build())
+        } else if (dataType.isEmpty) {
+          withInfo(aggExpr, s"datatype ${max.dataType} is not supported", 
child)
+          None
         } else {
+          withInfo(aggExpr, child)
           None
         }
       case first @ First(child, ignoreNulls)
@@ -316,7 +334,11 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
               .newBuilder()
               .setFirst(firstBuilder)
               .build())
+        } else if (dataType.isEmpty) {
+          withInfo(aggExpr, s"datatype ${first.dataType} is not supported", 
child)
+          None
         } else {
+          withInfo(aggExpr, child)
           None
         }
       case last @ Last(child, ignoreNulls)
@@ -334,7 +356,11 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
               .newBuilder()
               .setLast(lastBuilder)
               .build())
+        } else if (dataType.isEmpty) {
+          withInfo(aggExpr, s"datatype ${last.dataType} is not supported", 
child)
+          None
         } else {
+          withInfo(aggExpr, child)
           None
         }
       case bitAnd @ BitAndAgg(child) if 
bitwiseAggTypeSupported(bitAnd.dataType) =>
@@ -351,7 +377,11 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
               .newBuilder()
               .setBitAndAgg(bitAndBuilder)
               .build())
+        } else if (dataType.isEmpty) {
+          withInfo(aggExpr, s"datatype ${bitAnd.dataType} is not supported", 
child)
+          None
         } else {
+          withInfo(aggExpr, child)
           None
         }
       case bitOr @ BitOrAgg(child) if bitwiseAggTypeSupported(bitOr.dataType) 
=>
@@ -368,7 +398,11 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
               .newBuilder()
               .setBitOrAgg(bitOrBuilder)
               .build())
+        } else if (dataType.isEmpty) {
+          withInfo(aggExpr, s"datatype ${bitOr.dataType} is not supported", 
child)
+          None
         } else {
+          withInfo(aggExpr, child)
           None
         }
       case bitXor @ BitXorAgg(child) if 
bitwiseAggTypeSupported(bitXor.dataType) =>
@@ -385,7 +419,11 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
               .newBuilder()
               .setBitXorAgg(bitXorBuilder)
               .build())
+        } else if (dataType.isEmpty) {
+          withInfo(aggExpr, s"datatype ${bitXor.dataType} is not supported", 
child)
+          None
         } else {
+          withInfo(aggExpr, child)
           None
         }
       case cov @ CovSample(child1, child2, _) =>
@@ -427,7 +465,9 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
           None
         }
       case fn =>
-        emitWarning(s"unsupported Spark aggregate function: $fn")
+        val msg = s"unsupported Spark aggregate function: ${fn.prettyName}"
+        emitWarning(msg)
+        withInfo(aggExpr, msg, fn.children: _*)
         None
     }
   }
@@ -470,6 +510,11 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
             .setCast(castBuilder)
             .build())
       } else {
+        if (dataType.isEmpty) {
+          withInfo(expr, s"Unsupported datatype $dt")
+        } else {
+          withInfo(expr, s"Unsupported expression $childExpr")
+        }
         None
       }
     }
@@ -478,7 +523,11 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
       SQLConf.get
       expr match {
         case a @ Alias(_, _) =>
-          exprToProtoInternal(a.child, inputs)
+          val r = exprToProtoInternal(a.child, inputs)
+          if (r.isEmpty) {
+            withInfo(expr, a.child)
+          }
+          r
 
         case cast @ Cast(_: Literal, dataType, _, _) =>
           // This can happen after promoting decimal precisions
@@ -487,14 +536,19 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
 
         case Cast(child, dt, timeZoneId, evalMode) =>
           val childExpr = exprToProtoInternal(child, inputs)
-          val evalModeStr = if (evalMode.isInstanceOf[Boolean]) {
-            // Spark 3.2 & 3.3 has ansiEnabled boolean
-            if (evalMode.asInstanceOf[Boolean]) "ANSI" else "LEGACY"
+          if (childExpr.isDefined) {
+            val evalModeStr = if (evalMode.isInstanceOf[Boolean]) {
+              // Spark 3.2 & 3.3 has ansiEnabled boolean
+              if (evalMode.asInstanceOf[Boolean]) "ANSI" else "LEGACY"
+            } else {
+              // Spark 3.4+ has EvalMode enum with values LEGACY, ANSI, and TRY
+              evalMode.toString
+            }
+            castToProto(timeZoneId, dt, childExpr, evalModeStr)
           } else {
-            // Spark 3.4+ has EvalMode enum with values LEGACY, ANSI, and TRY
-            evalMode.toString
+            withInfo(expr, child)
+            None
           }
-          castToProto(timeZoneId, dt, childExpr, evalModeStr)
 
         case add @ Add(left, right, _) if supportedDataType(left.dataType) =>
           val leftExpr = exprToProtoInternal(left, inputs)
@@ -515,9 +569,14 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                 .setAdd(addBuilder)
                 .build())
           } else {
+            withInfo(add, left, right)
             None
           }
 
+        case add @ Add(left, _, _) if !supportedDataType(left.dataType) =>
+          withInfo(add, s"Unsupported datatype ${left.dataType}")
+          None
+
         case sub @ Subtract(left, right, _) if 
supportedDataType(left.dataType) =>
           val leftExpr = exprToProtoInternal(left, inputs)
           val rightExpr = exprToProtoInternal(right, inputs)
@@ -537,9 +596,14 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                 .setSubtract(builder)
                 .build())
           } else {
+            withInfo(sub, left, right)
             None
           }
 
+        case sub @ Subtract(left, _, _) if !supportedDataType(left.dataType) =>
+          withInfo(sub, s"Unsupported datatype ${left.dataType}")
+          None
+
         case mul @ Multiply(left, right, _)
             if supportedDataType(left.dataType) && 
!decimalBeforeSpark34(left.dataType) =>
           val leftExpr = exprToProtoInternal(left, inputs)
@@ -560,9 +624,19 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                 .setMultiply(builder)
                 .build())
           } else {
+            withInfo(mul, left, right)
             None
           }
 
+        case mul @ Multiply(left, right, _) =>
+          if (!supportedDataType(left.dataType)) {
+            withInfo(mul, s"Unsupported datatype ${left.dataType}")
+          }
+          if (decimalBeforeSpark34(left.dataType)) {
+            withInfo(mul, "Decimal support requires Spark 3.4 or later")
+          }
+          None
+
         case div @ Divide(left, right, _)
             if supportedDataType(left.dataType) && 
!decimalBeforeSpark34(left.dataType) =>
           val leftExpr = exprToProtoInternal(left, inputs)
@@ -586,8 +660,17 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                 .setDivide(builder)
                 .build())
           } else {
+            withInfo(div, left, right)
             None
           }
+        case div @ Divide(left, right, _) =>
+          if (!supportedDataType(left.dataType)) {
+            withInfo(div, s"Unsupported datatype ${left.dataType}")
+          }
+          if (decimalBeforeSpark34(left.dataType)) {
+            withInfo(div, "Decimal support requires Spark 3.4 or later")
+          }
+          None
 
         case rem @ Remainder(left, right, _)
             if supportedDataType(left.dataType) && 
!decimalBeforeSpark34(left.dataType) =>
@@ -609,8 +692,17 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                 .setRemainder(builder)
                 .build())
           } else {
+            withInfo(rem, left, right)
             None
           }
+        case rem @ Remainder(left, _, _) =>
+          if (!supportedDataType(left.dataType)) {
+            withInfo(rem, s"Unsupported datatype ${left.dataType}")
+          }
+          if (decimalBeforeSpark34(left.dataType)) {
+            withInfo(rem, "Decimal support requires Spark 3.4 or later")
+          }
+          None
 
         case EqualTo(left, right) =>
           val leftExpr = exprToProtoInternal(left, inputs)
@@ -627,6 +719,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                 .setEq(builder)
                 .build())
           } else {
+            withInfo(expr, left, right)
             None
           }
 
@@ -645,6 +738,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                 .setNeq(builder)
                 .build())
           } else {
+            withInfo(expr, left, right)
             None
           }
 
@@ -663,6 +757,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                 .setEqNullSafe(builder)
                 .build())
           } else {
+            withInfo(expr, left, right)
             None
           }
 
@@ -681,6 +776,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                 .setNeqNullSafe(builder)
                 .build())
           } else {
+            withInfo(expr, left, right)
             None
           }
 
@@ -699,6 +795,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                 .setGt(builder)
                 .build())
           } else {
+            withInfo(expr, left, right)
             None
           }
 
@@ -717,6 +814,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                 .setGtEq(builder)
                 .build())
           } else {
+            withInfo(expr, left, right)
             None
           }
 
@@ -735,6 +833,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                 .setLt(builder)
                 .build())
           } else {
+            withInfo(expr, left, right)
             None
           }
 
@@ -753,6 +852,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                 .setLtEq(builder)
                 .build())
           } else {
+            withInfo(expr, left, right)
             None
           }
 
@@ -800,8 +900,12 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                 .setLiteral(exprBuilder)
                 .build())
           } else {
+            withInfo(expr, s"Unsupported datatype $dataType")
             None
           }
+        case Literal(_, dataType) if !supportedDataType(dataType) =>
+          withInfo(expr, s"Unsupported datatype $dataType")
+          None
 
         case Substring(str, Literal(pos, _), Literal(len, _)) =>
           val strExpr = exprToProtoInternal(str, inputs)
@@ -818,6 +922,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                 .setSubstring(builder)
                 .build())
           } else {
+            withInfo(expr, str)
             None
           }
 
@@ -837,27 +942,28 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                 .setLike(builder)
                 .build())
           } else {
+            withInfo(expr, left, right)
             None
           }
 
         // TODO waiting for arrow-rs update
-        //      case RLike(left, right) =>
-        //        val leftExpr = exprToProtoInternal(left, inputs)
-        //        val rightExpr = exprToProtoInternal(right, inputs)
-        //
-        //        if (leftExpr.isDefined && rightExpr.isDefined) {
-        //          val builder = ExprOuterClass.RLike.newBuilder()
-        //          builder.setLeft(leftExpr.get)
-        //          builder.setRight(rightExpr.get)
-        //
-        //          Some(
-        //            ExprOuterClass.Expr
-        //              .newBuilder()
-        //              .setRlike(builder)
-        //              .build())
-        //        } else {
-        //          None
-        //        }
+//      case RLike(left, right) =>
+//        val leftExpr = exprToProtoInternal(left, inputs)
+//        val rightExpr = exprToProtoInternal(right, inputs)
+//
+//        if (leftExpr.isDefined && rightExpr.isDefined) {
+//          val builder = ExprOuterClass.RLike.newBuilder()
+//          builder.setLeft(leftExpr.get)
+//          builder.setRight(rightExpr.get)
+//
+//          Some(
+//            ExprOuterClass.Expr
+//              .newBuilder()
+//              .setRlike(builder)
+//              .build())
+//        } else {
+//          None
+//        }
 
         case StartsWith(left, right) =>
           val leftExpr = exprToProtoInternal(left, inputs)
@@ -874,6 +980,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                 .setStartsWith(builder)
                 .build())
           } else {
+            withInfo(expr, left, right)
             None
           }
 
@@ -892,6 +999,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                 .setEndsWith(builder)
                 .build())
           } else {
+            withInfo(expr, left, right)
             None
           }
 
@@ -910,6 +1018,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                 .setContains(builder)
                 .build())
           } else {
+            withInfo(expr, left, right)
             None
           }
 
@@ -926,6 +1035,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                 .setStringSpace(builder)
                 .build())
           } else {
+            withInfo(expr, child)
             None
           }
 
@@ -945,6 +1055,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                 .setHour(builder)
                 .build())
           } else {
+            withInfo(expr, child)
             None
           }
 
@@ -964,6 +1075,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                 .setMinute(builder)
                 .build())
           } else {
+            withInfo(expr, child)
             None
           }
 
@@ -982,6 +1094,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                 .setTruncDate(builder)
                 .build())
           } else {
+            withInfo(expr, child, format)
             None
           }
 
@@ -1003,6 +1116,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                 .setTruncTimestamp(builder)
                 .build())
           } else {
+            withInfo(expr, child, format)
             None
           }
 
@@ -1022,13 +1136,14 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                 .setSecond(builder)
                 .build())
           } else {
+            withInfo(expr, child)
             None
           }
 
         case Year(child) =>
           val periodType = exprToProtoInternal(Literal("year"), inputs)
           val childExpr = exprToProtoInternal(child, inputs)
-          scalarExprToProto("datepart", Seq(periodType, childExpr): _*)
+          val optExpr = scalarExprToProto("datepart", Seq(periodType, 
childExpr): _*)
             .map(e => {
               Expr
                 .newBuilder()
@@ -1041,6 +1156,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                     .build())
                 .build()
             })
+          optExprWithInfo(optExpr, expr, child)
 
         case IsNull(child) =>
           val childExpr = exprToProtoInternal(child, inputs)
@@ -1055,6 +1171,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                 .setIsNull(castBuilder)
                 .build())
           } else {
+            withInfo(expr, child)
             None
           }
 
@@ -1071,6 +1188,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                 .setIsNotNull(castBuilder)
                 .build())
           } else {
+            withInfo(expr, child)
             None
           }
 
@@ -1097,6 +1215,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                 .setSortOrder(sortOrderBuilder)
                 .build())
           } else {
+            withInfo(expr, child)
             None
           }
 
@@ -1115,6 +1234,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                 .setAnd(builder)
                 .build())
           } else {
+            withInfo(expr, left, right)
             None
           }
 
@@ -1133,6 +1253,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                 .setOr(builder)
                 .build())
           } else {
+            withInfo(expr, left, right)
             None
           }
 
@@ -1159,6 +1280,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                 .setCheckOverflow(builder)
                 .build())
           } else {
+            withInfo(expr, child)
             None
           }
 
@@ -1195,35 +1317,44 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                   .build())
             }
           } else {
+            withInfo(attr, s"unsupported datatype: ${attr.dataType}")
             None
           }
 
         case Abs(child, _) =>
-          exprToProtoInternal(child, inputs).map(childExpr => {
+          val childExpr = exprToProtoInternal(child, inputs)
+          if (childExpr.isDefined) {
             val abs =
               ExprOuterClass.Abs
                 .newBuilder()
-                .setChild(childExpr)
+                .setChild(childExpr.get)
                 .build()
-            Expr.newBuilder().setAbs(abs).build()
-          })
+            Some(Expr.newBuilder().setAbs(abs).build())
+          } else {
+            withInfo(expr, child)
+            None
+          }
 
         case Acos(child) =>
           val childExpr = exprToProtoInternal(child, inputs)
-          scalarExprToProto("acos", childExpr)
+          val optExpr = scalarExprToProto("acos", childExpr)
+          optExprWithInfo(optExpr, expr, child)
 
         case Asin(child) =>
           val childExpr = exprToProtoInternal(child, inputs)
-          scalarExprToProto("asin", childExpr)
+          val optExpr = scalarExprToProto("asin", childExpr)
+          optExprWithInfo(optExpr, expr, child)
 
         case Atan(child) =>
           val childExpr = exprToProtoInternal(child, inputs)
-          scalarExprToProto("atan", childExpr)
+          val optExpr = scalarExprToProto("atan", childExpr)
+          optExprWithInfo(optExpr, expr, child)
 
         case Atan2(left, right) =>
           val leftExpr = exprToProtoInternal(left, inputs)
           val rightExpr = exprToProtoInternal(right, inputs)
-          scalarExprToProto("atan2", leftExpr, rightExpr)
+          val optExpr = scalarExprToProto("atan2", leftExpr, rightExpr)
+          optExprWithInfo(optExpr, expr, left, right)
 
         case e @ Ceil(child) =>
           val childExpr = exprToProtoInternal(child, inputs)
@@ -1231,18 +1362,22 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
             case t: DecimalType if t.scale == 0 => // zero scale is no-op
               childExpr
             case t: DecimalType if t.scale < 0 => // Spark disallows negative 
scale SPARK-30252
+              withInfo(e, s"Decimal type $t has negative scale")
               None
             case _ =>
-              scalarExprToProtoWithReturnType("ceil", e.dataType, childExpr)
+              val optExpr = scalarExprToProtoWithReturnType("ceil", 
e.dataType, childExpr)
+              optExprWithInfo(optExpr, expr, child)
           }
 
         case Cos(child) =>
           val childExpr = exprToProtoInternal(child, inputs)
-          scalarExprToProto("cos", childExpr)
+          val optExpr = scalarExprToProto("cos", childExpr)
+          optExprWithInfo(optExpr, expr, child)
 
         case Exp(child) =>
           val childExpr = exprToProtoInternal(child, inputs)
-          scalarExprToProto("exp", childExpr)
+          val optExpr = scalarExprToProto("exp", childExpr)
+          optExprWithInfo(optExpr, expr, child)
 
         case e @ Floor(child) =>
           val childExpr = exprToProtoInternal(child, inputs)
@@ -1250,27 +1385,33 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
             case t: DecimalType if t.scale == 0 => // zero scale is no-op
               childExpr
             case t: DecimalType if t.scale < 0 => // Spark disallows negative 
scale SPARK-30252
+              withInfo(e, s"Decimal type $t has negative scale")
               None
             case _ =>
-              scalarExprToProtoWithReturnType("floor", e.dataType, childExpr)
+              val optExpr = scalarExprToProtoWithReturnType("floor", 
e.dataType, childExpr)
+              optExprWithInfo(optExpr, expr, child)
           }
 
         case Log(child) =>
           val childExpr = exprToProtoInternal(child, inputs)
-          scalarExprToProto("ln", childExpr)
+          val optExpr = scalarExprToProto("ln", childExpr)
+          optExprWithInfo(optExpr, expr, child)
 
         case Log10(child) =>
           val childExpr = exprToProtoInternal(child, inputs)
-          scalarExprToProto("log10", childExpr)
+          val optExpr = scalarExprToProto("log10", childExpr)
+          optExprWithInfo(optExpr, expr, child)
 
         case Log2(child) =>
           val childExpr = exprToProtoInternal(child, inputs)
-          scalarExprToProto("log2", childExpr)
+          val optExpr = scalarExprToProto("log2", childExpr)
+          optExprWithInfo(optExpr, expr, child)
 
         case Pow(left, right) =>
           val leftExpr = exprToProtoInternal(left, inputs)
           val rightExpr = exprToProtoInternal(right, inputs)
-          scalarExprToProto("pow", leftExpr, rightExpr)
+          val optExpr = scalarExprToProto("pow", leftExpr, rightExpr)
+          optExprWithInfo(optExpr, expr, left, right)
 
         // round function for Spark 3.2 does not allow negative round target 
scale. In addition,
         // it has different result precision/scale for decimals. Supporting 
only 3.3 and above.
@@ -1282,6 +1423,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
           lazy val childExpr = exprToProtoInternal(r.child, inputs)
           r.child.dataType match {
             case t: DecimalType if t.scale < 0 => // Spark disallows negative 
scale SPARK-30252
+              withInfo(r, "Decimal type has negative scale")
               None
             case _ if scaleV == null =>
               exprToProtoInternal(Literal(null), inputs)
@@ -1302,36 +1444,47 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
               // I.e. 6.13171162472835E18 == 6.1317116247283497E18. However, 
toString() does not.
               // That results in round(6.1317116247283497E18, -5) == 
6.1317116247282995E18 instead
               // of 6.1317116247283999E18.
+              withInfo(r, "Comet does not support Spark's BigDecimal rounding")
               None
             case _ =>
               // `scale` must be Int64 type in DataFusion
               val scaleExpr = exprToProtoInternal(Literal(_scale.toLong, 
LongType), inputs)
-              scalarExprToProtoWithReturnType("round", r.dataType, childExpr, 
scaleExpr)
+              val optExpr =
+                scalarExprToProtoWithReturnType("round", r.dataType, 
childExpr, scaleExpr)
+              optExprWithInfo(optExpr, expr, r.child)
           }
 
         case Signum(child) =>
           val childExpr = exprToProtoInternal(child, inputs)
-          scalarExprToProto("signum", childExpr)
+          val optExpr = scalarExprToProto("signum", childExpr)
+          optExprWithInfo(optExpr, expr, child)
 
         case Sin(child) =>
           val childExpr = exprToProtoInternal(child, inputs)
-          scalarExprToProto("sin", childExpr)
+          val optExpr = scalarExprToProto("sin", childExpr)
+          optExprWithInfo(optExpr, expr, child)
 
         case Sqrt(child) =>
           val childExpr = exprToProtoInternal(child, inputs)
-          scalarExprToProto("sqrt", childExpr)
+          val optExpr = scalarExprToProto("sqrt", childExpr)
+          optExprWithInfo(optExpr, expr, child)
 
         case Tan(child) =>
           val childExpr = exprToProtoInternal(child, inputs)
-          scalarExprToProto("tan", childExpr)
+          val optExpr = scalarExprToProto("tan", childExpr)
+          optExprWithInfo(optExpr, expr, child)
 
         case Ascii(child) =>
-          val childExpr = exprToProtoInternal(Cast(child, StringType), inputs)
-          scalarExprToProto("ascii", childExpr)
+          val castExpr = Cast(child, StringType)
+          val childExpr = exprToProtoInternal(castExpr, inputs)
+          val optExpr = scalarExprToProto("ascii", childExpr)
+          optExprWithInfo(optExpr, expr, castExpr)
 
         case BitLength(child) =>
-          val childExpr = exprToProtoInternal(Cast(child, StringType), inputs)
-          scalarExprToProto("bit_length", childExpr)
+          val castExpr = Cast(child, StringType)
+          val childExpr = exprToProtoInternal(castExpr, inputs)
+          val optExpr = scalarExprToProto("bit_length", childExpr)
+          optExprWithInfo(optExpr, expr, castExpr)
 
         case If(predicate, trueValue, falseValue) =>
           val predicateExpr = exprToProtoInternal(predicate, inputs)
@@ -1348,22 +1501,32 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                 .setIf(builder)
                 .build())
           } else {
+            withInfo(expr, predicate, trueValue, falseValue)
             None
           }
 
         case CaseWhen(branches, elseValue) =>
-          val whenSeq = branches.map(elements => 
exprToProtoInternal(elements._1, inputs))
-          val thenSeq = branches.map(elements => 
exprToProtoInternal(elements._2, inputs))
+          var allBranches: Seq[Expression] = Seq()
+          val whenSeq = branches.map(elements => {
+            allBranches = allBranches :+ elements._1
+            exprToProtoInternal(elements._1, inputs)
+          })
+          val thenSeq = branches.map(elements => {
+            allBranches = allBranches :+ elements._2
+            exprToProtoInternal(elements._2, inputs)
+          })
           assert(whenSeq.length == thenSeq.length)
           if (whenSeq.forall(_.isDefined) && thenSeq.forall(_.isDefined)) {
             val builder = ExprOuterClass.CaseWhen.newBuilder()
             builder.addAllWhen(whenSeq.map(_.get).asJava)
             builder.addAllThen(thenSeq.map(_.get).asJava)
             if (elseValue.isDefined) {
-              val elseValueExpr = exprToProtoInternal(elseValue.get, inputs)
+              val elseValueExpr =
+                exprToProtoInternal(elseValue.get, inputs)
               if (elseValueExpr.isDefined) {
                 builder.setElseExpr(elseValueExpr.get)
               } else {
+                withInfo(expr, elseValue.get)
                 return None
               }
             }
@@ -1373,78 +1536,113 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                 .setCaseWhen(builder)
                 .build())
           } else {
+            withInfo(expr, allBranches: _*)
             None
           }
-
         case ConcatWs(children) =>
-          val exprs = children.map(e => exprToProtoInternal(Cast(e, 
StringType), inputs))
-          scalarExprToProto("concat_ws", exprs: _*)
+          var childExprs: Seq[Expression] = Seq()
+          val exprs = children.map(e => {
+            val castExpr = Cast(e, StringType)
+            childExprs = childExprs :+ castExpr
+            exprToProtoInternal(castExpr, inputs)
+          })
+          val optExpr = scalarExprToProto("concat_ws", exprs: _*)
+          optExprWithInfo(optExpr, expr, childExprs: _*)
 
         case Chr(child) =>
           val childExpr = exprToProtoInternal(child, inputs)
-          scalarExprToProto("chr", childExpr)
+          val optExpr = scalarExprToProto("chr", childExpr)
+          optExprWithInfo(optExpr, expr, child)
 
         case InitCap(child) =>
-          val childExpr = exprToProtoInternal(Cast(child, StringType), inputs)
-          scalarExprToProto("initcap", childExpr)
+          val castExpr = Cast(child, StringType)
+          val childExpr = exprToProtoInternal(castExpr, inputs)
+          val optExpr = scalarExprToProto("initcap", childExpr)
+          optExprWithInfo(optExpr, expr, castExpr)
 
         case Length(child) =>
-          val childExpr = exprToProtoInternal(Cast(child, StringType), inputs)
-          scalarExprToProto("length", childExpr)
+          val castExpr = Cast(child, StringType)
+          val childExpr = exprToProtoInternal(castExpr, inputs)
+          val optExpr = scalarExprToProto("length", childExpr)
+          optExprWithInfo(optExpr, expr, castExpr)
 
         case Lower(child) =>
-          val childExpr = exprToProtoInternal(Cast(child, StringType), inputs)
-          scalarExprToProto("lower", childExpr)
+          val castExpr = Cast(child, StringType)
+          val childExpr = exprToProtoInternal(castExpr, inputs)
+          val optExpr = scalarExprToProto("lower", childExpr)
+          optExprWithInfo(optExpr, expr, castExpr)
 
         case Md5(child) =>
-          val childExpr = exprToProtoInternal(Cast(child, StringType), inputs)
-          scalarExprToProto("md5", childExpr)
+          val castExpr = Cast(child, StringType)
+          val childExpr = exprToProtoInternal(castExpr, inputs)
+          val optExpr = scalarExprToProto("md5", childExpr)
+          optExprWithInfo(optExpr, expr, castExpr)
 
         case OctetLength(child) =>
-          val childExpr = exprToProtoInternal(Cast(child, StringType), inputs)
-          scalarExprToProto("octet_length", childExpr)
+          val castExpr = Cast(child, StringType)
+          val childExpr = exprToProtoInternal(castExpr, inputs)
+          val optExpr = scalarExprToProto("octet_length", childExpr)
+          optExprWithInfo(optExpr, expr, castExpr)
 
         case Reverse(child) =>
-          val childExpr = exprToProtoInternal(Cast(child, StringType), inputs)
-          scalarExprToProto("reverse", childExpr)
+          val castExpr = Cast(child, StringType)
+          val childExpr = exprToProtoInternal(castExpr, inputs)
+          val optExpr = scalarExprToProto("reverse", childExpr)
+          optExprWithInfo(optExpr, expr, castExpr)
 
         case StringInstr(str, substr) =>
-          val leftExpr = exprToProtoInternal(Cast(str, StringType), inputs)
-          val rightExpr = exprToProtoInternal(Cast(substr, StringType), inputs)
-          scalarExprToProto("strpos", leftExpr, rightExpr)
+          val leftCast = Cast(str, StringType)
+          val rightCast = Cast(substr, StringType)
+          val leftExpr = exprToProtoInternal(leftCast, inputs)
+          val rightExpr = exprToProtoInternal(rightCast, inputs)
+          val optExpr = scalarExprToProto("strpos", leftExpr, rightExpr)
+          optExprWithInfo(optExpr, expr, leftCast, rightCast)
 
         case StringRepeat(str, times) =>
-          val leftExpr = exprToProtoInternal(Cast(str, StringType), inputs)
-          val rightExpr = exprToProtoInternal(Cast(times, LongType), inputs)
-          scalarExprToProto("repeat", leftExpr, rightExpr)
+          val leftCast = Cast(str, StringType)
+          val rightCast = Cast(times, LongType)
+          val leftExpr = exprToProtoInternal(leftCast, inputs)
+          val rightExpr = exprToProtoInternal(rightCast, inputs)
+          val optExpr = scalarExprToProto("repeat", leftExpr, rightExpr)
+          optExprWithInfo(optExpr, expr, leftCast, rightCast)
 
         case StringReplace(src, search, replace) =>
-          val srcExpr = exprToProtoInternal(Cast(src, StringType), inputs)
-          val searchExpr = exprToProtoInternal(Cast(search, StringType), 
inputs)
-          val replaceExpr = exprToProtoInternal(Cast(replace, StringType), 
inputs)
-          scalarExprToProto("replace", srcExpr, searchExpr, replaceExpr)
+          val srcCast = Cast(src, StringType)
+          val searchCast = Cast(search, StringType)
+          val replaceCast = Cast(replace, StringType)
+          val srcExpr = exprToProtoInternal(srcCast, inputs)
+          val searchExpr = exprToProtoInternal(searchCast, inputs)
+          val replaceExpr = exprToProtoInternal(replaceCast, inputs)
+          val optExpr = scalarExprToProto("replace", srcExpr, searchExpr, 
replaceExpr)
+          optExprWithInfo(optExpr, expr, srcCast, searchCast, replaceCast)
 
         case StringTranslate(src, matching, replace) =>
-          val srcExpr = exprToProtoInternal(Cast(src, StringType), inputs)
-          val matchingExpr = exprToProtoInternal(Cast(matching, StringType), 
inputs)
-          val replaceExpr = exprToProtoInternal(Cast(replace, StringType), 
inputs)
-          scalarExprToProto("translate", srcExpr, matchingExpr, replaceExpr)
+          val srcCast = Cast(src, StringType)
+          val matchingCast = Cast(matching, StringType)
+          val replaceCast = Cast(replace, StringType)
+          val srcExpr = exprToProtoInternal(srcCast, inputs)
+          val matchingExpr = exprToProtoInternal(matchingCast, inputs)
+          val replaceExpr = exprToProtoInternal(replaceCast, inputs)
+          val optExpr = scalarExprToProto("translate", srcExpr, matchingExpr, 
replaceExpr)
+          optExprWithInfo(optExpr, expr, srcCast, matchingCast, replaceCast)
 
         case StringTrim(srcStr, trimStr) =>
-          trim(srcStr, trimStr, inputs, "trim")
+          trim(expr, srcStr, trimStr, inputs, "trim")
 
         case StringTrimLeft(srcStr, trimStr) =>
-          trim(srcStr, trimStr, inputs, "ltrim")
+          trim(expr, srcStr, trimStr, inputs, "ltrim")
 
         case StringTrimRight(srcStr, trimStr) =>
-          trim(srcStr, trimStr, inputs, "rtrim")
+          trim(expr, srcStr, trimStr, inputs, "rtrim")
 
         case StringTrimBoth(srcStr, trimStr, _) =>
-          trim(srcStr, trimStr, inputs, "btrim")
+          trim(expr, srcStr, trimStr, inputs, "btrim")
 
         case Upper(child) =>
-          val childExpr = exprToProtoInternal(Cast(child, StringType), inputs)
-          scalarExprToProto("upper", childExpr)
+          val castExpr = Cast(child, StringType)
+          val childExpr = exprToProtoInternal(castExpr, inputs)
+          val optExpr = scalarExprToProto("upper", childExpr)
+          optExprWithInfo(optExpr, expr, castExpr)
 
         case BitwiseAnd(left, right) =>
           val leftExpr = exprToProtoInternal(left, inputs)
@@ -1461,6 +1659,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                 .setBitwiseAnd(builder)
                 .build())
           } else {
+            withInfo(expr, left, right)
             None
           }
 
@@ -1477,6 +1676,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                 .setBitwiseNot(builder)
                 .build())
           } else {
+            withInfo(expr, child)
             None
           }
 
@@ -1495,6 +1695,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                 .setBitwiseOr(builder)
                 .build())
           } else {
+            withInfo(expr, left, right)
             None
           }
 
@@ -1513,18 +1714,20 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                 .setBitwiseXor(builder)
                 .build())
           } else {
+            withInfo(expr, left, right)
             None
           }
 
         case ShiftRight(left, right) =>
           val leftExpr = exprToProtoInternal(left, inputs)
-          val rightExpr = if (left.dataType == LongType) {
-            // DataFusion bitwise shift right expression requires
-            // same data type between left and right side
-            exprToProtoInternal(Cast(right, LongType), inputs)
+          // DataFusion bitwise shift right expression requires
+          // same data type between left and right side
+          val rightExpression = if (left.dataType == LongType) {
+            Cast(right, LongType)
           } else {
-            exprToProtoInternal(right, inputs)
+            right
           }
+          val rightExpr = exprToProtoInternal(rightExpression, inputs)
 
           if (leftExpr.isDefined && rightExpr.isDefined) {
             val builder = ExprOuterClass.BitwiseShiftRight.newBuilder()
@@ -1537,18 +1740,20 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                 .setBitwiseShiftRight(builder)
                 .build())
           } else {
+            withInfo(expr, left, rightExpression)
             None
           }
 
         case ShiftLeft(left, right) =>
           val leftExpr = exprToProtoInternal(left, inputs)
-          val rightExpr = if (left.dataType == LongType) {
-            // DataFusion bitwise shift left expression requires
-            // same data type between left and right side
-            exprToProtoInternal(Cast(right, LongType), inputs)
+          // DataFusion bitwise shift left expression requires
+          // same data type between left and right side
+          val rightExpression = if (left.dataType == LongType) {
+            Cast(right, LongType)
           } else {
-            exprToProtoInternal(right, inputs)
+            right
           }
+          val rightExpr = exprToProtoInternal(rightExpression, inputs)
 
           if (leftExpr.isDefined && rightExpr.isDefined) {
             val builder = ExprOuterClass.BitwiseShiftLeft.newBuilder()
@@ -1561,11 +1766,12 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                 .setBitwiseShiftLeft(builder)
                 .build())
           } else {
+            withInfo(expr, left, rightExpression)
             None
           }
 
         case In(value, list) =>
-          in(value, list, inputs, false)
+          in(expr, value, list, inputs, false)
 
         case InSet(value, hset) =>
           val valueDataType = value.dataType
@@ -1574,10 +1780,10 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
           }.toSeq
           // Change `InSet` to `In` expression
           // We do Spark `InSet` optimization in native (DataFusion) side.
-          in(value, list, inputs, false)
+          in(expr, value, list, inputs, false)
 
         case Not(In(value, list)) =>
-          in(value, list, inputs, true)
+          in(expr, value, list, inputs, true)
 
         case Not(child) =>
           val childExpr = exprToProtoInternal(child, inputs)
@@ -1590,6 +1796,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                 .setNot(builder)
                 .build())
           } else {
+            withInfo(expr, child)
             None
           }
 
@@ -1604,6 +1811,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                 .setNegative(builder)
                 .build())
           } else {
+            withInfo(expr, child)
             None
           }
 
@@ -1612,20 +1820,25 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
           val childExpr = scalarExprToProto("coalesce", exprChildren: _*)
           // TODO: Remove this once we have new DataFusion release which 
includes
           // the fix: https://github.com/apache/arrow-datafusion/pull/9459
-          castToProto(None, a.dataType, childExpr, "LEGACY")
+          if (childExpr.isDefined) {
+            castToProto(None, a.dataType, childExpr, "LEGACY")
+          } else {
+            withInfo(expr, a.children: _*)
+            None
+          }
 
         // With Spark 3.4, CharVarcharCodegenUtils.readSidePadding gets called 
to pad spaces for
         // char types. Use rpad to achieve the behavior.
         // See https://github.com/apache/spark/pull/38151
         case StaticInvoke(
-              clz: Class[_],
+              _: Class[CharVarcharCodegenUtils],
               _: StringType,
               "readSidePadding",
               arguments,
               _,
               true,
               false,
-              true) if clz == classOf[CharVarcharCodegenUtils] && 
arguments.size == 2 =>
+              true) if arguments.size == 2 =>
           val argsExpr = Seq(
             exprToProtoInternal(Cast(arguments(0), StringType), inputs),
             exprToProtoInternal(arguments(1), inputs))
@@ -1637,15 +1850,18 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
 
             
Some(ExprOuterClass.Expr.newBuilder().setScalarFunc(builder).build())
           } else {
+            withInfo(expr, arguments: _*)
             None
           }
 
         case KnownFloatingPointNormalized(NormalizeNaNAndZero(expr)) =>
           val dataType = serializeDataType(expr.dataType)
           if (dataType.isEmpty) {
+            withInfo(expr, s"Unsupported datatype ${expr.dataType}")
             return None
           }
-          exprToProtoInternal(expr, inputs).map { child =>
+          val ex = exprToProtoInternal(expr, inputs)
+          ex.map { child =>
             val builder = ExprOuterClass.NormalizeNaNAndZero
               .newBuilder()
               .setChild(child)
@@ -1656,6 +1872,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
         case s @ execution.ScalarSubquery(_, _) =>
           val dataType = serializeDataType(s.dataType)
           if (dataType.isEmpty) {
+            withInfo(s, s"Scalar subquery returns unsupported datatype 
${s.dataType}")
             return None
           }
 
@@ -1667,14 +1884,17 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
 
         case UnscaledValue(child) =>
           val childExpr = exprToProtoInternal(child, inputs)
-          scalarExprToProtoWithReturnType("unscaled_value", LongType, 
childExpr)
+          val optExpr = scalarExprToProtoWithReturnType("unscaled_value", 
LongType, childExpr)
+          optExprWithInfo(optExpr, expr, child)
 
         case MakeDecimal(child, precision, scale, true) =>
           val childExpr = exprToProtoInternal(child, inputs)
-          scalarExprToProtoWithReturnType(
+          val optExpr = scalarExprToProtoWithReturnType(
             "make_decimal",
             DecimalType(precision, scale),
             childExpr)
+          optExprWithInfo(optExpr, expr, child)
+
         case b @ BinaryExpression(_, _) if isBloomFilterMightContain(b) =>
           val bloomFilter = b.left
           val value = b.right
@@ -1690,30 +1910,37 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
                 .setBloomFilterMightContain(builder)
                 .build())
           } else {
+            withInfo(expr, bloomFilter, value)
             None
           }
 
-        case e =>
-          emitWarning(s"unsupported Spark expression: '$e' of class 
'${e.getClass.getName}")
+        case _ =>
+          withInfo(expr, s"${expr.prettyName} is not supported", 
expr.children: _*)
           None
       }
     }
 
     def trim(
+        expr: Expression, // parent expression
         srcStr: Expression,
         trimStr: Option[Expression],
         inputs: Seq[Attribute],
         trimType: String): Option[Expr] = {
-      val srcExpr = exprToProtoInternal(Cast(srcStr, StringType), inputs)
+      val srcCast = Cast(srcStr, StringType)
+      val srcExpr = exprToProtoInternal(srcCast, inputs)
       if (trimStr.isDefined) {
-        val trimExpr = exprToProtoInternal(Cast(trimStr.get, StringType), 
inputs)
-        scalarExprToProto(trimType, srcExpr, trimExpr)
+        val trimCast = Cast(trimStr.get, StringType)
+        val trimExpr = exprToProtoInternal(trimCast, inputs)
+        val optExpr = scalarExprToProto(trimType, srcExpr, trimExpr)
+        optExprWithInfo(optExpr, expr, null, srcCast, trimCast)
       } else {
-        scalarExprToProto(trimType, srcExpr)
+        val optExpr = scalarExprToProto(trimType, srcExpr)
+        optExprWithInfo(optExpr, expr, null, srcCast)
       }
     }
 
     def in(
+        expr: Expression,
         value: Expression,
         list: Seq[Expression],
         inputs: Seq[Attribute],
@@ -1731,6 +1958,8 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
             .setIn(builder)
             .build())
       } else {
+        val allExprs = list ++ Seq(value)
+        withInfo(expr, allExprs: _*)
         None
       }
     }
@@ -1764,7 +1993,8 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
       args: Option[Expr]*): Option[Expr] = {
     args.foreach {
       case Some(a) => builder.addArgs(a)
-      case _ => return None
+      case _ =>
+        return None
     }
     Some(ExprOuterClass.Expr.newBuilder().setScalarFunc(builder).build())
   }
@@ -1808,6 +2038,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
             .addAllProjectList(exprs.map(_.get).asJava)
           Some(result.setProjection(projectBuilder).build())
         } else {
+          withInfo(op, projectList: _*)
           None
         }
 
@@ -1818,6 +2049,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
           val filterBuilder = 
OperatorOuterClass.Filter.newBuilder().setPredicate(cond.get)
           Some(result.setFilter(filterBuilder).build())
         } else {
+          withInfo(op, condition, child)
           None
         }
 
@@ -1830,6 +2062,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
             .addAllSortOrders(sortOrders.map(_.get).asJava)
           Some(result.setSort(sortBuilder).build())
         } else {
+          withInfo(op, sortOrder: _*)
           None
         }
 
@@ -1843,6 +2076,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
             .setOffset(0)
           Some(result.setLimit(limitBuilder).build())
         } else {
+          withInfo(op, "No child operator")
           None
         }
 
@@ -1860,11 +2094,16 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
 
           Some(result.setLimit(limitBuilder).build())
         } else {
+          withInfo(op, "No child operator")
           None
         }
 
       case ExpandExec(projections, _, child) if 
isCometOperatorEnabled(op.conf, "expand") =>
-        val projExprs = projections.flatMap(_.map(exprToProto(_, 
child.output)))
+        var allProjExprs: Seq[Expression] = Seq()
+        val projExprs = projections.flatMap(_.map(e => {
+          allProjExprs = allProjExprs :+ e
+          exprToProto(e, child.output)
+        }))
 
         if (projExprs.forall(_.isDefined) && childOp.nonEmpty) {
           val expandBuilder = OperatorOuterClass.Expand
@@ -1873,6 +2112,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
             .setNumExprPerProject(projections.head.size)
           Some(result.setExpand(expandBuilder).build())
         } else {
+          withInfo(op, allProjExprs: _*)
           None
         }
 
@@ -1887,11 +2127,13 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
             resultExpressions,
             child) if isCometOperatorEnabled(op.conf, "aggregate") =>
         if (groupingExpressions.isEmpty && aggregateExpressions.isEmpty) {
+          withInfo(op, "No group by or aggregation")
           return None
         }
 
         // Aggregate expressions with filter are not supported yet.
         if (aggregateExpressions.exists(_.filter.isDefined)) {
+          withInfo(op, "Aggregate expression with filter is not supported")
           return None
         }
 
@@ -1917,7 +2159,9 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
           val attributes = groupingExpressions.map(_.toAttribute) ++ 
aggregateAttributes
           val resultExprs = resultExpressions.map(exprToProto(_, attributes))
           if (resultExprs.exists(_.isEmpty)) {
-            emitWarning(s"Unsupported result expressions found in: 
${resultExpressions}")
+            val msg = s"Unsupported result expressions found in: 
${resultExpressions}"
+            emitWarning(msg)
+            withInfo(op, msg, resultExpressions: _*)
             return None
           }
           hashAggBuilder.addAllResultExprs(resultExprs.map(_.get).asJava)
@@ -1928,13 +2172,16 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
           if (modes.size != 1) {
             // This shouldn't happen as all aggregation expressions should 
share the same mode.
            // Fall back to Spark nevertheless here.
+            withInfo(op, "All aggregate expressions do not have the same mode")
             return None
           }
 
           val mode = modes.head match {
             case Partial => CometAggregateMode.Partial
             case Final => CometAggregateMode.Final
-            case _ => return None
+            case _ =>
+              withInfo(op, s"Unsupported aggregation mode ${modes.head}")
+              return None
           }
 
           // In final mode, the aggregate expressions are bound to the output 
of the
@@ -1945,7 +2192,8 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
           // `output` is only used when `binding` is true (i.e., non-Final)
           val output = child.output
 
-          val aggExprs = aggregateExpressions.map(aggExprToProto(_, output, 
binding))
+          val aggExprs =
+            aggregateExpressions.map(aggExprToProto(_, output, binding))
           if (childOp.nonEmpty && groupingExprs.forall(_.isDefined) &&
             aggExprs.forall(_.isDefined)) {
             val hashAggBuilder = OperatorOuterClass.HashAggregate.newBuilder()
@@ -1955,7 +2203,9 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
               val attributes = groupingExpressions.map(_.toAttribute) ++ 
aggregateAttributes
               val resultExprs = resultExpressions.map(exprToProto(_, 
attributes))
               if (resultExprs.exists(_.isEmpty)) {
-                emitWarning(s"Unsupported result expressions found in: 
${resultExpressions}")
+                val msg = s"Unsupported result expressions found in: 
${resultExpressions}"
+                emitWarning(msg)
+                withInfo(op, msg, resultExpressions: _*)
                 return None
               }
               hashAggBuilder.addAllResultExprs(resultExprs.map(_.get).asJava)
@@ -1963,6 +2213,9 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
             hashAggBuilder.setModeValue(mode.getNumber)
             Some(result.setHashAgg(hashAggBuilder).build())
           } else {
+            val allChildren: Seq[Expression] =
+              groupingExpressions ++ aggregateExpressions ++ 
aggregateAttributes
+            withInfo(op, allChildren: _*)
             None
           }
         }
@@ -1974,18 +2227,21 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
             join.isInstanceOf[ShuffledHashJoinExec]) &&
           !(isCometOperatorEnabled(op.conf, "broadcast_hash_join") &&
             join.isInstanceOf[BroadcastHashJoinExec])) {
+          withInfo(join, s"Invalid hash join type ${join.nodeName}")
           return None
         }
 
         if (join.buildSide == BuildRight) {
           // DataFusion HashJoin assumes build side is always left.
           // TODO: support BuildRight
+          withInfo(join, "BuildRight is not supported")
           return None
         }
 
         val condition = join.condition.map { cond =>
           val condProto = exprToProto(cond, join.left.output ++ 
join.right.output)
           if (condProto.isEmpty) {
+            withInfo(join, cond)
             return None
           }
           condProto.get
@@ -1998,7 +2254,10 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
           case FullOuter => JoinType.FullOuter
           case LeftSemi => JoinType.LeftSemi
           case LeftAnti => JoinType.LeftAnti
-          case _ => return None // Spark doesn't support other join types
+          case _ =>
+            // Spark doesn't support other join types
+            withInfo(join, s"Unsupported join type ${join.joinType}")
+            return None
         }
 
         val leftKeys = join.leftKeys.map(exprToProto(_, join.left.output))
@@ -2015,6 +2274,8 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
           condition.foreach(joinBuilder.setCondition)
           Some(result.setHashJoin(joinBuilder).build())
         } else {
+          val allExprs: Seq[Expression] = join.leftKeys ++ join.rightKeys
+          withInfo(join, allExprs: _*)
           None
         }
 
@@ -2040,6 +2301,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
 
         // TODO: Support SortMergeJoin with join condition after new 
DataFusion release
         if (join.condition.isDefined) {
+          withInfo(op, "Sort merge join with a join condition is not 
supported")
           return None
         }
 
@@ -2050,7 +2312,10 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
           case FullOuter => JoinType.FullOuter
           case LeftSemi => JoinType.LeftSemi
           case LeftAnti => JoinType.LeftAnti
-          case _ => return None // Spark doesn't support other join types
+          case _ =>
+            // Spark doesn't support other join types
+            withInfo(op, s"Unsupported join type ${join.joinType}")
+            return None
         }
 
         val leftKeys = join.leftKeys.map(exprToProto(_, join.left.output))
@@ -2071,9 +2336,15 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
             .addAllRightJoinKeys(rightKeys.map(_.get).asJava)
           Some(result.setSortMergeJoin(joinBuilder).build())
         } else {
+          val allExprs: Seq[Expression] = join.leftKeys ++ join.rightKeys
+          withInfo(join, allExprs: _*)
           None
         }
 
+      case join: SortMergeJoinExec if !isCometOperatorEnabled(op.conf, 
"sort_merge_join") =>
+        withInfo(join, "SortMergeJoin is not enabled")
+        None
+
       case op if isCometSink(op) =>
        // These operators are the source of the Comet native execution chain
         val scanBuilder = OperatorOuterClass.Scan.newBuilder()
@@ -2091,8 +2362,10 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
           Some(result.setScan(scanBuilder).build())
         } else {
          // There are unsupported scan types
-          emitWarning(
-            s"unsupported Comet operator: ${op.nodeName}, due to unsupported 
data types above")
+          val msg =
+            s"unsupported Comet operator: ${op.nodeName}, due to unsupported 
data types above"
+          emitWarning(msg)
+          withInfo(op, msg)
           None
         }
 
@@ -2101,7 +2374,9 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
         //  1. it is not a Spark shuffle operator, which is handled separately
         //  2. it is not a Comet operator
         if (!op.nodeName.contains("Comet") && 
!op.isInstanceOf[ShuffleExchangeExec]) {
-          emitWarning(s"unsupported Spark operator: ${op.nodeName}")
+          val msg = s"unsupported Spark operator: ${op.nodeName}"
+          emitWarning(msg)
+          withInfo(op, msg)
         }
         None
     }
@@ -2145,7 +2420,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
    * Check if the datatypes of shuffle input are supported. This is used for 
Columnar shuffle
    * which supports struct/array.
    */
-  def supportPartitioningTypes(inputs: Seq[Attribute]): Boolean = {
+  def supportPartitioningTypes(inputs: Seq[Attribute]): (Boolean, String) = {
     def supportedDataType(dt: DataType): Boolean = dt match {
       case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: 
FloatType |
           _: DoubleType | _: StringType | _: BinaryType | _: TimestampType | 
_: DecimalType |
@@ -2170,17 +2445,21 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
     }
 
     // Check if the datatypes of shuffle input are supported.
+    var msg = ""
     val supported = inputs.forall(attr => supportedDataType(attr.dataType))
     if (!supported) {
-      emitWarning(s"unsupported Spark partitioning: ${inputs.map(_.dataType)}")
+      msg = s"unsupported Spark partitioning: ${inputs.map(_.dataType)}"
+      emitWarning(msg)
     }
-    supported
+    (supported, msg)
   }
 
   /**
    * Whether the given Spark partitioning is supported by Comet.
    */
-  def supportPartitioning(inputs: Seq[Attribute], partitioning: Partitioning): 
Boolean = {
+  def supportPartitioning(
+      inputs: Seq[Attribute],
+      partitioning: Partitioning): (Boolean, String) = {
     def supportedDataType(dt: DataType): Boolean = dt match {
       case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: 
FloatType |
           _: DoubleType | _: StringType | _: BinaryType | _: TimestampType | 
_: DecimalType |
@@ -2195,17 +2474,33 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
     val supported = inputs.forall(attr => supportedDataType(attr.dataType))
 
     if (!supported) {
-      emitWarning(s"unsupported Spark partitioning: ${inputs.map(_.dataType)}")
-      false
+      val msg = s"unsupported Spark partitioning: ${inputs.map(_.dataType)}"
+      emitWarning(msg)
+      (false, msg)
     } else {
       partitioning match {
         case HashPartitioning(expressions, _) =>
-          expressions.map(QueryPlanSerde.exprToProto(_, 
inputs)).forall(_.isDefined)
-        case SinglePartition => true
+          (expressions.map(QueryPlanSerde.exprToProto(_, 
inputs)).forall(_.isDefined), null)
+        case SinglePartition => (true, null)
         case other =>
-          emitWarning(s"unsupported Spark partitioning: 
${other.getClass.getName}")
-          false
+          val msg = s"unsupported Spark partitioning: 
${other.getClass.getName}"
+          emitWarning(msg)
+          (false, msg)
       }
     }
   }
+
+  // Utility method. Adds explain info if the result of calling exprToProto is 
None
+  private def optExprWithInfo(
+      optExpr: Option[Expr],
+      expr: Expression,
+      childExpr: Expression*): Option[Expr] = {
+    optExpr match {
+      case None =>
+        withInfo(expr, childExpr: _*)
+        None
+      case o => o
+    }
+  }
 }
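
A note on the pattern above: the bare emitWarning calls are replaced or
augmented with withInfo, which records the fallback reason on the plan node
itself so that ExtendedExplainInfo can later surface it in the explain output,
and optExprWithInfo folds the common "serialize, or tag the parent and return
None" step into one call. A minimal sketch of that idea, assuming only that
withInfo accumulates reasons per node (the real helper lives in
CometSparkSessionExtensions and its tagging mechanism may differ):

    import scala.collection.mutable

    object FallbackInfoSketch {
      // Maps a node name to the reasons it fell back to Spark.
      private val reasons = mutable.Map.empty[String, mutable.Set[String]]

      def withInfo(node: String, info: String): Unit =
        reasons.getOrElseUpdate(node, mutable.Set.empty) += info

      // Mirrors optExprWithInfo: pass a successful serialization through,
      // but tag the parent node when serialization produced None.
      def optExprWithInfo[T](opt: Option[T], node: String, reason: String): Option[T] =
        opt.orElse { withInfo(node, reason); None }

      def main(args: Array[String]): Unit = {
        val serialized: Option[Int] = None // stand-in for a failed exprToProto
        optExprWithInfo(serialized, "MakeDecimal", "make_decimal is not supported")
        println(reasons) // prints the recorded reason keyed by node name
      }
    }
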
diff --git 
a/spark/src/main/scala/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala
 
b/spark/src/main/scala/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala
index 85c6413..ffec1bd 100644
--- 
a/spark/src/main/scala/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala
+++ 
b/spark/src/main/scala/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala
@@ -20,7 +20,7 @@
 package org.apache.comet.shims
 
 import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
-import org.apache.spark.sql.execution.{LimitExec, SparkPlan}
+import org.apache.spark.sql.execution.{LimitExec, QueryExecution, SparkPlan}
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
 
 trait ShimCometSparkSessionExtensions {
@@ -49,4 +49,19 @@ object ShimCometSparkSessionExtensions {
     .filter(_.isInstanceOf[Int])
     .map(_.asInstanceOf[Int])
     .headOption
+
+  // Extended info is available only since Spark 4.0.0
+  // (https://issues.apache.org/jira/browse/SPARK-47289)
+  def supportsExtendedExplainInfo(qe: QueryExecution): Boolean = {
+    try {
+      // Look for QueryExecution.extendedExplainInfo(scala.Function1[String, 
Unit], SparkPlan)
+      qe.getClass.getDeclaredMethod(
+        "extendedExplainInfo",
+        classOf[String => Unit],
+        classOf[SparkPlan])
+    } catch {
+      case _: NoSuchMethodException | _: SecurityException => return false
+    }
+    true
+  }
 }
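
The shim detects the new Spark API by reflection rather than by comparing
version strings, so a single Comet jar can run against both Spark 3.x and
4.x. The same probe, reduced to a generic helper for illustration (the name
hasDeclaredMethod is not part of Comet):

    // Returns true if cls declares a method with the given name and parameter
    // types, swallowing the same lookup failures the shim above tolerates.
    def hasDeclaredMethod(cls: Class[_], name: String, paramTypes: Class[_]*): Boolean =
      try {
        cls.getDeclaredMethod(name, paramTypes: _*)
        true
      } catch {
        case _: NoSuchMethodException | _: SecurityException => false
      }

    // For example, hasDeclaredMethod(classOf[String], "isBlank") is true on
    // JDK 11+ and false on JDK 8.
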
diff --git 
a/spark/src/main/scala/org/apache/spark/sql/ExtendedExplainGenerator.scala 
b/spark/src/main/scala/org/apache/spark/sql/ExtendedExplainGenerator.scala
new file mode 100644
index 0000000..3c5ae95
--- /dev/null
+++ b/spark/src/main/scala/org/apache/spark/sql/ExtendedExplainGenerator.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.execution.SparkPlan
+
+/**
+ * A trait for a session extension to implement that provides additional explain plan information.
+ * We copy this from Spark 4.0 since this trait is not available in Spark 3.x. We can remove this
+ * after dropping Spark 3.x support.
+ */
+
+trait ExtendedExplainGenerator {
+  def title: String
+
+  def generateExtendedInfo(plan: SparkPlan): String
+}
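
For illustration, a hypothetical implementor of this trait; Comet's real one
is org.apache.comet.ExtendedExplainInfo (added by this commit), and the sketch
assumes nothing beyond SparkPlan being a TreeNode that supports collect:

    import org.apache.spark.sql.ExtendedExplainGenerator
    import org.apache.spark.sql.execution.SparkPlan

    // Hypothetical example: report every node name in the plan, one per line.
    // Comet's actual generator reports fallback reasons instead.
    class NodeNameExplainGenerator extends ExtendedExplainGenerator {
      override def title: String = "Node names"

      override def generateExtendedInfo(plan: SparkPlan): String =
        plan.collect { case p => p.nodeName }.mkString("\n")
    }
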
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala 
b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index bbe7edd..ce9a7cd 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -1357,4 +1357,55 @@ class CometExpressionSuite extends CometTestBase with 
AdaptiveSparkPlanHelper {
     testCastedColumn(inputValues = Seq("car", "Truck"))
   }
 
+  test("explain comet") {
+    assume(isSpark34Plus)
+    withSQLConf(
+      SQLConf.ANSI_ENABLED.key -> "false",
+      CometConf.COMET_ENABLED.key -> "true",
+      CometConf.COMET_EXEC_ENABLED.key -> "true",
+      CometConf.COMET_EXEC_ALL_EXPR_ENABLED.key -> "true",
+      CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true",
+      "spark.sql.extendedExplainProvider" -> 
"org.apache.comet.ExtendedExplainInfo") {
+      val table = "test"
+      withTable(table) {
+        sql(s"create table $table(c0 int, c1 int , c2 float) using parquet")
+        sql(s"insert into $table values(0, 1, 100.000001)")
+
+        Seq(
+          (
+            s"SELECT cast(make_interval(c0, c1, c0, c1, c0, c0, c2) as string) 
as C from $table",
+            "make_interval is not supported"),
+          (
+            "SELECT "
+              + "date_part('YEAR', make_interval(c0, c1, c0, c1, c0, c0, c2))"
+              + " + "
+              + "date_part('MONTH', make_interval(c0, c1, c0, c1, c0, c0, c2))"
+              + s" as yrs_and_mths from $table",
+            "extractintervalyears is not supported\n" +
+              "extractintervalmonths is not supported"),
+          (
+            s"SELECT sum(c0), sum(c2) from $table group by c1",
+            "Native shuffle is not enabled\n" +
+              "AQEShuffleRead is not supported"),
+          (
+            "SELECT A.c1, A.sum_c0, A.sum_c2, B.casted from "
+              + s"(SELECT c1, sum(c0) as sum_c0, sum(c2) as sum_c2 from $table 
group by c1) as A, "
+              + s"(SELECT c1, cast(make_interval(c0, c1, c0, c1, c0, c0, c2) 
as string) as casted from $table) as B "
+              + "where A.c1 = B.c1 ",
+            "Native shuffle is not enabled\n" +
+              "AQEShuffleRead is not supported\n" +
+              "make_interval is not supported\n" +
+              "BroadcastExchange is not supported\n" +
+              "BroadcastHashJoin disabled because not all child plans are 
native"))
+          .foreach(test => {
+            val qry = test._1
+            val expected = test._2
+            val df = sql(qry)
+            df.collect() // force an execution
+            checkSparkAnswerAndCompareExplainPlan(df, expected)
+          })
+      }
+    }
+  }
+
 }
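
Outside the test harness, the same information should be reachable from a
plain Spark session on a build that includes SPARK-47289 (Spark 4.0.0), using
the provider conf exercised above. A sketch, with the query chosen only
because make_interval is known to be unsupported:

    spark.conf.set(
      "spark.sql.extendedExplainProvider",
      "org.apache.comet.ExtendedExplainInfo")
    spark.sql("SELECT make_interval(1, 2) FROM range(1)").explain("extended")
    // On such builds the extended plan output should include lines like:
    //   make_interval is not supported
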
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTPCQueryBase.scala 
b/spark/src/test/scala/org/apache/spark/sql/CometTPCQueryBase.scala
index 8f83ac0..ad989f5 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTPCQueryBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTPCQueryBase.scala
@@ -46,6 +46,9 @@ trait CometTPCQueryBase extends Logging {
       .set("spark.sql.autoBroadcastJoinThreshold", (20 * 1024 * 1024).toString)
       .set("spark.sql.crossJoin.enabled", "true")
       .setIfMissing("parquet.enable.dictionary", "true")
+      .set(
+        "spark.shuffle.manager",
+        "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
 
     val sparkSession = SparkSession.builder
       .config(conf)
diff --git 
a/spark/src/test/scala/org/apache/spark/sql/CometTPCQueryListBase.scala 
b/spark/src/test/scala/org/apache/spark/sql/CometTPCQueryListBase.scala
index ac3c047..6a58320 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTPCQueryListBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTPCQueryListBase.scala
@@ -31,12 +31,14 @@ import org.apache.spark.sql.comet.CometExec
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 
-import org.apache.comet.CometConf
+import org.apache.comet.{CometConf, ExtendedExplainInfo}
+import org.apache.comet.shims.ShimCometSparkSessionExtensions
 
 trait CometTPCQueryListBase
     extends CometTPCQueryBase
     with AdaptiveSparkPlanHelper
-    with SQLHelper {
+    with SQLHelper
+    with ShimCometSparkSessionExtensions {
   var output: Option[OutputStream] = None
 
   def main(args: Array[String]): Unit = {
@@ -84,11 +86,16 @@ trait CometTPCQueryListBase
       withSQLConf(
         CometConf.COMET_ENABLED.key -> "true",
         CometConf.COMET_EXEC_ENABLED.key -> "true",
-        CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true") {
+        CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
+        CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true",
+        // Lower bloom filter thresholds to allow us to simulate the plans produced at larger scales.
+        "spark.sql.optimizer.runtime.bloomFilter.creationSideThreshold" -> 
"1MB",
+        
"spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold" -> 
"1MB") {
 
         val df = cometSpark.sql(queryString)
         val cometPlans = mutable.HashSet.empty[String]
-        stripAQEPlan(df.queryExecution.executedPlan).foreach {
+        val executedPlan = df.queryExecution.executedPlan
+        stripAQEPlan(executedPlan).foreach {
           case op: CometExec =>
             cometPlans += s"${op.nodeName}"
           case _ =>
@@ -100,6 +107,9 @@ trait CometTPCQueryListBase
         } else {
           out.println(s"Query: $name$nameSuffix. Comet Exec: Disabled")
         }
+        out.println(
+          s"Query: $name$nameSuffix: ExplainInfo:\n" +
+            s"${new 
ExtendedExplainInfo().generateExtendedInfo(executedPlan)}\n")
       }
     }
   }
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala 
b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
index 85e5882..ef64d66 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -26,6 +26,7 @@ import scala.util.Try
 
 import org.scalatest.BeforeAndAfterEach
 
+import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.Path
 import org.apache.parquet.column.ParquetProperties
 import org.apache.parquet.example.data.Group
@@ -37,7 +38,7 @@ import org.apache.spark._
 import org.apache.spark.internal.config.{MEMORY_OFFHEAP_ENABLED, 
MEMORY_OFFHEAP_SIZE, SHUFFLE_MANAGER}
 import org.apache.spark.sql.comet.{CometBatchScanExec, 
CometBroadcastExchangeExec, CometExec, CometRowToColumnarExec, CometScanExec, 
CometScanWrapper, CometSinkPlaceHolder}
 import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, 
CometNativeShuffle, CometShuffleExchangeExec}
-import org.apache.spark.sql.execution.{ColumnarToRowExec, InputAdapter, 
SparkPlan, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.{ColumnarToRowExec, ExtendedMode, 
InputAdapter, SparkPlan, WholeStageCodegenExec}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.internal._
 import org.apache.spark.sql.test._
@@ -46,6 +47,8 @@ import org.apache.spark.sql.types.StructType
 
 import org.apache.comet._
 import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus
+import org.apache.comet.shims.ShimCometSparkSessionExtensions
+import 
org.apache.comet.shims.ShimCometSparkSessionExtensions.supportsExtendedExplainInfo
 
 /**
  * Base class for testing. This exists in `org.apache.spark.sql` since 
[[SQLTestUtils]] is
@@ -55,7 +58,8 @@ abstract class CometTestBase
     extends QueryTest
     with SQLTestUtils
     with BeforeAndAfterEach
-    with AdaptiveSparkPlanHelper {
+    with AdaptiveSparkPlanHelper
+    with ShimCometSparkSessionExtensions {
   import testImplicits._
 
   protected val shuffleManager: String =
@@ -227,6 +231,30 @@ abstract class CometTestBase
     (expected.get.getCause, actual.getCause)
   }
 
+  protected def checkSparkAnswerAndCompareExplainPlan(
+      df: DataFrame,
+      expectedInfo: String): Unit = {
+    var expected: Array[Row] = Array.empty
+    var dfSpark: Dataset[Row] = null
+    withSQLConf(
+      CometConf.COMET_ENABLED.key -> "false",
+      "spark.sql.extendedExplainProvider" -> "") {
+      dfSpark = Dataset.ofRows(spark, df.logicalPlan)
+      expected = dfSpark.collect()
+    }
+    val dfComet = Dataset.ofRows(spark, df.logicalPlan)
+    checkAnswer(dfComet, expected)
+    val diff = StringUtils.difference(
+      dfSpark.queryExecution.explainString(ExtendedMode),
+      dfComet.queryExecution.explainString(ExtendedMode))
+    if (supportsExtendedExplainInfo(dfSpark.queryExecution)) {
+      assert(diff.contains(expectedInfo))
+    }
+    val extendedInfo =
+      new 
ExtendedExplainInfo().generateExtendedInfo(dfComet.queryExecution.executedPlan)
+    assert(extendedInfo.equalsIgnoreCase(expectedInfo))
+  }
+
   private var _spark: SparkSession = _
   protected implicit def spark: SparkSession = _spark
   protected implicit def sqlContext: SQLContext = _spark.sqlContext
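
One subtlety in checkSparkAnswerAndCompareExplainPlan above: StringUtils.difference
from Apache Commons Lang returns the remainder of its second argument starting
at the first point of divergence, so the computed diff carries everything from
the first place the Comet explain string departs from the Spark one, which is
why the diff.contains(expectedInfo) check works:

    import org.apache.commons.lang3.StringUtils

    // Documented Commons Lang behavior: the tail of the second string,
    // starting where the two strings first differ.
    StringUtils.difference("i am a machine", "i am a robot") // returns "robot"
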


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

