This is an automated email from the ASF dual-hosted git repository.

colin pushed a commit to branch branch-0.5.0
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/branch-0.5.0 by this push:
     new 9cf3632  [Bug] Fix NPE problem when process the event if application 
was cleared already (#16) (#21)
9cf3632 is described below

commit 9cf3632622c9fabe83c3589140bf9aee64f20e64
Author: Colin <[email protected]>
AuthorDate: Tue Jul 5 10:15:59 2022 +0800

    [Bug] Fix NPE problem when process the event if application was cleared 
already (#16) (#21)
    
    ### What changes were proposed in this pull request?
    There will be an NPE problem when processing the event if the application was 
already cleared
    
    ### Why are the changes needed?
    Fix a critical bug which causes a resource leak.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    UT is added
---
 .../tencent/rss/server/ShuffleFlushManager.java    | 120 ++++++++++++---------
 .../rss/server/storage/HdfsStorageManager.java     |  17 +--
 .../rss/server/storage/SingleStorageManager.java   |  48 +++++----
 .../rss/server/ShuffleFlushManagerTest.java        |  40 +++++++
 4 files changed, 148 insertions(+), 77 deletions(-)

diff --git 
a/server/src/main/java/com/tencent/rss/server/ShuffleFlushManager.java 
b/server/src/main/java/com/tencent/rss/server/ShuffleFlushManager.java
index e246b02..48a6878 100644
--- a/server/src/main/java/com/tencent/rss/server/ShuffleFlushManager.java
+++ b/server/src/main/java/com/tencent/rss/server/ShuffleFlushManager.java
@@ -93,10 +93,14 @@ public class ShuffleFlushManager {
         try {
           ShuffleDataFlushEvent event = flushQueue.take();
           threadPoolExecutor.execute(() -> {
-            ShuffleServerMetrics.gaugeEventQueueSize.set(flushQueue.size());
-            ShuffleServerMetrics.gaugeWriteHandler.inc();
-            flushToFile(event);
-            ShuffleServerMetrics.gaugeWriteHandler.dec();
+            try {
+              ShuffleServerMetrics.gaugeEventQueueSize.set(flushQueue.size());
+              ShuffleServerMetrics.gaugeWriteHandler.inc();
+              flushToFile(event);
+              ShuffleServerMetrics.gaugeWriteHandler.dec();
+            } catch (Exception e) {
+              LOG.error("Exception happened when flush data for " + event, e);
+            }
           });
         } catch (Exception e) {
           LOG.error("Exception happened when process event.", e);
@@ -133,7 +137,7 @@ public class ShuffleFlushManager {
   private void flushToFile(ShuffleDataFlushEvent event) {
 
     Storage storage = storageManager.selectStorage(event);
-    if (!storage.canWrite()) {
+    if (storage != null && !storage.canWrite()) {
       addPendingEvents(event);
       return;
     }
@@ -142,49 +146,52 @@ public class ShuffleFlushManager {
     List<ShufflePartitionedBlock> blocks = event.getShuffleBlocks();
     boolean writeSuccess = false;
     try {
-      if (blocks == null || blocks.isEmpty()) {
-        LOG.info("There is no block to be flushed: " + event);
-      } else if (!event.isValid()) {
-        //  avoid printing error log
-        writeSuccess = true;
-        LOG.warn("AppId {} was removed already, event {} should be dropped", 
event.getAppId(), event);
-      } else {
-        ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(new 
CreateShuffleWriteHandlerRequest(
-            storageType,
-            event.getAppId(),
-            event.getShuffleId(),
-            event.getStartPartition(),
-            event.getEndPartition(),
-            storageBasePaths,
-            shuffleServerId,
-            hadoopConf,
-            storageDataReplica));
-
-        do {
-          if (event.getRetryTimes() > retryMax) {
-            LOG.error("Failed to write data for " + event + " in " + retryMax 
+ " times, shuffle data will be lost");
-            
ShuffleServerMetrics.incStorageFailedCounter(storage.getStorageHost());
-            break;
-          }
-          if (!event.isValid()) {
-            LOG.warn("AppId {} was removed already, event {} should be 
dropped, may leak one handler",
-                event.getAppId(), event);
-            //  avoid printing error log
-            writeSuccess = true;
-            break;
-          }
+      // storage info maybe null if the application cache was cleared already
+      if (storage != null) {
+        if (blocks == null || blocks.isEmpty()) {
+          LOG.info("There is no block to be flushed: " + event);
+        } else if (!event.isValid()) {
+          //  avoid printing error log
+          writeSuccess = true;
+          LOG.warn("AppId {} was removed already, event {} should be dropped", 
event.getAppId(), event);
+        } else {
+          ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(new 
CreateShuffleWriteHandlerRequest(
+              storageType,
+              event.getAppId(),
+              event.getShuffleId(),
+              event.getStartPartition(),
+              event.getEndPartition(),
+              storageBasePaths,
+              shuffleServerId,
+              hadoopConf,
+              storageDataReplica));
 
-          writeSuccess = storageManager.write(storage, handler, event);
+          do {
+            if (event.getRetryTimes() > retryMax) {
+              LOG.error("Failed to write data for " + event + " in " + 
retryMax + " times, shuffle data will be lost");
+              
ShuffleServerMetrics.incStorageFailedCounter(storage.getStorageHost());
+              break;
+            }
+            if (!event.isValid()) {
+              LOG.warn("AppId {} was removed already, event {} should be 
dropped, may leak one handler",
+                  event.getAppId(), event);
+              //  avoid printing error log
+              writeSuccess = true;
+              break;
+            }
 
-          if (writeSuccess) {
-            updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), 
blocks);
-            
ShuffleServerMetrics.incStorageSuccessCounter(storage.getStorageHost());
-            break;
-          } else {
-            event.increaseRetryTimes();
-            
ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
-          }
-        } while (event.getRetryTimes() <= retryMax);
+            writeSuccess = storageManager.write(storage, handler, event);
+
+            if (writeSuccess) {
+              updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), 
blocks);
+              
ShuffleServerMetrics.incStorageSuccessCounter(storage.getStorageHost());
+              break;
+            } else {
+              event.increaseRetryTimes();
+              
ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
+            }
+          } while (event.getRetryTimes() <= retryMax);
+        }
       }
     } catch (Exception e) {
       // just log the error, don't throw the exception and stop the flush 
thread
@@ -272,16 +279,19 @@ public class ShuffleFlushManager {
   void processPendingEvents() throws Exception {
     PendingShuffleFlushEvent event = pendingEvents.take();
     Storage storage = storageManager.selectStorage(event.getEvent());
+    if (storage == null) {
+      dropPendingEvent(event);
+      LOG.error("Flush event cannot be flushed because of application related 
was cleared, {}", event.getEvent());
+      return;
+    }
     if (System.currentTimeMillis() - event.getCreateTimeStamp() > 
pendingEventTimeoutSec * 1000L) {
-      ShuffleServerMetrics.counterTotalDroppedEventNum.inc();
-      if (shuffleServer != null) {
-        shuffleServer.getShuffleBufferManager().releaseMemory(
-            event.getEvent().getSize(), true, false);
-      }
+      dropPendingEvent(event);
       LOG.error("Flush event cannot be flushed for {} sec, the event {} is 
dropped",
           pendingEventTimeoutSec, event.getEvent());
       return;
     }
+    // storage maybe null if the application cache was cleared already
+    // add event to flush queue, and it will be released
     if (storage.canWrite()) {
       addToFlushQueue(event.getEvent());
       return;
@@ -289,6 +299,14 @@ public class ShuffleFlushManager {
     addPendingEventsInternal(event);
   }
 
+  private void dropPendingEvent(PendingShuffleFlushEvent event) {
+    ShuffleServerMetrics.counterTotalDroppedEventNum.inc();
+    if (shuffleServer != null) {
+      shuffleServer.getShuffleBufferManager().releaseMemory(
+          event.getEvent().getSize(), true, false);
+    }
+  }
+
   @VisibleForTesting
   void addPendingEvents(ShuffleDataFlushEvent event) {
     addPendingEventsInternal(new PendingShuffleFlushEvent(event));
diff --git 
a/server/src/main/java/com/tencent/rss/server/storage/HdfsStorageManager.java 
b/server/src/main/java/com/tencent/rss/server/storage/HdfsStorageManager.java
index c3cfa62..4f7e597 100644
--- 
a/server/src/main/java/com/tencent/rss/server/storage/HdfsStorageManager.java
+++ 
b/server/src/main/java/com/tencent/rss/server/storage/HdfsStorageManager.java
@@ -72,11 +72,14 @@ public class HdfsStorageManager extends 
SingleStorageManager {
   @Override
   public void removeResources(String appId, Set<Integer> shuffleSet) {
     HdfsStorage storage = getStorageByAppId(appId);
-    storage.removeHandlers(appId);
-    appIdToStorages.remove(appId);
-    ShuffleDeleteHandler deleteHandler = ShuffleHandlerFactory.getInstance()
-        .createShuffleDeleteHandler(new 
CreateShuffleDeleteHandlerRequest(StorageType.HDFS.name(), storage.getConf()));
-    deleteHandler.delete(new String[] {storage.getStoragePath()}, appId);
+    if (storage != null) {
+      storage.removeHandlers(appId);
+      appIdToStorages.remove(appId);
+      ShuffleDeleteHandler deleteHandler = ShuffleHandlerFactory.getInstance()
+          .createShuffleDeleteHandler(
+              new CreateShuffleDeleteHandlerRequest(StorageType.HDFS.name(), 
storage.getConf()));
+      deleteHandler.delete(new String[] {storage.getStoragePath()}, appId);
+    }
   }
 
   @Override
@@ -110,7 +113,9 @@ public class HdfsStorageManager extends 
SingleStorageManager {
     if (!appIdToStorages.containsKey(appId)) {
       String msg = "Can't find HDFS storage for appId[" + appId + "]";
       LOG.error(msg);
-      throw new RuntimeException(msg);
+      // outside should deal with null situation
+      // todo: it's better to have a fake storage for null situation
+      return null;
     }
     return appIdToStorages.get(appId);
   }
diff --git 
a/server/src/main/java/com/tencent/rss/server/storage/SingleStorageManager.java 
b/server/src/main/java/com/tencent/rss/server/storage/SingleStorageManager.java
index d5fe318..7b6da98 100644
--- 
a/server/src/main/java/com/tencent/rss/server/storage/SingleStorageManager.java
+++ 
b/server/src/main/java/com/tencent/rss/server/storage/SingleStorageManager.java
@@ -84,27 +84,35 @@ public abstract class SingleStorageManager implements 
StorageManager {
 
   @Override
   public void updateWriteMetrics(ShuffleDataFlushEvent event, long writeTime) {
-    // update shuffle server metrics, these metrics belong to server module
-    // we can't update them in storage module
-    StorageWriteMetrics metrics = createStorageWriteMetrics(event, writeTime);
-    ShuffleServerMetrics.counterTotalWriteTime.inc(metrics.getWriteTime());
-    ShuffleServerMetrics.counterWriteTotal.inc();
-    if (metrics.getWriteTime() > writeSlowThreshold) {
-      ShuffleServerMetrics.counterWriteSlow.inc();
-    }
-    ShuffleServerMetrics.counterTotalWriteDataSize.inc(metrics.getEventSize());
-    
ShuffleServerMetrics.counterTotalWriteBlockSize.inc(metrics.getWriteBlocks());
-    if (metrics.getEventSize() < eventSizeThresholdL1) {
-      ShuffleServerMetrics.counterEventSizeThresholdLevel1.inc();
-    } else if (metrics.getEventSize() < eventSizeThresholdL2) {
-      ShuffleServerMetrics.counterEventSizeThresholdLevel2.inc();
-    } else if (metrics.getEventSize() < eventSizeThresholdL3) {
-      ShuffleServerMetrics.counterEventSizeThresholdLevel3.inc();
-    } else {
-      ShuffleServerMetrics.counterEventSizeThresholdLevel4.inc();
+    // the metrics update shouldn't block normal process
+    // log the exception if error happen
+    try {
+      // update shuffle server metrics, these metrics belong to server module
+      // we can't update them in storage module
+      StorageWriteMetrics metrics = createStorageWriteMetrics(event, 
writeTime);
+      ShuffleServerMetrics.counterTotalWriteTime.inc(metrics.getWriteTime());
+      ShuffleServerMetrics.counterWriteTotal.inc();
+      if (metrics.getWriteTime() > writeSlowThreshold) {
+        ShuffleServerMetrics.counterWriteSlow.inc();
+      }
+      
ShuffleServerMetrics.counterTotalWriteDataSize.inc(metrics.getEventSize());
+      
ShuffleServerMetrics.counterTotalWriteBlockSize.inc(metrics.getWriteBlocks());
+      if (metrics.getEventSize() < eventSizeThresholdL1) {
+        ShuffleServerMetrics.counterEventSizeThresholdLevel1.inc();
+      } else if (metrics.getEventSize() < eventSizeThresholdL2) {
+        ShuffleServerMetrics.counterEventSizeThresholdLevel2.inc();
+      } else if (metrics.getEventSize() < eventSizeThresholdL3) {
+        ShuffleServerMetrics.counterEventSizeThresholdLevel3.inc();
+      } else {
+        ShuffleServerMetrics.counterEventSizeThresholdLevel4.inc();
+      }
+      Storage storage = selectStorage(event);
+      if (storage != null) {
+        storage.updateWriteMetrics(metrics);
+      }
+    } catch (Exception e) {
+      LOG.warn("Exception happened when update write metrics for " + event, e);
     }
-    Storage storage = selectStorage(event);
-    storage.updateWriteMetrics(metrics);
   }
 
   public StorageWriteMetrics createStorageWriteMetrics(ShuffleDataFlushEvent 
event, long writeTime) {
diff --git 
a/server/src/test/java/com/tencent/rss/server/ShuffleFlushManagerTest.java 
b/server/src/test/java/com/tencent/rss/server/ShuffleFlushManagerTest.java
index a562f7c..c89d5f2 100644
--- a/server/src/test/java/com/tencent/rss/server/ShuffleFlushManagerTest.java
+++ b/server/src/test/java/com/tencent/rss/server/ShuffleFlushManagerTest.java
@@ -30,6 +30,7 @@ import java.util.function.Supplier;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import io.prometheus.client.Gauge;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
@@ -131,6 +132,14 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
 
     assertEquals(3.0, 
ShuffleServerMetrics.counterRemoteStorageTotalWrite.get(storageHost).get(), 
0.5);
     assertEquals(3.0, 
ShuffleServerMetrics.counterRemoteStorageSuccessWrite.get(storageHost).get(), 
0.5);
+
+    // test case for process event whose related app was cleared already
+    assertEquals(0, ShuffleServerMetrics.gaugeWriteHandler.get(), 0.5);
+    ShuffleDataFlushEvent fakeEvent =
+        createShuffleDataFlushEvent("fakeAppId", 1, 1, 1, null);
+    manager.addToFlushQueue(fakeEvent);
+    waitForQueueClear(manager);
+    waitForMetrics(ShuffleServerMetrics.gaugeWriteHandler, 0, 0.5);
   }
 
   @Test
@@ -266,6 +275,37 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
     assertEquals(0, storage.getHandlerSize());
   }
 
+  private void waitForMetrics(Gauge gauge, double expected, double delta) 
throws Exception {
+    int retry = 0;
+    boolean match = false;
+    do {
+      Thread.sleep(500);
+      if (retry > 10) {
+        fail("Unexpected flush process when waitForMetrics");
+      }
+      retry++;
+      try {
+        assertEquals(0, gauge.get(), delta);
+        match = true;
+      } catch(Exception e) {
+        // ignore
+      }
+    } while (!match);
+  }
+
+  private void waitForQueueClear(ShuffleFlushManager manager) throws Exception 
{
+    int retry = 0;
+    int size = 0;
+    do {
+      Thread.sleep(500);
+      if (retry > 10) {
+        fail("Unexpected flush process when waitForQueueClear");
+      }
+      retry++;
+      size = manager.getEventNumInFlush();
+    } while (size > 0);
+  }
+
   private void waitForFlush(ShuffleFlushManager manager,
       String appId, int shuffleId, int expectedBlockNum) throws Exception {
     int retry = 0;

Reply via email to