This is an automated email from the ASF dual-hosted git repository.
comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new eef5f28a0 chore: check `missingInput` for Comet plan nodes (#2795)
eef5f28a0 is described below
commit eef5f28a0727d9aef043fa2b87d6747ff68b827a
Author: Oleks V <[email protected]>
AuthorDate: Tue Nov 18 17:05:18 2025 -0800
chore: check `missingInput` for Comet plan nodes (#2795)
* chore: check `missingInput` for Comet plan nodes
---------
Co-authored-by: Martin Grigorov <[email protected]>
---
.../sql/comet/CometTakeOrderedAndProjectExec.scala | 7 ++++++-
.../q79.native_iceberg_compat/simplified.txt | 2 +-
.../q79/simplified.txt | 2 +-
.../q84.native_iceberg_compat/simplified.txt | 2 +-
.../q84/simplified.txt | 2 +-
.../q84.native_iceberg_compat/simplified.txt | 2 +-
.../q84/simplified.txt | 2 +-
.../q79.native_iceberg_compat/simplified.txt | 2 +-
.../approved-plans-v1_4/q79/simplified.txt | 2 +-
.../q84.native_iceberg_compat/simplified.txt | 2 +-
.../approved-plans-v1_4/q84/simplified.txt | 2 +-
.../q6.native_iceberg_compat/simplified.txt | 2 +-
.../approved-plans-v2_7-spark3_5/q6/simplified.txt | 2 +-
.../q6.native_iceberg_compat/simplified.txt | 2 +-
.../approved-plans-v2_7/q6/simplified.txt | 2 +-
.../scala/org/apache/spark/sql/CometTestBase.scala | 24 ++++++++++++++++++++++
16 files changed, 44 insertions(+), 15 deletions(-)
diff --git
a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
index 65ae84d21..2517c19f2 100644
---
a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
+++
b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
@@ -22,7 +22,7 @@ package org.apache.spark.sql.comet
import org.apache.spark.TaskContext
import org.apache.spark.rdd.{ParallelCollectionRDD, RDD}
import org.apache.spark.serializer.Serializer
-import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression,
SortOrder}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet,
NamedExpression, SortOrder}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.comet.execution.shuffle.{CometShuffledBatchRDD,
CometShuffleExchangeExec}
import org.apache.spark.sql.execution.{SparkPlan, TakeOrderedAndProjectExec,
UnaryExecNode, UnsafeRowSerializer}
@@ -98,10 +98,15 @@ case class CometTakeOrderedAndProjectExec(
child: SparkPlan)
extends CometExec
with UnaryExecNode {
+
+ override def producedAttributes: AttributeSet = outputSet ++
AttributeSet(projectList)
+
private lazy val writeMetrics =
SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
+
private lazy val readMetrics =
SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
+
override lazy val metrics: Map[String, SQLMetric] = Map(
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
"numPartitions" -> SQLMetrics.createMetric(
diff --git
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q79.native_iceberg_compat/simplified.txt
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q79.native_iceberg_compat/simplified.txt
index 8e244d6c9..5b252a906 100644
---
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q79.native_iceberg_compat/simplified.txt
+++
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q79.native_iceberg_compat/simplified.txt
@@ -1,7 +1,7 @@
WholeStageCodegen (1)
CometColumnarToRow
InputAdapter
- CometTakeOrderedAndProject [c_last_name,c_first_name,substr(s_city, 1,
30),ss_ticket_number,amt,profit,s_city]
+ CometTakeOrderedAndProject [s_city]
[c_last_name,c_first_name,substr(s_city, 1, 30),ss_ticket_number,amt,profit]
CometProject [c_last_name,c_first_name,substr(s_city, 1,
30),ss_ticket_number,amt,profit,s_city]
CometBroadcastHashJoin
[ss_ticket_number,ss_customer_sk,s_city,amt,profit,c_customer_sk,c_first_name,c_last_name]
CometHashAggregate [ss_addr_sk,sum,sum]
[ss_ticket_number,ss_customer_sk,s_city,amt,profit,sum(UnscaledValue(ss_coupon_amt)),sum(UnscaledValue(ss_net_profit))]
diff --git
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q79/simplified.txt
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q79/simplified.txt
index 8e244d6c9..5b252a906 100644
---
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q79/simplified.txt
+++
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q79/simplified.txt
@@ -1,7 +1,7 @@
WholeStageCodegen (1)
CometColumnarToRow
InputAdapter
- CometTakeOrderedAndProject [c_last_name,c_first_name,substr(s_city, 1,
30),ss_ticket_number,amt,profit,s_city]
+ CometTakeOrderedAndProject [s_city]
[c_last_name,c_first_name,substr(s_city, 1, 30),ss_ticket_number,amt,profit]
CometProject [c_last_name,c_first_name,substr(s_city, 1,
30),ss_ticket_number,amt,profit,s_city]
CometBroadcastHashJoin
[ss_ticket_number,ss_customer_sk,s_city,amt,profit,c_customer_sk,c_first_name,c_last_name]
CometHashAggregate [ss_addr_sk,sum,sum]
[ss_ticket_number,ss_customer_sk,s_city,amt,profit,sum(UnscaledValue(ss_coupon_amt)),sum(UnscaledValue(ss_net_profit))]
diff --git
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q84.native_iceberg_compat/simplified.txt
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q84.native_iceberg_compat/simplified.txt
index fd8d1864e..e43557c27 100644
---
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q84.native_iceberg_compat/simplified.txt
+++
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q84.native_iceberg_compat/simplified.txt
@@ -1,7 +1,7 @@
WholeStageCodegen (1)
CometColumnarToRow
InputAdapter
- CometTakeOrderedAndProject [customer_id,customername,c_customer_id]
+ CometTakeOrderedAndProject [c_customer_id] [customer_id,customername]
CometProject [c_last_name,c_first_name]
[customer_id,customername,c_customer_id]
CometBroadcastHashJoin
[c_customer_id,c_first_name,c_last_name,cd_demo_sk,sr_cdemo_sk]
CometBroadcastExchange
[c_customer_id,c_first_name,c_last_name,cd_demo_sk] #1
diff --git
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q84/simplified.txt
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q84/simplified.txt
index fd8d1864e..e43557c27 100644
---
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q84/simplified.txt
+++
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q84/simplified.txt
@@ -1,7 +1,7 @@
WholeStageCodegen (1)
CometColumnarToRow
InputAdapter
- CometTakeOrderedAndProject [customer_id,customername,c_customer_id]
+ CometTakeOrderedAndProject [c_customer_id] [customer_id,customername]
CometProject [c_last_name,c_first_name]
[customer_id,customername,c_customer_id]
CometBroadcastHashJoin
[c_customer_id,c_first_name,c_last_name,cd_demo_sk,sr_cdemo_sk]
CometBroadcastExchange
[c_customer_id,c_first_name,c_last_name,cd_demo_sk] #1
diff --git
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q84.native_iceberg_compat/simplified.txt
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q84.native_iceberg_compat/simplified.txt
index fd8d1864e..e43557c27 100644
---
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q84.native_iceberg_compat/simplified.txt
+++
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q84.native_iceberg_compat/simplified.txt
@@ -1,7 +1,7 @@
WholeStageCodegen (1)
CometColumnarToRow
InputAdapter
- CometTakeOrderedAndProject [customer_id,customername,c_customer_id]
+ CometTakeOrderedAndProject [c_customer_id] [customer_id,customername]
CometProject [c_last_name,c_first_name]
[customer_id,customername,c_customer_id]
CometBroadcastHashJoin
[c_customer_id,c_first_name,c_last_name,cd_demo_sk,sr_cdemo_sk]
CometBroadcastExchange
[c_customer_id,c_first_name,c_last_name,cd_demo_sk] #1
diff --git
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q84/simplified.txt
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q84/simplified.txt
index fd8d1864e..e43557c27 100644
---
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q84/simplified.txt
+++
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q84/simplified.txt
@@ -1,7 +1,7 @@
WholeStageCodegen (1)
CometColumnarToRow
InputAdapter
- CometTakeOrderedAndProject [customer_id,customername,c_customer_id]
+ CometTakeOrderedAndProject [c_customer_id] [customer_id,customername]
CometProject [c_last_name,c_first_name]
[customer_id,customername,c_customer_id]
CometBroadcastHashJoin
[c_customer_id,c_first_name,c_last_name,cd_demo_sk,sr_cdemo_sk]
CometBroadcastExchange
[c_customer_id,c_first_name,c_last_name,cd_demo_sk] #1
diff --git
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q79.native_iceberg_compat/simplified.txt
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q79.native_iceberg_compat/simplified.txt
index 8e244d6c9..5b252a906 100644
---
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q79.native_iceberg_compat/simplified.txt
+++
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q79.native_iceberg_compat/simplified.txt
@@ -1,7 +1,7 @@
WholeStageCodegen (1)
CometColumnarToRow
InputAdapter
- CometTakeOrderedAndProject [c_last_name,c_first_name,substr(s_city, 1,
30),ss_ticket_number,amt,profit,s_city]
+ CometTakeOrderedAndProject [s_city]
[c_last_name,c_first_name,substr(s_city, 1, 30),ss_ticket_number,amt,profit]
CometProject [c_last_name,c_first_name,substr(s_city, 1,
30),ss_ticket_number,amt,profit,s_city]
CometBroadcastHashJoin
[ss_ticket_number,ss_customer_sk,s_city,amt,profit,c_customer_sk,c_first_name,c_last_name]
CometHashAggregate [ss_addr_sk,sum,sum]
[ss_ticket_number,ss_customer_sk,s_city,amt,profit,sum(UnscaledValue(ss_coupon_amt)),sum(UnscaledValue(ss_net_profit))]
diff --git
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q79/simplified.txt
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q79/simplified.txt
index 8e244d6c9..5b252a906 100644
---
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q79/simplified.txt
+++
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q79/simplified.txt
@@ -1,7 +1,7 @@
WholeStageCodegen (1)
CometColumnarToRow
InputAdapter
- CometTakeOrderedAndProject [c_last_name,c_first_name,substr(s_city, 1,
30),ss_ticket_number,amt,profit,s_city]
+ CometTakeOrderedAndProject [s_city]
[c_last_name,c_first_name,substr(s_city, 1, 30),ss_ticket_number,amt,profit]
CometProject [c_last_name,c_first_name,substr(s_city, 1,
30),ss_ticket_number,amt,profit,s_city]
CometBroadcastHashJoin
[ss_ticket_number,ss_customer_sk,s_city,amt,profit,c_customer_sk,c_first_name,c_last_name]
CometHashAggregate [ss_addr_sk,sum,sum]
[ss_ticket_number,ss_customer_sk,s_city,amt,profit,sum(UnscaledValue(ss_coupon_amt)),sum(UnscaledValue(ss_net_profit))]
diff --git
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q84.native_iceberg_compat/simplified.txt
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q84.native_iceberg_compat/simplified.txt
index fd8d1864e..e43557c27 100644
---
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q84.native_iceberg_compat/simplified.txt
+++
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q84.native_iceberg_compat/simplified.txt
@@ -1,7 +1,7 @@
WholeStageCodegen (1)
CometColumnarToRow
InputAdapter
- CometTakeOrderedAndProject [customer_id,customername,c_customer_id]
+ CometTakeOrderedAndProject [c_customer_id] [customer_id,customername]
CometProject [c_last_name,c_first_name]
[customer_id,customername,c_customer_id]
CometBroadcastHashJoin
[c_customer_id,c_first_name,c_last_name,cd_demo_sk,sr_cdemo_sk]
CometBroadcastExchange
[c_customer_id,c_first_name,c_last_name,cd_demo_sk] #1
diff --git
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q84/simplified.txt
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q84/simplified.txt
index fd8d1864e..e43557c27 100644
---
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q84/simplified.txt
+++
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q84/simplified.txt
@@ -1,7 +1,7 @@
WholeStageCodegen (1)
CometColumnarToRow
InputAdapter
- CometTakeOrderedAndProject [customer_id,customername,c_customer_id]
+ CometTakeOrderedAndProject [c_customer_id] [customer_id,customername]
CometProject [c_last_name,c_first_name]
[customer_id,customername,c_customer_id]
CometBroadcastHashJoin
[c_customer_id,c_first_name,c_last_name,cd_demo_sk,sr_cdemo_sk]
CometBroadcastExchange
[c_customer_id,c_first_name,c_last_name,cd_demo_sk] #1
diff --git
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark3_5/q6.native_iceberg_compat/simplified.txt
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark3_5/q6.native_iceberg_compat/simplified.txt
index cbaf71ab0..2978e30c1 100644
---
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark3_5/q6.native_iceberg_compat/simplified.txt
+++
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark3_5/q6.native_iceberg_compat/simplified.txt
@@ -1,7 +1,7 @@
WholeStageCodegen (1)
CometColumnarToRow
InputAdapter
- CometTakeOrderedAndProject [state,cnt,ca_state]
+ CometTakeOrderedAndProject [ca_state] [state,cnt]
CometFilter [state,cnt,ca_state]
CometHashAggregate [count] [state,cnt,ca_state,count(1)]
CometExchange [ca_state] #1
diff --git
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark3_5/q6/simplified.txt
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark3_5/q6/simplified.txt
index cbaf71ab0..2978e30c1 100644
---
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark3_5/q6/simplified.txt
+++
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark3_5/q6/simplified.txt
@@ -1,7 +1,7 @@
WholeStageCodegen (1)
CometColumnarToRow
InputAdapter
- CometTakeOrderedAndProject [state,cnt,ca_state]
+ CometTakeOrderedAndProject [ca_state] [state,cnt]
CometFilter [state,cnt,ca_state]
CometHashAggregate [count] [state,cnt,ca_state,count(1)]
CometExchange [ca_state] #1
diff --git
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.native_iceberg_compat/simplified.txt
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.native_iceberg_compat/simplified.txt
index cbaf71ab0..2978e30c1 100644
---
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.native_iceberg_compat/simplified.txt
+++
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.native_iceberg_compat/simplified.txt
@@ -1,7 +1,7 @@
WholeStageCodegen (1)
CometColumnarToRow
InputAdapter
- CometTakeOrderedAndProject [state,cnt,ca_state]
+ CometTakeOrderedAndProject [ca_state] [state,cnt]
CometFilter [state,cnt,ca_state]
CometHashAggregate [count] [state,cnt,ca_state,count(1)]
CometExchange [ca_state] #1
diff --git
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/simplified.txt
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/simplified.txt
index cbaf71ab0..2978e30c1 100644
---
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/simplified.txt
+++
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/simplified.txt
@@ -1,7 +1,7 @@
WholeStageCodegen (1)
CometColumnarToRow
InputAdapter
- CometTakeOrderedAndProject [state,cnt,ca_state]
+ CometTakeOrderedAndProject [ca_state] [state,cnt]
CometFilter [state,cnt,ca_state]
CometHashAggregate [count] [state,cnt,ca_state,count(1)]
CometExchange [ca_state] #1
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
index 591f75acb..3f7ce5b4f 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -385,6 +385,8 @@ abstract class CometTestBase
s"plan: ${new ExtendedExplainInfo().generateExtendedInfo(plan)}")
case _ =>
}
+
+ checkPlanNotMissingInput(plan)
}
protected def findFirstNonCometOperator(
@@ -406,6 +408,28 @@ abstract class CometTestBase
None
}
+ // Recursively checks that no Comet plan node with children has missing input.
+ // Spark renders such nodes with a leading exclamation mark (!),
+ // e.g. !CometWindowExec
+ protected def checkPlanNotMissingInput(plan: SparkPlan): Unit = {
+ def hasMissingInput(node: SparkPlan): Boolean = {
+ node.missingInput.nonEmpty && node.children.nonEmpty
+ }
+
+ val isCometNode = plan.nodeName.startsWith("Comet")
+
+ if (isCometNode && hasMissingInput(plan)) {
+ assert(
+ false,
+ s"Plan node `${plan.nodeName}` has invalid missingInput:
${plan.missingInput}")
+ }
+
+ // This node is valid (a failure above throws), so recursively check its children
+ plan.children.foreach { child =>
+ checkPlanNotMissingInput(child)
+ }
+ }
+
private def checkPlanContains(plan: SparkPlan, includePlans: Class[_]*):
Unit = {
includePlans.foreach { case planClass =>
if (plan.find(op => planClass.isAssignableFrom(op.getClass)).isEmpty) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]