This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 8cc00d5e [MINOR]Fix(tez) add output mapOutputRecordCounter metrics
(#1093)
8cc00d5e is described below
commit 8cc00d5e396c5f33b7121d1fd715fdd3ca179de0
Author: bin41215 <[email protected]>
AuthorDate: Mon Aug 7 18:38:02 2023 +0800
[MINOR]Fix(tez) add output mapOutputRecordCounter metrics (#1093)
### What changes were proposed in this pull request?
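Add the `mapOutputRecordCounter` output metric (Tez `OUTPUT_RECORDS` counter) to `WriteBufferManager`, incremented by one for each record written.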
### Why are the changes needed?
This metric is needed to fix the issue where the number of tasks gets reduced.
Fix: #972
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests (UT): `WriteBufferManagerTest` now asserts the record counter value.
---
.../common/sort/buffer/WriteBufferManager.java | 6 +++++-
.../library/common/sort/impl/RssSorter.java | 3 ++-
.../library/common/sort/impl/RssUnSorter.java | 3 ++-
.../common/sort/buffer/WriteBufferManagerTest.java | 24 ++++++++++++++++++----
4 files changed, 29 insertions(+), 7 deletions(-)
diff --git
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
index 7da3208c..b88bedda 100644
---
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
+++
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
@@ -97,6 +97,7 @@ public class WriteBufferManager<K, V> {
private final int shuffleId;
private final boolean isNeedSorted;
private final TezCounter mapOutputByteCounter;
+ private final TezCounter mapOutputRecordCounter;
/** WriteBufferManager */
public WriteBufferManager(
@@ -125,7 +126,8 @@ public class WriteBufferManager<K, V> {
int bitmapSplitNum,
int shuffleId,
boolean isNeedSorted,
- TezCounter mapOutputByteCounter) {
+ TezCounter mapOutputByteCounter,
+ TezCounter mapOutputRecordCounter) {
this.tezTaskAttemptID = tezTaskAttemptID;
this.maxMemSize = maxMemSize;
this.appId = appId;
@@ -152,6 +154,7 @@ public class WriteBufferManager<K, V> {
this.shuffleId = shuffleId;
this.isNeedSorted = isNeedSorted;
this.mapOutputByteCounter = mapOutputByteCounter;
+ this.mapOutputRecordCounter = mapOutputRecordCounter;
this.sendExecutorService =
Executors.newFixedThreadPool(sendThreadNum,
ThreadUtils.getThreadFactory("send-thread"));
}
@@ -200,6 +203,7 @@ public class WriteBufferManager<K, V> {
&& inSendListBytes.get() <= maxMemSize * sendThreshold) {
sendBuffersToServers();
}
+ mapOutputRecordCounter.increment(1);
mapOutputByteCounter.increment(length);
}
diff --git
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssSorter.java
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssSorter.java
index c937770e..94c9aa90 100644
---
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssSorter.java
+++
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssSorter.java
@@ -166,7 +166,8 @@ public class RssSorter extends ExternalSorter {
bitmapSplitNum,
shuffleId,
true,
- mapOutputByteCounter);
+ mapOutputByteCounter,
+ mapOutputRecordCounter);
LOG.info("Initialized WriteBufferManager.");
}
diff --git
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java
index b2073fe9..0248bb8a 100644
---
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java
+++
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java
@@ -164,7 +164,8 @@ public class RssUnSorter extends ExternalSorter {
bitmapSplitNum,
shuffleId,
false,
- mapOutputByteCounter);
+ mapOutputByteCounter,
+ mapOutputRecordCounter);
LOG.info("Initialized WriteBufferManager.");
}
diff --git
a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
index 73c625a9..d9036d68 100644
---
a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
+++
b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
@@ -119,6 +119,8 @@ public class WriteBufferManagerTest {
OutputContext outputContext = OutputTestHelpers.createOutputContext(conf,
workingDir);
TezCounter mapOutputByteCounter =
outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES);
+ TezCounter mapOutputRecordCounter =
+ outputContext.getCounters().findCounter(TaskCounter.OUTPUT_RECORDS);
WriteBufferManager<BytesWritable, BytesWritable> bufferManager =
new WriteBufferManager(
@@ -147,7 +149,8 @@ public class WriteBufferManagerTest {
bitmapSplitNum,
shuffleId,
true,
- mapOutputByteCounter);
+ mapOutputByteCounter,
+ mapOutputRecordCounter);
Random random = new Random();
for (int i = 0; i < 1000; i++) {
@@ -158,6 +161,7 @@ public class WriteBufferManagerTest {
bufferManager.addRecord(1, new BytesWritable(key), new
BytesWritable(value));
}
+ assertEquals(1000, mapOutputRecordCounter.getValue());
assertEquals(1052000, mapOutputByteCounter.getValue());
boolean isException = false;
@@ -219,6 +223,8 @@ public class WriteBufferManagerTest {
OutputContext outputContext = OutputTestHelpers.createOutputContext(conf,
workingDir);
TezCounter mapOutputByteCounter =
outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES);
+ TezCounter mapOutputRecordCounter =
+ outputContext.getCounters().findCounter(TaskCounter.OUTPUT_RECORDS);
WriteBufferManager<BytesWritable, BytesWritable> bufferManager =
new WriteBufferManager(
@@ -247,7 +253,8 @@ public class WriteBufferManagerTest {
bitmapSplitNum,
shuffleId,
true,
- mapOutputByteCounter);
+ mapOutputByteCounter,
+ mapOutputRecordCounter);
Random random = new Random();
for (int i = 0; i < 1000; i++) {
@@ -259,6 +266,7 @@ public class WriteBufferManagerTest {
bufferManager.addRecord(partitionId, new BytesWritable(key), new
BytesWritable(value));
}
+ assertEquals(1000, mapOutputRecordCounter.getValue());
assertEquals(1052000, mapOutputByteCounter.getValue());
bufferManager.waitSendFinished();
assertTrue(bufferManager.getWaitSendBuffers().isEmpty());
@@ -329,6 +337,8 @@ public class WriteBufferManagerTest {
OutputContext outputContext = OutputTestHelpers.createOutputContext(conf,
workingDir);
TezCounter mapOutputByteCounter =
outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES);
+ TezCounter mapOutputRecordCounter =
+ outputContext.getCounters().findCounter(TaskCounter.OUTPUT_RECORDS);
WriteBufferManager<BytesWritable, BytesWritable> bufferManager =
new WriteBufferManager(
@@ -357,7 +367,8 @@ public class WriteBufferManagerTest {
bitmapSplitNum,
shuffleId,
true,
- mapOutputByteCounter);
+ mapOutputByteCounter,
+ mapOutputRecordCounter);
Random random = new Random();
for (int i = 0; i < 10000; i++) {
@@ -370,6 +381,7 @@ public class WriteBufferManagerTest {
}
bufferManager.waitSendFinished();
+ assertEquals(10000, mapOutputRecordCounter.getValue());
assertEquals(10520000, mapOutputByteCounter.getValue());
assertTrue(bufferManager.getWaitSendBuffers().isEmpty());
assertEquals(
@@ -428,6 +440,8 @@ public class WriteBufferManagerTest {
OutputContext outputContext = OutputTestHelpers.createOutputContext(conf,
workingDir);
TezCounter mapOutputByteCounter =
outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES);
+ TezCounter mapOutputRecordCounter =
+ outputContext.getCounters().findCounter(TaskCounter.OUTPUT_RECORDS);
WriteBufferManager<BytesWritable, BytesWritable> bufferManager =
new WriteBufferManager(
@@ -456,7 +470,8 @@ public class WriteBufferManagerTest {
bitmapSplitNum,
shuffleId,
true,
- mapOutputByteCounter);
+ mapOutputByteCounter,
+ mapOutputRecordCounter);
Random random = new Random();
RssException rssException =
@@ -478,6 +493,7 @@ public class WriteBufferManagerTest {
rssException = assertThrows(RssException.class,
bufferManager::waitSendFinished);
assertTrue(rssException.getMessage().contains("Send failed"));
+ assertTrue(mapOutputRecordCounter.getValue() < 10000);
assertTrue(mapOutputByteCounter.getValue() < 10520000);
}