zuston commented on code in PR #1461:
URL:
https://github.com/apache/incubator-uniffle/pull/1461#discussion_r1456738592
##########
server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java:
##########
@@ -109,26 +168,33 @@ private void startEventProcessor() {
protected void eventLoop() {
while (!stopped && !Thread.currentThread().isInterrupted()) {
- processNextEvent();
+ dispatchEvent();
}
}
- protected void processNextEvent() {
+ protected void dispatchEvent() {
try {
ShuffleDataFlushEvent event = flushQueue.take();
Storage storage = storageManager.selectStorage(event);
- if (storage instanceof HadoopStorage) {
- hadoopThreadPoolExecutor.execute(() ->
handleEventAndUpdateMetrics(event, false));
- } else if (storage instanceof LocalStorage) {
- localFileThreadPoolExecutor.execute(() ->
handleEventAndUpdateMetrics(event, true));
+
+ Executor dedicatedExecutor = fallbackThreadPoolExecutor;
+ // pending event will be delegated to fallback threadPool
+ if (!event.isPended()) {
+ if (storage instanceof HadoopStorage) {
+ dedicatedExecutor = hadoopThreadPoolExecutor;
+ ShuffleServerMetrics.gaugeHadoopFlushThreadPoolQueueSize.inc();
+ } else if (storage instanceof LocalStorage) {
+ dedicatedExecutor = localFileThreadPoolExecutor;
+ ShuffleServerMetrics.gaugeLocalfileFlushThreadPoolQueueSize.inc();
+ }
} else {
- // When we did not select storage for this event, we will ignore this
event.
- // Then we must doCleanup, or will result to resource leak.
- fallbackThreadPoolExecutor.execute(() -> event.doCleanup());
Review Comment:
Needn't. It will also dispatch event into consumer that will unify all logic
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]