This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 31dae44a9 [#1459] improvement(server): refactor 
DefaultFlushEventHandler and support event retry into pending queue (#1461)
31dae44a9 is described below

commit 31dae44a9608681618c851c8926c6d1f43e1dded
Author: Junfan Zhang <[email protected]>
AuthorDate: Thu Jan 18 18:46:44 2024 +0800

    [#1459] improvement(server): refactor DefaultFlushEventHandler and support 
event retry into pending queue (#1461)
    
    ### What changes were proposed in this pull request?
    
    1. Refactor DefaultFlushEventHandler to unify the logic of handling event
    2. Fix some incorrect metrics
    3. Support retry event into pending queue
    4. Fix the incorrect inFlushQueueSize
    5. Introduce the underlying executor queue metrics
    
    ### Why are the changes needed?
    
    Fix: #1459 #1460
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing UTs
---
 .../common/function/ConsumerWithException.java     |  23 +++
 .../uniffle/server/DefaultFlushEventHandler.java   | 106 +++++++++++---
 .../uniffle/server/ShuffleDataFlushEvent.java      |   4 +-
 .../apache/uniffle/server/ShuffleFlushManager.java | 162 +++++++--------------
 .../uniffle/server/ShuffleServerMetrics.java       |  16 ++
 .../server/flush/EventDiscardException.java        |  36 +++++
 .../server/flush/EventInvalidException.java        |  37 +++++
 .../uniffle/server/flush/EventRetryException.java  |  37 +++++
 8 files changed, 289 insertions(+), 132 deletions(-)

diff --git 
a/common/src/main/java/org/apache/uniffle/common/function/ConsumerWithException.java
 
b/common/src/main/java/org/apache/uniffle/common/function/ConsumerWithException.java
new file mode 100644
index 000000000..ca18005a1
--- /dev/null
+++ 
b/common/src/main/java/org/apache/uniffle/common/function/ConsumerWithException.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.function;
+
+@FunctionalInterface
+public interface ConsumerWithException<T> {
+  void accept(T t) throws Exception;
+}
diff --git 
a/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java 
b/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
index e6123d1f4..d9f6cde1f 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
@@ -21,7 +21,6 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Queues;
@@ -29,7 +28,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.function.ConsumerWithException;
 import org.apache.uniffle.common.util.ThreadUtils;
+import org.apache.uniffle.server.flush.EventDiscardException;
+import org.apache.uniffle.server.flush.EventInvalidException;
+import org.apache.uniffle.server.flush.EventRetryException;
 import org.apache.uniffle.server.storage.StorageManager;
 import org.apache.uniffle.storage.common.HadoopStorage;
 import org.apache.uniffle.storage.common.LocalStorage;
@@ -46,14 +49,14 @@ public class DefaultFlushEventHandler implements 
FlushEventHandler {
   private Executor fallbackThreadPoolExecutor;
   private final StorageType storageType;
   protected final BlockingQueue<ShuffleDataFlushEvent> flushQueue = 
Queues.newLinkedBlockingQueue();
-  private Consumer<ShuffleDataFlushEvent> eventConsumer;
+  private ConsumerWithException<ShuffleDataFlushEvent> eventConsumer;
 
   private volatile boolean stopped = false;
 
   public DefaultFlushEventHandler(
       ShuffleServerConf conf,
       StorageManager storageManager,
-      Consumer<ShuffleDataFlushEvent> eventConsumer) {
+      ConsumerWithException<ShuffleDataFlushEvent> eventConsumer) {
     this.shuffleServerConf = conf;
     this.storageType =
         
StorageType.valueOf(shuffleServerConf.get(RssBaseConf.RSS_STORAGE_TYPE).name());
@@ -65,21 +68,77 @@ public class DefaultFlushEventHandler implements 
FlushEventHandler {
   @Override
   public void handle(ShuffleDataFlushEvent event) {
     if (!flushQueue.offer(event)) {
-      LOG.warn("Flush queue is full, discard event: " + event);
+      LOG.error("Flush queue is full, discard event: " + event);
     } else {
       ShuffleServerMetrics.gaugeEventQueueSize.inc();
     }
   }
 
-  private void handleEventAndUpdateMetrics(ShuffleDataFlushEvent event, 
boolean isLocalFile) {
+  /**
+   * @param event
+   * @param storage
+   */
+  private void handleEventAndUpdateMetrics(ShuffleDataFlushEvent event, 
Storage storage) {
+    long start = System.currentTimeMillis();
     try {
       eventConsumer.accept(event);
+      if (storage != null) {
+        
ShuffleServerMetrics.incStorageSuccessCounter(storage.getStorageHost());
+      }
+      event.doCleanup();
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            "Flush event:{} successfully in {} ms and release {} bytes",
+            event,
+            System.currentTimeMillis() - start,
+            event.getSize());
+      }
+    } catch (Exception e) {
+      if (e instanceof EventRetryException) {
+        event.increaseRetryTimes();
+        event.markPended();
+        if (storage != null) {
+          
ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
+        }
+        this.handle(event);
+        return;
+      }
+
+      if (e instanceof EventDiscardException) {
+        ShuffleServerMetrics.counterTotalDroppedEventNum.inc();
+        ShuffleServerMetrics.counterTotalFailedWrittenEventNum.inc();
+        if (storage != null) {
+          
ShuffleServerMetrics.incStorageFailedCounter(storage.getStorageHost());
+        }
+        event.doCleanup();
+        LOG.error(
+            "Flush event: {} failed in {} ms and release {} bytes. This will 
make data lost.",
+            event,
+            System.currentTimeMillis() - start,
+            event.getSize());
+        return;
+      }
+
+      if (e instanceof EventInvalidException) {
+        return;
+      }
     } finally {
-      if (isLocalFile) {
-        ShuffleServerMetrics.counterLocalFileEventFlush.inc();
+      if (storage != null) {
+        if (storage instanceof HadoopStorage) {
+          ShuffleServerMetrics.counterHadoopEventFlush.inc();
+          ShuffleServerMetrics.gaugeHadoopFlushThreadPoolQueueSize.dec();
+        } else if (storage instanceof LocalStorage) {
+          ShuffleServerMetrics.counterLocalFileEventFlush.inc();
+          ShuffleServerMetrics.gaugeLocalfileFlushThreadPoolQueueSize.dec();
+        } else {
+          ShuffleServerMetrics.gaugeFallbackFlushThreadPoolQueueSize.dec();
+        }
       } else {
-        ShuffleServerMetrics.counterHadoopEventFlush.inc();
+        ShuffleServerMetrics.gaugeFallbackFlushThreadPoolQueueSize.dec();
       }
+
+      ShuffleServerMetrics.gaugeEventQueueSize.dec();
     }
   }
 
@@ -109,26 +168,33 @@ public class DefaultFlushEventHandler implements 
FlushEventHandler {
 
   protected void eventLoop() {
     while (!stopped && !Thread.currentThread().isInterrupted()) {
-      processNextEvent();
+      dispatchEvent();
     }
   }
 
-  protected void processNextEvent() {
+  protected void dispatchEvent() {
     try {
       ShuffleDataFlushEvent event = flushQueue.take();
       Storage storage = storageManager.selectStorage(event);
-      if (storage instanceof HadoopStorage) {
-        hadoopThreadPoolExecutor.execute(() -> 
handleEventAndUpdateMetrics(event, false));
-      } else if (storage instanceof LocalStorage) {
-        localFileThreadPoolExecutor.execute(() -> 
handleEventAndUpdateMetrics(event, true));
+
+      Executor dedicatedExecutor = fallbackThreadPoolExecutor;
+      // pending event will be delegated to fallback threadPool
+      if (!event.isPended()) {
+        if (storage instanceof HadoopStorage) {
+          dedicatedExecutor = hadoopThreadPoolExecutor;
+          ShuffleServerMetrics.gaugeHadoopFlushThreadPoolQueueSize.inc();
+        } else if (storage instanceof LocalStorage) {
+          dedicatedExecutor = localFileThreadPoolExecutor;
+          ShuffleServerMetrics.gaugeLocalfileFlushThreadPoolQueueSize.inc();
+        }
       } else {
-        // When we did not select storage for this event, we will ignore this 
event.
-        // Then we must doCleanup, or will result to resource leak.
-        fallbackThreadPoolExecutor.execute(() -> event.doCleanup());
-        LOG.error("Found unexpected storage type, will not flush for event 
{}.", event);
+        dedicatedExecutor = fallbackThreadPoolExecutor;
+        ShuffleServerMetrics.gaugeFallbackFlushThreadPoolQueueSize.inc();
       }
+
+      dedicatedExecutor.execute(() -> handleEventAndUpdateMetrics(event, 
storage));
     } catch (Exception e) {
-      LOG.error("Exception happened when process event.", e);
+      LOG.error("Exception happened when pushing events to dedicated event 
handler.", e);
     }
   }
 
@@ -153,7 +219,7 @@ public class DefaultFlushEventHandler implements 
FlushEventHandler {
 
   @Override
   public int getEventNumInFlush() {
-    return flushQueue.size();
+    return (int) ShuffleServerMetrics.gaugeEventQueueSize.get();
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java
index 4fe54fd6f..7ec882f3b 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java
@@ -172,7 +172,9 @@ public class ShuffleDataFlushEvent {
         + ", underStorage="
         + (underStorage == null ? null : 
underStorage.getClass().getSimpleName())
         + ", isPended="
-        + isPended;
+        + isPended
+        + ", ownedByHugePartition="
+        + ownedByHugePartition;
   }
 
   public boolean isOwnedByHugePartition() {
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index abf9bddf7..41cb26b00 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -24,6 +24,7 @@ import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
@@ -35,6 +36,9 @@ import org.apache.uniffle.common.ShufflePartitionedBlock;
 import org.apache.uniffle.common.config.RssBaseConf;
 import org.apache.uniffle.common.util.JavaUtils;
 import org.apache.uniffle.common.util.RssUtils;
+import org.apache.uniffle.server.flush.EventDiscardException;
+import org.apache.uniffle.server.flush.EventInvalidException;
+import org.apache.uniffle.server.flush.EventRetryException;
 import org.apache.uniffle.server.storage.StorageManager;
 import org.apache.uniffle.storage.common.LocalStorage;
 import org.apache.uniffle.storage.common.Storage;
@@ -77,126 +81,65 @@ public class ShuffleFlushManager {
     storageBasePaths = RssUtils.getConfiguredLocalDirs(shuffleServerConf);
     pendingEventTimeoutSec = 
shuffleServerConf.getLong(ShuffleServerConf.PENDING_EVENT_TIMEOUT_SEC);
     eventHandler =
-        new DefaultFlushEventHandler(shuffleServerConf, storageManager, 
this::processEvent);
+        new DefaultFlushEventHandler(shuffleServerConf, storageManager, 
this::processFlushEvent);
   }
 
   public void addToFlushQueue(ShuffleDataFlushEvent event) {
     eventHandler.handle(event);
   }
 
-  private void recordFinalFail(ShuffleDataFlushEvent event, long start) {
-    LOG.error(
-        "Failed to write data for {} in {} times, shuffle data will be lost", 
event, retryMax);
-    if (event.getUnderStorage() != null) {
-      
ShuffleServerMetrics.incStorageFailedCounter(event.getUnderStorage().getStorageHost());
-    }
-    event.doCleanup();
-    if (shuffleServer != null) {
-      long duration = System.currentTimeMillis() - start;
-      ShuffleServerMetrics.counterTotalFailedWrittenEventNum.inc();
-      LOG.error(
-          "Flush to file for {} failed in {} ms and release {} bytes",
-          event,
-          duration,
-          event.getSize());
-    }
-  }
-
-  private void recordSuccess(ShuffleDataFlushEvent event, long start) {
-    updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), 
event.getShuffleBlocks());
-    
ShuffleServerMetrics.incStorageSuccessCounter(event.getUnderStorage().getStorageHost());
-
-    ShuffleTaskInfo shuffleTaskInfo =
-        
shuffleServer.getShuffleTaskManager().getShuffleTaskInfo(event.getAppId());
-    if (null != shuffleTaskInfo) {
-      String storageHost = event.getUnderStorage().getStorageHost();
-      if (LocalStorage.STORAGE_HOST.equals(storageHost)) {
-        shuffleTaskInfo.addOnLocalFileDataSize(event.getSize());
-      } else {
-        shuffleTaskInfo.addOnHadoopDataSize(event.getSize());
-      }
-    }
-
-    event.doCleanup();
-    if (LOG.isDebugEnabled()) {
-      long duration = System.currentTimeMillis() - start;
-      LOG.debug("Flush to file success in {} ms and release {} bytes", 
duration, event.getSize());
-    }
-  }
-
-  public void processEvent(ShuffleDataFlushEvent event) {
+  /**
+   * The method to handle flush event to flush blocks into persistent storage. 
And we will not
+   * change any internal state for event, that means the event is read-only 
for this processing.
+   *
+   * <p>Only the blocks are flushed successfully, it can return directly, 
otherwise it should always
+   * throw dedicated exception.
+   *
+   * @param event
+   * @throws Exception
+   */
+  public void processFlushEvent(ShuffleDataFlushEvent event) throws Exception {
     try {
       ShuffleServerMetrics.gaugeWriteHandler.inc();
-      flushToFile(event);
-      // for thread safety we should not use or change any event info when 
write to file is failed
-    } catch (Exception e) {
-      LOG.error("Exception happened when flush data for " + event, e);
-    } finally {
-      ShuffleServerMetrics.gaugeWriteHandler.dec();
-      ShuffleServerMetrics.gaugeEventQueueSize.dec();
-    }
-  }
-
-  private boolean reachRetryMax(ShuffleDataFlushEvent event) {
-    return event.getRetryTimes() > retryMax;
-  }
 
-  private boolean flushToFile(ShuffleDataFlushEvent event) {
-    long start = System.currentTimeMillis();
-    boolean writeSuccess = false;
-
-    try {
       if (!event.isValid()) {
         LOG.warn(
-            "AppId {} was removed already, event {} should be dropped", 
event.getAppId(), event);
-        return true;
+            "AppId {} was removed already, event:{} should be dropped", 
event.getAppId(), event);
+        // we should catch this to avoid cleaning up duplicate.
+        throw new EventInvalidException();
+      }
+
+      if (reachRetryMax(event)) {
+        LOG.warn("The event:{] has been reached to max retry times, it will be 
dropped.", event);
+        throw new EventDiscardException();
       }
 
       List<ShufflePartitionedBlock> blocks = event.getShuffleBlocks();
-      if (blocks == null || blocks.isEmpty()) {
+      if (CollectionUtils.isEmpty(blocks)) {
         LOG.info("There is no block to be flushed: {}", event);
-        return true;
+        return;
       }
 
       Storage storage = event.getUnderStorage();
       if (storage == null) {
         LOG.error("Storage selected is null and this should not happen. event: 
{}", event);
-        return true;
+        throw new EventDiscardException();
       }
 
       if (event.isPended()
           && System.currentTimeMillis() - event.getStartPendingTime()
               > pendingEventTimeoutSec * 1000L) {
-        ShuffleServerMetrics.counterTotalDroppedEventNum.inc();
         LOG.error(
             "Flush event cannot be flushed for {} sec, the event {} is 
dropped",
             pendingEventTimeoutSec,
             event);
-        return true;
+        throw new EventDiscardException();
       }
 
       if (!storage.canWrite()) {
-        // todo: Could we add an interface supportPending for storageManager
-        //       to unify following logic of multiple different storage 
managers
-        if (!reachRetryMax(event)) {
-          if (event.isPended()) {
-            LOG.error(
-                "Drop this event directly due to already having entered 
pending queue. event: {}",
-                event);
-            return true;
-          }
-          event.increaseRetryTimes();
-          event.markPended();
-          if (!reachRetryMax(event)) {
-            
ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
-            eventHandler.handle(event);
-          } else {
-            recordFinalFail(event, start);
-          }
-        } else {
-          recordFinalFail(event, start);
-        }
-        return false;
+        LOG.error(
+            "The event: {} is limited to flush due to storage:{} can't write", 
event, storage);
+        throw new EventRetryException();
       }
 
       String user =
@@ -218,33 +161,30 @@ public class ShuffleFlushManager {
               user,
               maxConcurrencyPerPartitionToWrite);
       ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(request);
-      writeSuccess = storageManager.write(storage, handler, event);
-      if (writeSuccess) {
-        recordSuccess(event, start);
-      } else if (!reachRetryMax(event)) {
-        if (event.isPended()) {
-          LOG.error(
-              "Drop this event directly due to already having entered pending 
queue. event: {}",
-              event);
-        }
-        event.increaseRetryTimes();
-        event.markPended();
-        if (!reachRetryMax(event)) {
-          
ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
-          eventHandler.handle(event);
+      boolean writeSuccess = storageManager.write(storage, handler, event);
+      if (!writeSuccess) {
+        throw new EventRetryException();
+      }
+
+      // update some metrics for shuffle task
+      updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), 
event.getShuffleBlocks());
+      ShuffleTaskInfo shuffleTaskInfo =
+          
shuffleServer.getShuffleTaskManager().getShuffleTaskInfo(event.getAppId());
+      if (null != shuffleTaskInfo) {
+        String storageHost = event.getUnderStorage().getStorageHost();
+        if (LocalStorage.STORAGE_HOST.equals(storageHost)) {
+          shuffleTaskInfo.addOnLocalFileDataSize(event.getSize());
         } else {
-          recordFinalFail(event, start);
+          shuffleTaskInfo.addOnHadoopDataSize(event.getSize());
         }
       }
-    } catch (Throwable throwable) {
-      // just log the error, don't throw the exception and stop the flush 
thread
-      LOG.error("Exception happened when process flush shuffle data for {}", 
event, throwable);
-      event.increaseRetryTimes();
-      if (reachRetryMax(event)) {
-        recordFinalFail(event, start);
-      }
+    } finally {
+      ShuffleServerMetrics.gaugeWriteHandler.dec();
     }
-    return writeSuccess;
+  }
+
+  private boolean reachRetryMax(ShuffleDataFlushEvent event) {
+    return event.getRetryTimes() > retryMax;
   }
 
   private int getMaxConcurrencyPerPartitionWrite(ShuffleDataFlushEvent event) {
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index 400712076..2ab1a4ba1 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -48,6 +48,12 @@ public class ShuffleServerMetrics {
   private static final String EVENT_SIZE_THRESHOLD_LEVEL3 = 
"event_size_threshold_level3";
   private static final String EVENT_SIZE_THRESHOLD_LEVEL4 = 
"event_size_threshold_level4";
   private static final String EVENT_QUEUE_SIZE = "event_queue_size";
+  private static final String HADOOP_FLUSH_THREAD_POOL_QUEUE_SIZE =
+      "hadoop_flush_thread_pool_queue_size";
+  private static final String LOCALFILE_FLUSH_THREAD_POOL_QUEUE_SIZE =
+      "localfile_flush_thread_pool_queue_size";
+  private static final String FALLBACK_FLUSH_THREAD_POOL_QUEUE_SIZE =
+      "fallback_flush_thread_pool_queue_size";
   private static final String TOTAL_READ_DATA = "total_read_data";
   private static final String TOTAL_READ_LOCAL_DATA_FILE = 
"total_read_local_data_file";
   private static final String TOTAL_READ_LOCAL_INDEX_FILE = 
"total_read_local_index_file";
@@ -175,6 +181,9 @@ public class ShuffleServerMetrics {
   public static Gauge.Child gaugeUsedDirectMemorySize;
   public static Gauge.Child gaugeWriteHandler;
   public static Gauge.Child gaugeEventQueueSize;
+  public static Gauge.Child gaugeHadoopFlushThreadPoolQueueSize;
+  public static Gauge.Child gaugeLocalfileFlushThreadPoolQueueSize;
+  public static Gauge.Child gaugeFallbackFlushThreadPoolQueueSize;
   public static Gauge.Child gaugeAppNum;
   public static Gauge.Child gaugeTotalPartitionNum;
 
@@ -348,6 +357,13 @@ public class ShuffleServerMetrics {
     gaugeUsedDirectMemorySize = 
metricsManager.addLabeledGauge(USED_DIRECT_MEMORY_SIZE);
     gaugeWriteHandler = metricsManager.addLabeledGauge(TOTAL_WRITE_HANDLER);
     gaugeEventQueueSize = metricsManager.addLabeledGauge(EVENT_QUEUE_SIZE);
+    gaugeHadoopFlushThreadPoolQueueSize =
+        metricsManager.addLabeledGauge(HADOOP_FLUSH_THREAD_POOL_QUEUE_SIZE);
+    gaugeLocalfileFlushThreadPoolQueueSize =
+        metricsManager.addLabeledGauge(LOCALFILE_FLUSH_THREAD_POOL_QUEUE_SIZE);
+    gaugeFallbackFlushThreadPoolQueueSize =
+        metricsManager.addLabeledGauge(FALLBACK_FLUSH_THREAD_POOL_QUEUE_SIZE);
+
     gaugeAppNum = metricsManager.addLabeledGauge(APP_NUM_WITH_NODE);
     gaugeTotalPartitionNum = 
metricsManager.addLabeledGauge(PARTITION_NUM_WITH_NODE);
 
diff --git 
a/server/src/main/java/org/apache/uniffle/server/flush/EventDiscardException.java
 
b/server/src/main/java/org/apache/uniffle/server/flush/EventDiscardException.java
new file mode 100644
index 000000000..ef31c853a
--- /dev/null
+++ 
b/server/src/main/java/org/apache/uniffle/server/flush/EventDiscardException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.flush;
+
+public class EventDiscardException extends Exception {
+  public EventDiscardException() {
+    super();
+  }
+
+  public EventDiscardException(String message) {
+    super(message);
+  }
+
+  public EventDiscardException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public EventDiscardException(Throwable cause) {
+    super(cause);
+  }
+}
diff --git 
a/server/src/main/java/org/apache/uniffle/server/flush/EventInvalidException.java
 
b/server/src/main/java/org/apache/uniffle/server/flush/EventInvalidException.java
new file mode 100644
index 000000000..4d947f452
--- /dev/null
+++ 
b/server/src/main/java/org/apache/uniffle/server/flush/EventInvalidException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.flush;
+
+public class EventInvalidException extends Exception {
+
+  public EventInvalidException() {
+    super();
+  }
+
+  public EventInvalidException(String message) {
+    super(message);
+  }
+
+  public EventInvalidException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public EventInvalidException(Throwable cause) {
+    super(cause);
+  }
+}
diff --git 
a/server/src/main/java/org/apache/uniffle/server/flush/EventRetryException.java 
b/server/src/main/java/org/apache/uniffle/server/flush/EventRetryException.java
new file mode 100644
index 000000000..ad1f5663e
--- /dev/null
+++ 
b/server/src/main/java/org/apache/uniffle/server/flush/EventRetryException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.flush;
+
+public class EventRetryException extends Exception {
+
+  public EventRetryException() {
+    super();
+  }
+
+  public EventRetryException(String message) {
+    super(message);
+  }
+
+  public EventRetryException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public EventRetryException(Throwable cause) {
+    super(cause);
+  }
+}

Reply via email to