felipepessoto commented on PR #7111:
URL:
https://github.com/apache/incubator-gluten/pull/7111#issuecomment-2723466481
@dcoliversun, I think I found a bug with this change and
FilterMetricsUpdater. The DeleteCommand query is:
    data.filter(new Column(cond))
      .select(input_file_name())
      .filter(new Column(incrDeletedCountExpr))
      .distinct()
      .as[String]
      .collect()
If you remove the `.select(input_file_name())` and replace it with something
like `.select(col("_metadata.file_path"))` or
`.select(org.apache.spark.sql.functions.lit("TEST"))`,
FilterMetricsUpdater.updateNativeMetrics is no longer called with the correct
`operatorMetrics.outputRows`.
I guess some shortcut is taken when we don't have a UDF. But this shouldn't
happen, since IncrementMetric is Nondeterministic. I'm not sure this is
actually the cause, though, and I don't know how to troubleshoot it.
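To make the expectation concrete, here is a minimal plain-Scala sketch of the
query shape, with no Spark involved. `Counter`, `Row`, and `DeleteQueryModel`
are hypothetical stand-ins, and `incrementMetric` only assumes that
IncrementMetric bumps its metric once per evaluated row and then returns true.
Under that assumption the count depends only on how many rows pass `cond`, not
on what the `select` projects:

```scala
// Plain-Scala model of the DeleteCommand query shape; no Spark required.
// All names here (Counter, Row, DeleteQueryModel) are hypothetical stand-ins.
object DeleteQueryModel {
  // Stand-in for a SQLMetric: a simple mutable counter.
  final class Counter {
    private var total = 0L
    def add(n: Long): Unit = total += n
    def value: Long = total
  }

  // A row carries the file it came from and a value that the condition tests.
  final case class Row(file: String, value: Int)

  // Assumed IncrementMetric semantics: bump the metric once per evaluated
  // row, then let the row through (the wrapped predicate is always true).
  def incrementMetric(metric: Counter): String => Boolean = { _ =>
    metric.add(1L)
    true
  }

  // Mirrors filter(cond).select(...).filter(incrDeletedCountExpr).distinct().
  // Returns the distinct projected values and the final metric value.
  def run(
      data: Seq[Row],
      cond: Row => Boolean,
      project: Row => String): (Seq[String], Long) = {
    val deleted = new Counter
    val result =
      data.filter(cond).map(project).filter(incrementMetric(deleted)).distinct
    (result, deleted.value)
  }

  val sample: Seq[Row] =
    Seq(Row("f1", 1), Row("f1", 2), Row("f2", 3), Row("f2", 10))

  def main(args: Array[String]): Unit = {
    val cond: Row => Boolean = _.value < 5
    println(run(sample, cond, _.file))      // projection like input_file_name()
    println(run(sample, cond, _ => "TEST")) // projection like lit("TEST")
  }
}
```

In this model both projections leave the counter at 3 for the sample data; only
the distinct result differs. That is the invariant I would expect the native
filter's `outputRows` to preserve as well.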
I added some println calls:
```
class FilterMetricsUpdater(
    val metrics: Map[String, SQLMetric],
    val extraMetrics: Seq[(String, SQLMetric)])
  extends MetricsUpdater {

  override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
    println("FilterMetricsUpdater: 1")
    if (opMetrics != null) {
      println("FilterMetricsUpdater: 2")
      val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
      metrics("numOutputRows") += operatorMetrics.outputRows
      metrics("outputVectors") += operatorMetrics.outputVectors
      metrics("outputBytes") += operatorMetrics.outputBytes
      metrics("cpuCount") += operatorMetrics.cpuCount
      metrics("wallNanos") += operatorMetrics.wallNanos
      metrics("peakMemoryBytes") += operatorMetrics.peakMemoryBytes
      metrics("numMemoryAllocations") += operatorMetrics.numMemoryAllocations
      println(s"FilterMetricsUpdater: 3. ${extraMetrics.size}. $extraMetrics")
      extraMetrics.foreach {
        case (name, metric) =>
          name match {
            case "increment_metric" =>
              println(s"FilterMetricsUpdater: 4. ${operatorMetrics.outputRows}.")
              metric += operatorMetrics.outputRows
            case _ => // do nothing
          }
      }
    }
  }
}
```
This is what it prints when I use
`.select(org.apache.spark.sql.functions.lit("TEST"))` or
`.select(col("_metadata.file_path"))` and run
`build/sbt 'spark/testOnly org.apache.spark.sql.delta.DescribeDeltaHistorySuite -- -t "operation metrics - delete"'`:
    FilterMetricsUpdater: 1
    FilterMetricsUpdater: 2
    FilterMetricsUpdater: 3. 1. List((increment_metric,SQLMetric(id: 2134, name: Some(number of rows deleted.), value: 0)))
    FilterMetricsUpdater: 4. 0.
    FilterMetricsUpdater: 1
    FilterMetricsUpdater: 2
    FilterMetricsUpdater: 3. 0. List()
    FilterMetricsUpdater: 1
    FilterMetricsUpdater: 2
    FilterMetricsUpdater: 3. 1. List((increment_metric,SQLMetric(id: 2134, name: Some(number of rows deleted.), value: 0)))
    FilterMetricsUpdater: 4. 0.
    FilterMetricsUpdater: 1
    FilterMetricsUpdater: 2
    FilterMetricsUpdater: 3. 0. List()
This is the output when we use `input_file_name()`:
    FilterMetricsUpdater: 1
    FilterMetricsUpdater: 2
    FilterMetricsUpdater: 3. 1. List((increment_metric,SQLMetric(id: 2134, name: Some(number of rows deleted.), value: 0)))
    FilterMetricsUpdater: 4. 0.
    FilterMetricsUpdater: 1
    FilterMetricsUpdater: 2
    FilterMetricsUpdater: 3. 0. List()
    FilterMetricsUpdater: 1
    FilterMetricsUpdater: 2
    FilterMetricsUpdater: 3. 1. List((increment_metric,SQLMetric(id: 2134, name: Some(number of rows deleted.), value: 0)))
    FilterMetricsUpdater: 4. 1.
    FilterMetricsUpdater: 1
    FilterMetricsUpdater: 2
    FilterMetricsUpdater: 3. 0. List()
    FilterMetricsUpdater: 1
    FilterMetricsUpdater: 2
    FilterMetricsUpdater: 3. 1. List((increment_metric,SQLMetric(id: 2134, name: Some(number of rows deleted.), value: 0)))
    FilterMetricsUpdater: 4. 0.
    FilterMetricsUpdater: 1
    FilterMetricsUpdater: 2
    FilterMetricsUpdater: 3. 0. List()
    FilterMetricsUpdater: 1
    FilterMetricsUpdater: 2
    FilterMetricsUpdater: 3. 1. List((increment_metric,SQLMetric(id: 2134, name: Some(number of rows deleted.), value: 0)))
    FilterMetricsUpdater: 4. 0.
    FilterMetricsUpdater: 1
    FilterMetricsUpdater: 2
    FilterMetricsUpdater: 3. 0. List()
    FilterMetricsUpdater: 1
    FilterMetricsUpdater: 2
    FilterMetricsUpdater: 3. 0. List()
    FilterMetricsUpdater: 1
    FilterMetricsUpdater: 2
    FilterMetricsUpdater: 3. 1. List((increment_metric,SQLMetric(id: 2150, name: Some(number of rows touched), value: 0)))
    FilterMetricsUpdater: 4. 3.
    FilterMetricsUpdater: 1
    FilterMetricsUpdater: 2
    FilterMetricsUpdater: 3. 0. List()
    FilterMetricsUpdater: 1
    FilterMetricsUpdater: 2
    FilterMetricsUpdater: 3. 0. List()
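To compare the two runs without reading every line, the "4." values can be
summed per metric name. The following is only a rough helper sketch:
`MetricLogTally` and its regexes are hypothetical and assume each log entry
sits on a single line in exactly the format printed by the println calls above.

```scala
// Sums the "FilterMetricsUpdater: 4. N." values per metric name.
// MetricLogTally is a hypothetical helper, not part of Gluten or Delta.
object MetricLogTally {
  // Matches a "3. 1. List((increment_metric,SQLMetric(... name: Some(X), value: V)))"
  // line and captures the metric name X.
  private val MetricLine =
    """FilterMetricsUpdater: 3\. 1\. .*name: Some\((.*)\), value: \d+\)\)\)""".r

  // Matches a "4. N." line and captures N (the reported outputRows).
  private val RowsLine = """FilterMetricsUpdater: 4\. (\d+)\.""".r

  def tally(lines: Seq[String]): Map[String, Long] = {
    // Name from the most recent "3. 1." line; the next "4." line belongs to it.
    var current: Option[String] = None
    val totals = scala.collection.mutable.LinkedHashMap.empty[String, Long]
    lines.map(_.trim).foreach {
      case MetricLine(name) =>
        current = Some(name)
      case RowsLine(n) =>
        current.foreach { name =>
          totals.update(name, totals.getOrElse(name, 0L) + n.toLong)
        }
      case _ => () // "1", "2" and "3. 0. List()" lines carry no counts
    }
    totals.toMap
  }

  def main(args: Array[String]): Unit = {
    // A few lines copied from the input_file_name() run above.
    val lines = Seq(
      "FilterMetricsUpdater: 3. 1. List((increment_metric,SQLMetric(id: 2134, name: Some(number of rows deleted.), value: 0)))",
      "FilterMetricsUpdater: 4. 1.",
      "FilterMetricsUpdater: 3. 0. List()",
      "FilterMetricsUpdater: 3. 1. List((increment_metric,SQLMetric(id: 2150, name: Some(number of rows touched), value: 0)))",
      "FilterMetricsUpdater: 4. 3."
    )
    println(tally(lines))
  }
}
```

Applied by hand to the logs above, the `lit("TEST")` run sums to 0 for "number
of rows deleted.", while the `input_file_name()` run sums to 1 for "number of
rows deleted." and 3 for "number of rows touched".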