This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new d994b272b [MINOR] improvement(test): A better computation logic for 
WriteAndReadMetricsTest without using reflection (#1563)
d994b272b is described below

commit d994b272b5a8ee30155aac6d529857f044846c57
Author: RickyMa <[email protected]>
AuthorDate: Thu Mar 7 15:24:06 2024 +0800

    [MINOR] improvement(test): A better computation logic for 
WriteAndReadMetricsTest without using reflection (#1563)
    
    ### What changes were proposed in this pull request?
    
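    Compute the shuffle write/read record counts in WriteAndReadMetricsTest 
with a SparkListener that accumulates task metrics per stage, instead of 
calling AppStatusStore.stageData through reflection.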
    
    ### Why are the changes needed?
    
    Reflection is unnecessary here and makes the test hard to follow: the old 
code had to try two different stageData method signatures.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing unit tests.
---
 .../uniffle/test/WriteAndReadMetricsTest.java      | 67 +++++++++++-----------
 1 file changed, 34 insertions(+), 33 deletions(-)

diff --git 
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/WriteAndReadMetricsTest.java
 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/WriteAndReadMetricsTest.java
index c7b014d43..ec03ddbc9 100644
--- 
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/WriteAndReadMetricsTest.java
+++ 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/WriteAndReadMetricsTest.java
@@ -17,20 +17,17 @@
 
 package org.apache.uniffle.test;
 
-import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import scala.collection.Seq;
-
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.scheduler.SparkListener;
+import org.apache.spark.scheduler.SparkListenerTaskEnd;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.functions;
-import org.apache.spark.status.AppStatusStore;
-import org.apache.spark.status.api.v1.StageData;
 import org.junit.jupiter.api.Test;
 
 public class WriteAndReadMetricsTest extends SimpleTestBase {
@@ -42,6 +39,10 @@ public class WriteAndReadMetricsTest extends SimpleTestBase {
 
   @Override
   public Map<String, Long> runTest(SparkSession spark, String fileName) throws 
Exception {
+    // Instantiate WriteAndReadMetricsSparkListener and add it to SparkContext
+    WriteAndReadMetricsSparkListener listener = new 
WriteAndReadMetricsSparkListener();
+    spark.sparkContext().addSparkListener(listener);
+
     // take a rest to make sure shuffle server is registered
     Thread.sleep(3000);
 
@@ -63,8 +64,8 @@ public class WriteAndReadMetricsTest extends SimpleTestBase {
     // take a rest to make sure all task metrics are updated before read 
stageData
     Thread.sleep(100);
     for (int stageId : 
spark.sparkContext().statusTracker().getJobInfo(0).get().stageIds()) {
-      long writeRecords = getFirstStageData(spark, 
stageId).shuffleWriteRecords();
-      long readRecords = getFirstStageData(spark, 
stageId).shuffleReadRecords();
+      long writeRecords = listener.getWriteRecords(stageId);
+      long readRecords = listener.getReadRecords(stageId);
       result.put(stageId + "-write-records", writeRecords);
       result.put(stageId + "-read-records", readRecords);
     }
@@ -72,31 +73,31 @@ public class WriteAndReadMetricsTest extends SimpleTestBase 
{
     return result;
   }
 
-  private StageData getFirstStageData(SparkSession spark, int stageId)
-      throws NoSuchMethodException, InvocationTargetException, 
IllegalAccessException {
-    AppStatusStore statestore = spark.sparkContext().statusStore();
-    try {
-      return ((Seq<StageData>)
-              statestore
-                  .getClass()
-                  .getDeclaredMethod("stageData", int.class, boolean.class)
-                  .invoke(statestore, stageId, false))
-          .toList()
-          .head();
-    } catch (Exception e) {
-      return ((Seq<StageData>)
-              statestore
-                  .getClass()
-                  .getDeclaredMethod(
-                      "stageData",
-                      int.class,
-                      boolean.class,
-                      List.class,
-                      boolean.class,
-                      double[].class)
-                  .invoke(statestore, stageId, false, new ArrayList<>(), true, 
new double[] {}))
-          .toList()
-          .head();
+  private static class WriteAndReadMetricsSparkListener extends SparkListener {
+    private HashMap<Integer, Long> stageIdToWriteRecords = new HashMap<>();
+    private HashMap<Integer, Long> stageIdToReadRecords = new HashMap<>();
+
+    @Override
+    public void onTaskEnd(SparkListenerTaskEnd event) {
+      int stageId = event.stageId();
+      TaskMetrics taskMetrics = event.taskMetrics();
+      if (taskMetrics != null) {
+        long writeRecords = taskMetrics.shuffleWriteMetrics().recordsWritten();
+        long readRecords = taskMetrics.shuffleReadMetrics().recordsRead();
+        // Accumulate writeRecords and readRecords for the given stageId
+        stageIdToWriteRecords.put(
+            stageId, stageIdToWriteRecords.getOrDefault(stageId, 0L) + 
writeRecords);
+        stageIdToReadRecords.put(
+            stageId, stageIdToReadRecords.getOrDefault(stageId, 0L) + 
readRecords);
+      }
+    }
+
+    public long getWriteRecords(int stageId) {
+      return stageIdToWriteRecords.getOrDefault(stageId, 0L);
+    }
+
+    public long getReadRecords(int stageId) {
+      return stageIdToReadRecords.getOrDefault(stageId, 0L);
     }
   }
 }

Reply via email to