zhztheplayer commented on code in PR #7167:
URL: https://github.com/apache/incubator-gluten/pull/7167#discussion_r1765128935


##########
gluten-data/src/main/scala/org/apache/gluten/metrics/InputIteratorMetricsUpdater.scala:
##########
@@ -15,22 +15,29 @@
  * limitations under the License.
  */
 package org.apache.gluten.metrics
+import org.apache.spark.TaskContext
 import org.apache.spark.sql.execution.metric.SQLMetric
 
-case class InputIteratorMetricsUpdater(metrics: Map[String, SQLMetric]) extends MetricsUpdater {
+case class InputIteratorMetricsUpdater(
+    metrics: Map[String, SQLMetric],
+    forBroadcast: Boolean = false)
+  extends MetricsUpdater {
   override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
     if (opMetrics != null) {
       val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
       metrics("cpuCount") += operatorMetrics.cpuCount
       metrics("wallNanos") += operatorMetrics.wallNanos
-      if (operatorMetrics.outputRows == 0 && operatorMetrics.outputVectors == 0) {
-        // Sometimes, velox does not update metrics for intermediate operator,
-        // here we try to use the input metrics
-        metrics("numOutputRows") += operatorMetrics.inputRows
-        metrics("outputVectors") += operatorMetrics.inputVectors
-      } else {
-        metrics("numOutputRows") += operatorMetrics.outputRows
-        metrics("outputVectors") += operatorMetrics.outputVectors
+      // For broadcast exchange, we only collect the metrics once.
+      if (!forBroadcast || TaskContext.getPartitionId() == 0) {
+        if (operatorMetrics.outputRows == 0 && operatorMetrics.outputVectors == 0) {
+          // Sometimes, velox does not update metrics for intermediate operator,
+          // here we try to use the input metrics
+          metrics("numOutputRows") += operatorMetrics.inputRows
+          metrics("outputVectors") += operatorMetrics.inputVectors
+        } else {
+          metrics("numOutputRows") += operatorMetrics.outputRows
+          metrics("outputVectors") += operatorMetrics.outputVectors
+        }

Review Comment:
    My worry is that with a check like `TaskContext.getPartitionId() == 0`,
there may be corner cases where the broadcast data is not consumed identically
by every join partition. For example, if the probe side of one partition is
empty, the backend could apply an optimization that skips reading the build
iterator. If that happens on partition 0, the wrong metrics will be displayed.
@ivoson @ulysses-you, could you help confirm?
   
   If that's possible, we may still need a better solution to make sure we 
don't introduce new hidden issues.
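
To make the concern concrete, here is a minimal sketch in plain Scala. It does not use Spark or Gluten classes; `TaskReport`, `collect`, and the row counts are hypothetical names and values invented for illustration. It only mirrors the `!forBroadcast || partitionId == 0` guard from the diff and assumes the backend reports zero rows for a task that never reads the build iterator.

```scala
// Plain-Scala simulation of the guard in the diff; no Spark or Gluten classes.
// All names and numbers below are hypothetical and exist only for illustration.
object BroadcastMetricsSketch {
  // Rows the backend reports for the build-side input iterator in one task.
  final case class TaskReport(partitionId: Int, buildRowsRead: Long)

  // Mirrors the guard: for broadcast input, only partition 0 contributes.
  def collect(reports: Seq[TaskReport], forBroadcast: Boolean): Long =
    reports
      .filter(r => !forBroadcast || r.partitionId == 0)
      .map(_.buildRowsRead)
      .sum

  def main(args: Array[String]): Unit = {
    // Every partition reads the full broadcast relation (100 rows).
    val allRead = Seq(TaskReport(0, 100L), TaskReport(1, 100L), TaskReport(2, 100L))
    println(collect(allRead, forBroadcast = false)) // 300: counted once per task
    println(collect(allRead, forBroadcast = true))  // 100: counted once

    // Assumed corner case: partition 0 has an empty probe side, so the
    // backend never reads the build iterator there and reports 0 rows.
    val p0Skipped = Seq(TaskReport(0, 0L), TaskReport(1, 100L), TaskReport(2, 100L))
    println(collect(p0Skipped, forBroadcast = true)) // 0: the metric is lost
  }
}
```

In the last case the guard reports 0 rows even though other partitions did read the broadcast data, which is the hidden issue described above. Whether a backend actually skips the build side this way is the open question to confirm.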



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

