ivoson commented on code in PR #7167:
URL: https://github.com/apache/incubator-gluten/pull/7167#discussion_r1753751955


##########
gluten-data/src/main/scala/org/apache/gluten/metrics/InputIteratorMetricsUpdater.scala:
##########
@@ -15,22 +15,29 @@
  * limitations under the License.
  */
 package org.apache.gluten.metrics
+import org.apache.spark.TaskContext
 import org.apache.spark.sql.execution.metric.SQLMetric
 
-case class InputIteratorMetricsUpdater(metrics: Map[String, SQLMetric]) extends MetricsUpdater {
+case class InputIteratorMetricsUpdater(
+    metrics: Map[String, SQLMetric],
+    forBroadcast: Boolean = false)
+  extends MetricsUpdater {
   override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
     if (opMetrics != null) {
       val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
       metrics("cpuCount") += operatorMetrics.cpuCount
       metrics("wallNanos") += operatorMetrics.wallNanos
-      if (operatorMetrics.outputRows == 0 && operatorMetrics.outputVectors == 0) {
-        // Sometimes, velox does not update metrics for intermediate operator,
-        // here we try to use the input metrics
-        metrics("numOutputRows") += operatorMetrics.inputRows
-        metrics("outputVectors") += operatorMetrics.inputVectors
-      } else {
-        metrics("numOutputRows") += operatorMetrics.outputRows
-        metrics("outputVectors") += operatorMetrics.outputVectors
+      // For broadcast exchange, we only collect the metrics once.
+      if (!forBroadcast || TaskContext.getPartitionId() == 0) {
+        if (operatorMetrics.outputRows == 0 && operatorMetrics.outputVectors == 0) {
+          // Sometimes, velox does not update metrics for intermediate operator,
+          // here we try to use the input metrics
+          metrics("numOutputRows") += operatorMetrics.inputRows
+          metrics("outputVectors") += operatorMetrics.inputVectors
+        } else {
+          metrics("numOutputRows") += operatorMetrics.outputRows
+          metrics("outputVectors") += operatorMetrics.outputVectors
+        }

Review Comment:
   Yeah... I agree that this is tricky, but it's a simple workaround within the current metrics framework.
   
   Adding some hints on the UI page may help viewers understand the duplication of data.
   
   Another scenario is that some folks might be collecting the metrics automatically to analyze the SQL workload they are running. They may take a join table's output row count as the join's input table size, etc. (which works for vanilla Spark), but with Gluten they will get wrong information. That's the motivation, and I am trying to get a fix here.
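   To illustrate the overcounting the patch addresses, here is a minimal, self-contained sketch (not the actual Gluten/Spark code): the `Metric` class and `accumulate` helper are hypothetical stand-ins for `SQLMetric` and the per-task metric updates, and the `partitionId == 0` check mirrors the `!forBroadcast || TaskContext.getPartitionId() == 0` guard in the diff. Without the guard, every consumer task of a broadcast reports the same output rows, multiplying the metric by the partition count.

   ```scala
   object BroadcastMetricSketch {
     // Hypothetical stand-in for org.apache.spark.sql.execution.metric.SQLMetric:
     // a mutable counter that tasks add to.
     final class Metric {
       var value: Long = 0L
       def +=(v: Long): Unit = value += v
     }

     // Simulate `numPartitions` tasks each reporting the same broadcast
     // output rows; `forBroadcast` enables the partition-0 guard.
     def accumulate(numPartitions: Int, rowsPerUpdate: Long, forBroadcast: Boolean): Long = {
       val numOutputRows = new Metric
       for (partitionId <- 0 until numPartitions) {
         // Mirrors `!forBroadcast || TaskContext.getPartitionId() == 0` from the patch.
         if (!forBroadcast || partitionId == 0) {
           numOutputRows += rowsPerUpdate
         }
       }
       numOutputRows.value
     }

     def main(args: Array[String]): Unit = {
       // Without the guard the metric is inflated by the partition count.
       println(accumulate(numPartitions = 8, rowsPerUpdate = 100, forBroadcast = false)) // 800
       // With the guard only partition 0 contributes, so the count stays correct.
       println(accumulate(numPartitions = 8, rowsPerUpdate = 100, forBroadcast = true))  // 100
     }
   }
   ```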



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
