This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 4b66a78 fix: Use `makeCopy` to change relation in
`FileSourceScanExec` (#207)
4b66a78 is described below
commit 4b66a78047e74ccc253861b916a8ad49af6196c7
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Thu Mar 14 12:56:50 2024 -0700
fix: Use `makeCopy` to change relation in `FileSourceScanExec` (#207)
---
.../org/apache/spark/sql/comet/CometScanExec.scala | 26 ++++++++++++++++++++--
1 file changed, 24 insertions(+), 2 deletions(-)
diff --git
a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
index 4bf01f0..42cc96b 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
@@ -21,6 +21,7 @@ package org.apache.spark.sql.comet
import scala.collection.mutable.HashMap
import scala.concurrent.duration.NANOSECONDS
+import scala.reflect.ClassTag
import org.apache.hadoop.fs.Path
import org.apache.spark.rdd.RDD
@@ -439,8 +440,29 @@ case class CometScanExec(
object CometScanExec {
def apply(scanExec: FileSourceScanExec, session: SparkSession):
CometScanExec = {
- val wrapped = scanExec.copy(relation =
- scanExec.relation.copy(fileFormat = new CometParquetFileFormat)(session))
+ // TreeNode.mapProductIterator is a protected method.
+ def mapProductIterator[B: ClassTag](product: Product, f: Any => B):
Array[B] = {
+ val arr = Array.ofDim[B](product.productArity)
+ var i = 0
+ while (i < arr.length) {
+ arr(i) = f(product.productElement(i))
+ i += 1
+ }
+ arr
+ }
+
+ // Replacing the relation in FileSourceScanExec via `copy` seems to cause some issues
+ // on other Spark distributions if the FileSourceScanExec constructor is changed.
+ // Using `makeCopy` to avoid the issue.
+ // https://github.com/apache/arrow-datafusion-comet/issues/190
+ def transform(arg: Any): AnyRef = arg match {
+ case _: HadoopFsRelation =>
+ scanExec.relation.copy(fileFormat = new
CometParquetFileFormat)(session)
+ case other: AnyRef => other
+ case null => null
+ }
+ val newArgs = mapProductIterator(scanExec, transform(_))
+ val wrapped = scanExec.makeCopy(newArgs).asInstanceOf[FileSourceScanExec]
val batchScanExec = CometScanExec(
wrapped.relation,
wrapped.output,