felipepessoto commented on PR #7111:
URL: 
https://github.com/apache/incubator-gluten/pull/7111#issuecomment-2723466481

   @dcoliversun, I think I found a bug with this change and 
FilterMetricsUpdater. The DeleteCommand query is:
   
                       data.filter(new Column(cond))
                         .select(input_file_name())
                         .filter(new Column(incrDeletedCountExpr))
                         .distinct()
                         .as[String]
                         .collect()
   
   If you remove the `.select(input_file_name())` and replace it with something like `.select(col("_metadata.file_path"))` or `.select(org.apache.spark.sql.functions.lit("TEST"))`, then `FilterMetricsUpdater.updateNativeMetrics` is not called with the correct `operatorMetrics.outputRows`.
   
   I guess some shortcut is taken when we don't have a UDF. But that shouldn't happen, since `IncrementMetric` is non-deterministic. I'm not sure whether that is actually what happens, though; I don't know how to troubleshoot it.
   
   I added some println:
   
   ```scala
   class FilterMetricsUpdater(
       val metrics: Map[String, SQLMetric],
       val extraMetrics: Seq[(String, SQLMetric)])
     extends MetricsUpdater {
   
     override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
       println("FilterMetricsUpdater: 1")
       if (opMetrics != null) {
         println("FilterMetricsUpdater: 2")
         val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
         metrics("numOutputRows") += operatorMetrics.outputRows
         metrics("outputVectors") += operatorMetrics.outputVectors
         metrics("outputBytes") += operatorMetrics.outputBytes
         metrics("cpuCount") += operatorMetrics.cpuCount
         metrics("wallNanos") += operatorMetrics.wallNanos
         metrics("peakMemoryBytes") += operatorMetrics.peakMemoryBytes
         metrics("numMemoryAllocations") += operatorMetrics.numMemoryAllocations
          println(s"FilterMetricsUpdater: 3. ${extraMetrics.size}. $extraMetrics")
         extraMetrics.foreach {
           case (name, metric) =>
             name match {
               case "increment_metric" =>
                  println(s"FilterMetricsUpdater: 4. ${operatorMetrics.outputRows}.")
                 metric += operatorMetrics.outputRows
               case _ => // do nothing
             }
         }
       }
     }
   }
   ```
   
   This is what it shows for `.select(org.apache.spark.sql.functions.lit("TEST"))` or `.select(col("_metadata.file_path"))` when running `build/sbt 'spark/testOnly org.apache.spark.sql.delta.DescribeDeltaHistorySuite -- -t "operation metrics - delete"'`:
   
   ```
   FilterMetricsUpdater: 1
   FilterMetricsUpdater: 2
   FilterMetricsUpdater: 3. 1. List((increment_metric,SQLMetric(id: 2134, name: Some(number of rows deleted.), value: 0)))
   FilterMetricsUpdater: 4. 0.
   FilterMetricsUpdater: 1
   FilterMetricsUpdater: 2
   FilterMetricsUpdater: 3. 0. List()
   FilterMetricsUpdater: 1
   FilterMetricsUpdater: 2
   FilterMetricsUpdater: 3. 1. List((increment_metric,SQLMetric(id: 2134, name: Some(number of rows deleted.), value: 0)))
   FilterMetricsUpdater: 4. 0.
   FilterMetricsUpdater: 1
   FilterMetricsUpdater: 2
   FilterMetricsUpdater: 3. 0. List()
   ```
   
   And this is what it shows when we use `input_file_name()`:
   
   ```
   FilterMetricsUpdater: 1
   FilterMetricsUpdater: 2
   FilterMetricsUpdater: 3. 1. List((increment_metric,SQLMetric(id: 2134, name: Some(number of rows deleted.), value: 0)))
   FilterMetricsUpdater: 4. 0.
   FilterMetricsUpdater: 1
   FilterMetricsUpdater: 2
   FilterMetricsUpdater: 3. 0. List()
   FilterMetricsUpdater: 1
   FilterMetricsUpdater: 2
   FilterMetricsUpdater: 3. 1. List((increment_metric,SQLMetric(id: 2134, name: Some(number of rows deleted.), value: 0)))
   FilterMetricsUpdater: 4. 1.
   FilterMetricsUpdater: 1
   FilterMetricsUpdater: 2
   FilterMetricsUpdater: 3. 0. List()
   FilterMetricsUpdater: 1
   FilterMetricsUpdater: 2
   FilterMetricsUpdater: 3. 1. List((increment_metric,SQLMetric(id: 2134, name: Some(number of rows deleted.), value: 0)))
   FilterMetricsUpdater: 4. 0.
   FilterMetricsUpdater: 1
   FilterMetricsUpdater: 2
   FilterMetricsUpdater: 3. 0. List()
   FilterMetricsUpdater: 1
   FilterMetricsUpdater: 2
   FilterMetricsUpdater: 3. 1. List((increment_metric,SQLMetric(id: 2134, name: Some(number of rows deleted.), value: 0)))
   FilterMetricsUpdater: 4. 0.
   FilterMetricsUpdater: 1
   FilterMetricsUpdater: 2
   FilterMetricsUpdater: 3. 0. List()
   FilterMetricsUpdater: 1
   FilterMetricsUpdater: 2
   FilterMetricsUpdater: 3. 0. List()
   FilterMetricsUpdater: 1
   FilterMetricsUpdater: 2
   FilterMetricsUpdater: 3. 1. List((increment_metric,SQLMetric(id: 2150, name: Some(number of rows touched), value: 0)))
   FilterMetricsUpdater: 4. 3.
   FilterMetricsUpdater: 1
   FilterMetricsUpdater: 2
   FilterMetricsUpdater: 3. 0. List()
   FilterMetricsUpdater: 1
   FilterMetricsUpdater: 2
   FilterMetricsUpdater: 3. 0. List()
   ```

