This is an automated email from the ASF dual-hosted git repository.
mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 769d76e74 fix: NativeScan count assert firing for no reason (#2850)
769d76e74 is described below
commit 769d76e74e618c0555a993322318bd32e69716bf
Author: Emily Matheys <[email protected]>
AuthorDate: Fri Dec 5 16:43:07 2025 +0200
fix: NativeScan count assert firing for no reason (#2850)
---
.../org/apache/spark/sql/comet/operators.scala | 41 +++++++++++++---------
.../spark/sql/comet/ParquetEncryptionITCase.scala | 16 ++++++---
2 files changed, 35 insertions(+), 22 deletions(-)
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index 6eebc53d5..28688e904 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -217,22 +217,14 @@ abstract class CometNativeExec extends CometExec {
     // TODO: support native metrics for all operators.
     val nativeMetrics = CometMetricNode.fromCometPlan(this)

+    // Go over all the native scans to see whether they need encryption options.
     // For each relation in a CometNativeScan generate a hadoopConf,
     // for each file path in a relation associate with hadoopConf
-    val cometNativeScans: Seq[CometNativeScanExec] = this
-      .collectLeaves()
-      .filter(_.isInstanceOf[CometNativeScanExec])
-      .map(_.asInstanceOf[CometNativeScanExec])
-    assert(
-      cometNativeScans.size <= 1,
-      "We expect one native scan in a Comet plan since we will broadcast one hadoopConf.")
-    // If this assumption changes in the future, you can look at the commit history of #2447
-    // to see how there used to be a map of relations to broadcasted confs in case multiple
-    // relations in a single plan. The example that came up was UNION. See discussion at:
-    // https://github.com/apache/datafusion-comet/pull/2447#discussion_r2406118264
-    val (broadcastedHadoopConfForEncryption, encryptedFilePaths) =
-      cometNativeScans.headOption.fold(
-        (None: Option[Broadcast[SerializableConfiguration]], Seq.empty[String])) { scan =>
+    // This is done per native plan, so only count scans until a Comet input is reached.
+    val encryptionOptions =
+      mutable.ArrayBuffer.empty[(Broadcast[SerializableConfiguration], Seq[String])]
+    foreachUntilCometInput(this) {
+      case scan: CometNativeScanExec =>
         // This creates a hadoopConf that brings in any SQLConf "spark.hadoop.*" configs and
         // per-relation configs since different tables might have different decryption
         // properties.
@@ -244,10 +236,25 @@ abstract class CometNativeExec extends CometExec {
           val broadcastedConf =
             scan.relation.sparkSession.sparkContext
               .broadcast(new SerializableConfiguration(hadoopConf))
-          (Some(broadcastedConf), scan.relation.inputFiles.toSeq)
-        } else {
-          (None, Seq.empty)
+
+          val optsTuple: (Broadcast[SerializableConfiguration], Seq[String]) =
+            (broadcastedConf, scan.relation.inputFiles.toSeq)
+          encryptionOptions += optsTuple
         }
+      case _ => // no-op
+    }
+    assert(
+      encryptionOptions.size <= 1,
+      "We expect at most one native scan that requires encrypted reads in a Comet plan," +
+        " since we will broadcast one hadoopConf.")
+    // If this assumption changes in the future, you can look at the commit history of #2447
+    // to see how there used to be a map of relations to broadcasted confs in case multiple
+    // relations in a single plan. The example that came up was UNION. See discussion at:
+    // https://github.com/apache/datafusion-comet/pull/2447#discussion_r2406118264
+    val (broadcastedHadoopConfForEncryption, encryptedFilePaths) =
+      encryptionOptions.headOption match {
+        case Some((conf, paths)) => (Some(conf), paths)
+        case None => (None, Seq.empty)
       }

     def createCometExecIter(
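For readers unfamiliar with the traversal used above: the key point is that it
stops at operators that act as inputs to a native plan, so scans feeding a
different native plan (for example, the far side of a shuffle or union) are
never counted against this plan's assert. A minimal sketch of that idea, with
an illustrative and deliberately incomplete set of boundary operators (the
actual foreachUntilCometInput in operators.scala matches many more node types):

    // Sketch only: visit nodes of the current native plan, stopping the
    // descent at plan inputs. The boundary cases listed here are
    // illustrative, not the full set used by Comet.
    def foreachUntilCometInput(plan: SparkPlan)(func: SparkPlan => Unit): Unit =
      plan match {
        // Inputs to a native plan: visit them, but do not descend further.
        case _: CometNativeScanExec | _: CometScanExec |
            _: CometShuffleExchangeExec | _: CometSparkToColumnarExec =>
          func(plan)
        // Interior Comet operators: visit and keep walking down.
        case p: CometPlan =>
          func(p)
          p.children.foreach(foreachUntilCometInput(_)(func))
        // Anything else marks the edge of the native plan.
        case _ =>
      }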
diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/ParquetEncryptionITCase.scala b/spark/src/test/scala/org/apache/spark/sql/comet/ParquetEncryptionITCase.scala
index cff21ecec..b3e6a5a42 100644
--- a/spark/src/test/scala/org/apache/spark/sql/comet/ParquetEncryptionITCase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/comet/ParquetEncryptionITCase.scala
@@ -32,7 +32,7 @@ import org.apache.parquet.crypto.DecryptionPropertiesFactory
 import org.apache.parquet.crypto.keytools.{KeyToolkit, PropertiesDrivenCryptoFactory}
 import org.apache.parquet.crypto.keytools.mocks.InMemoryKMS
 import org.apache.spark.{DebugFilesystem, SparkConf}
-import org.apache.spark.sql.{CometTestBase, SQLContext}
+import org.apache.spark.sql.{functions, CometTestBase, SQLContext}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
@@ -359,7 +359,8 @@ class ParquetEncryptionITCase extends CometTestBase with SQLTestUtils {
         KeyToolkit.KMS_CLIENT_CLASS_PROPERTY_NAME ->
           "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS",
         InMemoryKMS.KEY_LIST_PROPERTY_NAME ->
-          s"footerKey: ${footerKey}, key1: ${key1}, key2: ${key2}") {
+          s"footerKey: ${footerKey}, key1: ${key1}, key2: ${key2}",
+        CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {

       // Write first file with key1
       val inputDF1 = spark
@@ -394,11 +395,16 @@ class ParquetEncryptionITCase extends CometTestBase with SQLTestUtils {
       val parquetDF2 = spark.read.parquet(parquetDir2)
       val unionDF = parquetDF1.union(parquetDF2)

+      // Since the union has its own executeColumnar, problems would not surface
+      // if it is the last operator. Adding a Comet aggregate after the union
+      // exercises foreachUntilCometInput() in operators.scala: without it we would
+      // error on multiple native scan execs that are no longer in the same native plan.
+      val aggDf = unionDF.agg(functions.sum("id"))
       if (CometConf.COMET_ENABLED.get(conf)) {
-        checkSparkAnswerAndOperator(unionDF)
+        checkSparkAnswerAndOperator(aggDf)
       } else {
-        checkSparkAnswer(unionDF)
+        checkSparkAnswer(aggDf)
       }
     }
   }
@@ -447,7 +453,7 @@ class ParquetEncryptionITCase extends CometTestBase with SQLTestUtils {
   }

   protected override def sparkConf: SparkConf = {
-    val conf = new SparkConf()
+    val conf = super.sparkConf
     conf.set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
     conf
   }
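To see why the old collectLeaves()-based count fired spuriously, consider a
plan in the spirit of the updated test: two Parquet scans unioned and then
aggregated. Each scan sits in its own native plan below the union's
executeColumnar, but collectLeaves() on the aggregate's native exec still saw
both. A self-contained repro sketch (paths, column name, and session setup
are illustrative, not taken from the commit):

    import org.apache.spark.sql.{functions, SparkSession}

    val spark = SparkSession.builder().master("local[1]").getOrCreate()

    // Hypothetical inputs; with encryption configured as in the test,
    // each scan broadcasts its own hadoopConf.
    val df1 = spark.read.parquet("/tmp/t1")
    val df2 = spark.read.parquet("/tmp/t2")

    // The aggregate on top of the union starts a new native plan, yet
    // collectLeaves() used to count both scans underneath it and trip
    // the "one native scan" assert; foreachUntilCometInput does not.
    df1.union(df2).agg(functions.sum("id")).collect()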
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]