This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 8cc00d5e [MINOR]Fix(tez) add output mapOutputRecordCounter metrics 
(#1093)
8cc00d5e is described below

commit 8cc00d5e396c5f33b7121d1fd715fdd3ca179de0
Author: bin41215 <[email protected]>
AuthorDate: Mon Aug 7 18:38:02 2023 +0800

    [MINOR]Fix(tez) add output mapOutputRecordCounter metrics (#1093)
    
    ### What changes were proposed in this pull request?
    
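    Add the mapOutputRecordCounter output metric (TaskCounter.OUTPUT_RECORDS),
    which WriteBufferManager increments by one for each record it adds.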
    
    ### Why are the changes needed?
    
    This metric is needed to fix the issue with reducing the number of tasks.
    
    Fix: #972
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    Unit tests (new record-counter assertions in WriteBufferManagerTest).
---
 .../common/sort/buffer/WriteBufferManager.java     |  6 +++++-
 .../library/common/sort/impl/RssSorter.java        |  3 ++-
 .../library/common/sort/impl/RssUnSorter.java      |  3 ++-
 .../common/sort/buffer/WriteBufferManagerTest.java | 24 ++++++++++++++++++----
 4 files changed, 29 insertions(+), 7 deletions(-)

diff --git 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
index 7da3208c..b88bedda 100644
--- 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
+++ 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
@@ -97,6 +97,7 @@ public class WriteBufferManager<K, V> {
   private final int shuffleId;
   private final boolean isNeedSorted;
   private final TezCounter mapOutputByteCounter;
+  private final TezCounter mapOutputRecordCounter;
 
   /** WriteBufferManager */
   public WriteBufferManager(
@@ -125,7 +126,8 @@ public class WriteBufferManager<K, V> {
       int bitmapSplitNum,
       int shuffleId,
       boolean isNeedSorted,
-      TezCounter mapOutputByteCounter) {
+      TezCounter mapOutputByteCounter,
+      TezCounter mapOutputRecordCounter) {
     this.tezTaskAttemptID = tezTaskAttemptID;
     this.maxMemSize = maxMemSize;
     this.appId = appId;
@@ -152,6 +154,7 @@ public class WriteBufferManager<K, V> {
     this.shuffleId = shuffleId;
     this.isNeedSorted = isNeedSorted;
     this.mapOutputByteCounter = mapOutputByteCounter;
+    this.mapOutputRecordCounter = mapOutputRecordCounter;
     this.sendExecutorService =
         Executors.newFixedThreadPool(sendThreadNum, 
ThreadUtils.getThreadFactory("send-thread"));
   }
@@ -200,6 +203,7 @@ public class WriteBufferManager<K, V> {
         && inSendListBytes.get() <= maxMemSize * sendThreshold) {
       sendBuffersToServers();
     }
+    mapOutputRecordCounter.increment(1);
     mapOutputByteCounter.increment(length);
   }
 
diff --git 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssSorter.java
 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssSorter.java
index c937770e..94c9aa90 100644
--- 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssSorter.java
+++ 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssSorter.java
@@ -166,7 +166,8 @@ public class RssSorter extends ExternalSorter {
             bitmapSplitNum,
             shuffleId,
             true,
-            mapOutputByteCounter);
+            mapOutputByteCounter,
+            mapOutputRecordCounter);
     LOG.info("Initialized WriteBufferManager.");
   }
 
diff --git 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java
 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java
index b2073fe9..0248bb8a 100644
--- 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java
+++ 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java
@@ -164,7 +164,8 @@ public class RssUnSorter extends ExternalSorter {
             bitmapSplitNum,
             shuffleId,
             false,
-            mapOutputByteCounter);
+            mapOutputByteCounter,
+            mapOutputRecordCounter);
     LOG.info("Initialized WriteBufferManager.");
   }
 
diff --git 
a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
 
b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
index 73c625a9..d9036d68 100644
--- 
a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
+++ 
b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
@@ -119,6 +119,8 @@ public class WriteBufferManagerTest {
     OutputContext outputContext = OutputTestHelpers.createOutputContext(conf, 
workingDir);
     TezCounter mapOutputByteCounter =
         outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES);
+    TezCounter mapOutputRecordCounter =
+        outputContext.getCounters().findCounter(TaskCounter.OUTPUT_RECORDS);
 
     WriteBufferManager<BytesWritable, BytesWritable> bufferManager =
         new WriteBufferManager(
@@ -147,7 +149,8 @@ public class WriteBufferManagerTest {
             bitmapSplitNum,
             shuffleId,
             true,
-            mapOutputByteCounter);
+            mapOutputByteCounter,
+            mapOutputRecordCounter);
 
     Random random = new Random();
     for (int i = 0; i < 1000; i++) {
@@ -158,6 +161,7 @@ public class WriteBufferManagerTest {
       bufferManager.addRecord(1, new BytesWritable(key), new 
BytesWritable(value));
     }
 
+    assertEquals(1000, mapOutputRecordCounter.getValue());
     assertEquals(1052000, mapOutputByteCounter.getValue());
 
     boolean isException = false;
@@ -219,6 +223,8 @@ public class WriteBufferManagerTest {
     OutputContext outputContext = OutputTestHelpers.createOutputContext(conf, 
workingDir);
     TezCounter mapOutputByteCounter =
         outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES);
+    TezCounter mapOutputRecordCounter =
+        outputContext.getCounters().findCounter(TaskCounter.OUTPUT_RECORDS);
 
     WriteBufferManager<BytesWritable, BytesWritable> bufferManager =
         new WriteBufferManager(
@@ -247,7 +253,8 @@ public class WriteBufferManagerTest {
             bitmapSplitNum,
             shuffleId,
             true,
-            mapOutputByteCounter);
+            mapOutputByteCounter,
+            mapOutputRecordCounter);
 
     Random random = new Random();
     for (int i = 0; i < 1000; i++) {
@@ -259,6 +266,7 @@ public class WriteBufferManagerTest {
       bufferManager.addRecord(partitionId, new BytesWritable(key), new 
BytesWritable(value));
     }
 
+    assertEquals(1000, mapOutputRecordCounter.getValue());
     assertEquals(1052000, mapOutputByteCounter.getValue());
     bufferManager.waitSendFinished();
     assertTrue(bufferManager.getWaitSendBuffers().isEmpty());
@@ -329,6 +337,8 @@ public class WriteBufferManagerTest {
     OutputContext outputContext = OutputTestHelpers.createOutputContext(conf, 
workingDir);
     TezCounter mapOutputByteCounter =
         outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES);
+    TezCounter mapOutputRecordCounter =
+        outputContext.getCounters().findCounter(TaskCounter.OUTPUT_RECORDS);
 
     WriteBufferManager<BytesWritable, BytesWritable> bufferManager =
         new WriteBufferManager(
@@ -357,7 +367,8 @@ public class WriteBufferManagerTest {
             bitmapSplitNum,
             shuffleId,
             true,
-            mapOutputByteCounter);
+            mapOutputByteCounter,
+            mapOutputRecordCounter);
 
     Random random = new Random();
     for (int i = 0; i < 10000; i++) {
@@ -370,6 +381,7 @@ public class WriteBufferManagerTest {
     }
     bufferManager.waitSendFinished();
 
+    assertEquals(10000, mapOutputRecordCounter.getValue());
     assertEquals(10520000, mapOutputByteCounter.getValue());
     assertTrue(bufferManager.getWaitSendBuffers().isEmpty());
     assertEquals(
@@ -428,6 +440,8 @@ public class WriteBufferManagerTest {
     OutputContext outputContext = OutputTestHelpers.createOutputContext(conf, 
workingDir);
     TezCounter mapOutputByteCounter =
         outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES);
+    TezCounter mapOutputRecordCounter =
+        outputContext.getCounters().findCounter(TaskCounter.OUTPUT_RECORDS);
 
     WriteBufferManager<BytesWritable, BytesWritable> bufferManager =
         new WriteBufferManager(
@@ -456,7 +470,8 @@ public class WriteBufferManagerTest {
             bitmapSplitNum,
             shuffleId,
             true,
-            mapOutputByteCounter);
+            mapOutputByteCounter,
+            mapOutputRecordCounter);
 
     Random random = new Random();
     RssException rssException =
@@ -478,6 +493,7 @@ public class WriteBufferManagerTest {
     rssException = assertThrows(RssException.class, 
bufferManager::waitSendFinished);
     assertTrue(rssException.getMessage().contains("Send failed"));
 
+    assertTrue(mapOutputRecordCounter.getValue() < 10000);
     assertTrue(mapOutputByteCounter.getValue() < 10520000);
   }
 

Reply via email to