This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 4b66a78  fix: Use `makeCopy` to change relation in 
`FileSourceScanExec` (#207)
4b66a78 is described below

commit 4b66a78047e74ccc253861b916a8ad49af6196c7
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Thu Mar 14 12:56:50 2024 -0700

    fix: Use `makeCopy` to change relation in `FileSourceScanExec` (#207)
---
 .../org/apache/spark/sql/comet/CometScanExec.scala | 26 ++++++++++++++++++++--
 1 file changed, 24 insertions(+), 2 deletions(-)

diff --git 
a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala 
b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
index 4bf01f0..42cc96b 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
@@ -21,6 +21,7 @@ package org.apache.spark.sql.comet
 
 import scala.collection.mutable.HashMap
 import scala.concurrent.duration.NANOSECONDS
+import scala.reflect.ClassTag
 
 import org.apache.hadoop.fs.Path
 import org.apache.spark.rdd.RDD
@@ -439,8 +440,29 @@ case class CometScanExec(
 
 object CometScanExec {
   def apply(scanExec: FileSourceScanExec, session: SparkSession): 
CometScanExec = {
-    val wrapped = scanExec.copy(relation =
-      scanExec.relation.copy(fileFormat = new CometParquetFileFormat)(session))
+    // TreeNode.mapProductIterator is a protected method, so a local helper is defined here.
+    def mapProductIterator[B: ClassTag](product: Product, f: Any => B): 
Array[B] = {
+      val arr = Array.ofDim[B](product.productArity) // one slot per product element
+      var i = 0
+      while (i < arr.length) {
+        arr(i) = f(product.productElement(i)) // apply f to the i-th element
+        i += 1
+      }
+      arr
+    }
+
+    // Replacing the relation in FileSourceScanExec by `copy` seems to cause 
some issues
+    // on other Spark distributions if the FileSourceScanExec constructor is 
changed.
+    // Use `makeCopy` to avoid the issue.
+    // https://github.com/apache/arrow-datafusion-comet/issues/190
+    def transform(arg: Any): AnyRef = arg match {
+      case _: HadoopFsRelation => // swap in a relation that uses CometParquetFileFormat
+        scanExec.relation.copy(fileFormat = new 
CometParquetFileFormat)(session)
+      case other: AnyRef => other // every other argument is passed through unchanged
+      case null => null // a typed pattern never matches null, so handle it explicitly
+    }
+    val newArgs = mapProductIterator(scanExec, transform(_))
+    val wrapped = scanExec.makeCopy(newArgs).asInstanceOf[FileSourceScanExec]
     val batchScanExec = CometScanExec(
       wrapped.relation,
       wrapped.output,

Reply via email to