This is an automated email from the ASF dual-hosted git repository.
smallzhongfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 931d6cda0 [#1252] fix(server): Incorrect storage write fail metric (#1253)
931d6cda0 is described below
commit 931d6cda0e3a691867721f51eabb567d568831e8
Author: summaryzb <[email protected]>
AuthorDate: Wed Oct 25 19:35:58 2023 +0800
[#1252] fix(server): Incorrect storage write fail metric (#1253)
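What changes were proposed in this pull request?
As the title says: fix the incorrect storage write-failure metric in ShuffleFlushManager. The success and final-failure bookkeeping (metrics, cleanup, logging) moves out of processEvent into two new helpers, recordSuccess and recordFinalFail, which flushToFile calls directly. The final failure is now recorded at the point where the retry limit is reached, instead of processEvent re-reading the event's retry count after flushToFile returns.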
Why are the changes needed?
Fix: #1252
Does this PR introduce any user-facing change?
No.
How was this patch tested?
unit test
---
.../apache/uniffle/server/ShuffleFlushManager.java | 95 +++++++++++++---------
1 file changed, 57 insertions(+), 38 deletions(-)
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index 036ad69d0..eec574179 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -83,39 +83,41 @@ public class ShuffleFlushManager {
eventHandler.handle(event);
}
+ private void recordFinalFail(ShuffleDataFlushEvent event, long start) {
+ LOG.error(
+ "Failed to write data for {} in {} times, shuffle data will be lost",
event, retryMax);
+ if (event.getUnderStorage() != null) {
+
ShuffleServerMetrics.incStorageFailedCounter(event.getUnderStorage().getStorageHost());
+ }
+ event.doCleanup();
+ if (shuffleServer != null) {
+ long duration = System.currentTimeMillis() - start;
+ ShuffleServerMetrics.counterTotalFailedWrittenEventNum.inc();
+ LOG.error(
+ "Flush to file for {} failed in {} ms and release {} bytes",
+ event,
+ duration,
+ event.getSize());
+ }
+ }
+
+ private void recordSuccess(ShuffleDataFlushEvent event, long start) {
+ updateCommittedBlockIds(event.getAppId(), event.getShuffleId(),
event.getShuffleBlocks());
+
ShuffleServerMetrics.incStorageSuccessCounter(event.getUnderStorage().getStorageHost());
+ event.doCleanup();
+ if (shuffleServer != null) {
+ if (LOG.isDebugEnabled()) {
+ long duration = System.currentTimeMillis() - start;
+ LOG.debug("Flush to file success in {} ms and release {} bytes",
duration, event.getSize());
+ }
+ }
+ }
+
public void processEvent(ShuffleDataFlushEvent event) {
try {
ShuffleServerMetrics.gaugeWriteHandler.inc();
- long start = System.currentTimeMillis();
- boolean writeSuccess = flushToFile(event);
- if (writeSuccess || event.getRetryTimes() > retryMax) {
- if (event.getRetryTimes() > retryMax) {
- LOG.error(
- "Failed to write data for {} in {} times, shuffle data will be
lost",
- event,
- retryMax);
- if (event.getUnderStorage() != null) {
-
ShuffleServerMetrics.incStorageFailedCounter(event.getUnderStorage().getStorageHost());
- }
- }
- event.doCleanup();
- if (shuffleServer != null) {
- long duration = System.currentTimeMillis() - start;
- if (writeSuccess) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Flush to file success in {} ms and release {} bytes",
duration, event.getSize());
- }
- } else {
- ShuffleServerMetrics.counterTotalFailedWrittenEventNum.inc();
- LOG.error(
- "Flush to file for {} failed in {} ms and release {} bytes",
- event,
- duration,
- event.getSize());
- }
- }
- }
+ flushToFile(event);
+ // for thread safety, we should not use or change any event info after the
write to file has failed
} catch (Exception e) {
LOG.error("Exception happened when flush data for " + event, e);
} finally {
@@ -124,7 +126,12 @@ public class ShuffleFlushManager {
}
}
+ private boolean reachRetryMax(ShuffleDataFlushEvent event) {
+ return event.getRetryTimes() > retryMax;
+ }
+
private boolean flushToFile(ShuffleDataFlushEvent event) {
+ long start = System.currentTimeMillis();
boolean writeSuccess = false;
try {
@@ -160,7 +167,7 @@ public class ShuffleFlushManager {
if (!storage.canWrite()) {
// todo: Could we add an interface supportPending for storageManager
// to unify following logic of multiple different storage
managers
- if (event.getRetryTimes() <= retryMax) {
+ if (!reachRetryMax(event)) {
if (event.isPended()) {
LOG.error(
"Drop this event directly due to already having entered
pending queue. event: {}",
@@ -168,9 +175,15 @@ public class ShuffleFlushManager {
return true;
}
event.increaseRetryTimes();
-
ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
event.markPended();
- eventHandler.handle(event);
+ if (!reachRetryMax(event)) {
+
ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
+ eventHandler.handle(event);
+ } else {
+ recordFinalFail(event, start);
+ }
+ } else {
+ recordFinalFail(event, start);
}
return false;
}
@@ -196,23 +209,29 @@ public class ShuffleFlushManager {
ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(request);
writeSuccess = storageManager.write(storage, handler, event);
if (writeSuccess) {
- updateCommittedBlockIds(event.getAppId(), event.getShuffleId(),
blocks);
-
ShuffleServerMetrics.incStorageSuccessCounter(storage.getStorageHost());
- } else if (event.getRetryTimes() <= retryMax) {
+ recordSuccess(event, start);
+ } else if (!reachRetryMax(event)) {
if (event.isPended()) {
LOG.error(
"Drop this event directly due to already having entered pending
queue. event: {}",
event);
}
event.increaseRetryTimes();
- ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
event.markPended();
- eventHandler.handle(event);
+ if (!reachRetryMax(event)) {
+
ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
+ eventHandler.handle(event);
+ } else {
+ recordFinalFail(event, start);
+ }
}
} catch (Throwable throwable) {
// just log the error, don't throw the exception and stop the flush
thread
LOG.error("Exception happened when process flush shuffle data for {}",
event, throwable);
event.increaseRetryTimes();
+ if (reachRetryMax(event)) {
+ recordFinalFail(event, start);
+ }
}
return writeSuccess;
}