li-boxuan commented on PR #9915:
URL: 
https://github.com/apache/incubator-gluten/pull/9915#issuecomment-2956510653

   > @li-boxuan Thank you for your work!
   > 
   > I found this in the code comment:
   > 
   > > The worst possible consequence for an over-strict equality check is 
acceptable (bigger search space)
   > 
   > This does seem possible. A bad case could be a rule that simply duplicates 
an input RDDScanNode makes the search endless. I think there is another safer 
approach to have an individual Spark rule to check for identical leaf scan 
nodes, then to renew the identical ones for distinction (i.e., copy the 
duplicated leaf nodes with nothing changed inside)? This rule could be placed 
after the RAS core EnumeratedTransform rule.
   > 
   > By all means to make sure each leaf scan node in the query DAG has only 
one single parent, am I understanding correctly here?
   
   I gave your suggestion a try;
   
   Core logic looks like this:
   
   ```
   object DistinguishIdenticalScans extends Rule[SparkPlan] {
   
     override def apply(plan: SparkPlan): SparkPlan = {
       // Set to track seen scan objects
       val seenScans = mutable.Set[SparkPlan]()
       
       // Custom traversal to avoid transformUp's fastEquals optimization
       def traverse(node: SparkPlan): (SparkPlan, Boolean) = {
         // First, check if this node is a scan and handle scan distinction 
logic
         val (nodeToProcess, scanChanged) = node match {
           case scan: FileSourceScanExec =>
             distinguishScan(scan, seenScans)
           case scan: BatchScanExec =>
             distinguishScan(scan, seenScans)
           case scan: DataSourceV2ScanExecBase =>
             distinguishScan(scan, seenScans)
           case scan: RDDScanExec =>
             distinguishScan(scan, seenScans)
           case other => (other, false)
         }
         
         // Then, recursively process children of the (possibly cloned) node
         var childrenChanged = false
         val newChildren = nodeToProcess.children.map { child =>
           val (newChild, changed) = traverse(child)
           if (changed) childrenChanged = true
           newChild
         }
         
         // Create final node with new children if any children changed
         val finalNode = if (childrenChanged) {
           // TROUBLE!!!! Spark internally checks equivalence and then skips 
this
           nodeToProcess.withNewChildren(newChildren)
         } else {
           nodeToProcess
         }
         
         (finalNode, scanChanged || childrenChanged)
       }
       
       val (newPlan, _) = traverse(plan)
       newPlan
     }
   
     private def distinguishScan[T <: SparkPlan](
         scan: T,
         seenScans: mutable.Set[SparkPlan]): (SparkPlan, Boolean) = {
       if (seenScans.contains(scan)) {
         // Scan already seen, clone it to distinguish from the original
         val newScan = cloneScan(scan)
         (newScan, true)
       } else {
         // First time seeing this scan
         seenScans.add(scan)
         (scan, false)
       }
     }
   
     private def cloneScan(scan: SparkPlan): SparkPlan = {
       scan.clone()
     }
   }
   ```
   
   Unfortunately Spark doesn't expose a public method to force replacing an 
arbitrary operator.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to