This is an automated email from the ASF dual-hosted git repository. xianjingfeng pushed a commit to branch branch-0.8 in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
commit bb56768325347940aa0e3c21e9ae78a58b4a5d46 Author: summaryzb <[email protected]> AuthorDate: Wed Oct 25 19:35:58 2023 +0800 [#1252] fix(server): Incorrect storage write fail metric (#1253) What changes were proposed in this pull request? As title Why are the changes needed? Fix: #1252 Does this PR introduce any user-facing change? No. How was this patch tested? unit test (cherry picked from commit 931d6cda0e3a691867721f51eabb567d568831e8) --- .../apache/uniffle/server/ShuffleFlushManager.java | 93 +++++++++++++--------- 1 file changed, 57 insertions(+), 36 deletions(-) diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java index ba46dfab0..eec574179 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java @@ -83,37 +83,41 @@ public class ShuffleFlushManager { eventHandler.handle(event); } + private void recordFinalFail(ShuffleDataFlushEvent event, long start) { + LOG.error( + "Failed to write data for {} in {} times, shuffle data will be lost", event, retryMax); + if (event.getUnderStorage() != null) { + ShuffleServerMetrics.incStorageFailedCounter(event.getUnderStorage().getStorageHost()); + } + event.doCleanup(); + if (shuffleServer != null) { + long duration = System.currentTimeMillis() - start; + ShuffleServerMetrics.counterTotalFailedWrittenEventNum.inc(); + LOG.error( + "Flush to file for {} failed in {} ms and release {} bytes", + event, + duration, + event.getSize()); + } + } + + private void recordSuccess(ShuffleDataFlushEvent event, long start) { + updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), event.getShuffleBlocks()); + ShuffleServerMetrics.incStorageSuccessCounter(event.getUnderStorage().getStorageHost()); + event.doCleanup(); + if (shuffleServer != null) { + if (LOG.isDebugEnabled()) { + long duration = System.currentTimeMillis() - start; + LOG.debug("Flush to file success in {} ms and release {} bytes", duration, event.getSize()); + } + } + } + public void processEvent(ShuffleDataFlushEvent event) { try { ShuffleServerMetrics.gaugeWriteHandler.inc(); - long start = System.currentTimeMillis(); - boolean writeSuccess = flushToFile(event); - if (writeSuccess || event.getRetryTimes() > retryMax) { - if (event.getRetryTimes() > retryMax) { - LOG.error( - "Failed to write data for {} in {} times, shuffle data will be lost", - event, - retryMax); - if (event.getUnderStorage() != null) { - ShuffleServerMetrics.incStorageFailedCounter(event.getUnderStorage().getStorageHost()); - } - } - event.doCleanup(); - if (shuffleServer != null) { - long duration = System.currentTimeMillis() - start; - if (writeSuccess) { - LOG.debug( - "Flush to file success in {} ms and release {} bytes", duration, event.getSize()); - } else { - ShuffleServerMetrics.counterTotalFailedWrittenEventNum.inc(); - LOG.error( - "Flush to file for {} failed in {} ms and release {} bytes", - event, - duration, - event.getSize()); - } - } - } + flushToFile(event); + // for thread safety we should not use or change any event info when write to file is failed } catch (Exception e) { LOG.error("Exception happened when flush data for " + event, e); } finally { @@ -122,7 +126,12 @@ public class ShuffleFlushManager { } } + private boolean reachRetryMax(ShuffleDataFlushEvent event) { + return event.getRetryTimes() > retryMax; + } + private boolean flushToFile(ShuffleDataFlushEvent event) { + long start = System.currentTimeMillis(); boolean writeSuccess = false; try { @@ -158,7 +167,7 @@ public class ShuffleFlushManager { if (!storage.canWrite()) { // todo: Could we add an interface supportPending for storageManager // to unify following logic of multiple different storage managers - if (event.getRetryTimes() <= retryMax) { + if (!reachRetryMax(event)) { if (event.isPended()) { LOG.error( "Drop this event directly due to already having entered pending queue. event: {}", @@ -166,9 +175,15 @@ public class ShuffleFlushManager { return true; } event.increaseRetryTimes(); - ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost()); event.markPended(); - eventHandler.handle(event); + if (!reachRetryMax(event)) { + ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost()); + eventHandler.handle(event); + } else { + recordFinalFail(event, start); + } + } else { + recordFinalFail(event, start); } return false; } @@ -194,23 +209,29 @@ public class ShuffleFlushManager { ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(request); writeSuccess = storageManager.write(storage, handler, event); if (writeSuccess) { - updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), blocks); - ShuffleServerMetrics.incStorageSuccessCounter(storage.getStorageHost()); - } else if (event.getRetryTimes() <= retryMax) { + recordSuccess(event, start); + } else if (!reachRetryMax(event)) { if (event.isPended()) { LOG.error( "Drop this event directly due to already having entered pending queue. event: {}", event); } event.increaseRetryTimes(); - ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost()); event.markPended(); - eventHandler.handle(event); + if (!reachRetryMax(event)) { + ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost()); + eventHandler.handle(event); + } else { + recordFinalFail(event, start); + } } } catch (Throwable throwable) { // just log the error, don't throw the exception and stop the flush thread LOG.error("Exception happened when process flush shuffle data for {}", event, throwable); event.increaseRetryTimes(); + if (reachRetryMax(event)) { + recordFinalFail(event, start); + } } return writeSuccess; }
