This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 55ec437 [Bug] Fix NPE problem when process the event if application
was cleared already (#16)
55ec437 is described below
commit 55ec437aed9a465f0ad014534e17b82f501b0464
Author: Colin <[email protected]>
AuthorDate: Mon Jul 4 20:40:44 2022 +0800
[Bug] Fix NPE problem when process the event if application was cleared
already (#16)
### What changes were proposed in this pull request?
There will be an NPE problem when processing an event if the application was
already cleared.
### Why are the changes needed?
Fix a critical bug which causes a resource leak.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
A unit test is added.
---
.../apache/uniffle/server/ShuffleFlushManager.java | 120 ++++++++++++---------
.../uniffle/server/storage/HdfsStorageManager.java | 17 +--
.../server/storage/SingleStorageManager.java | 48 +++++----
.../uniffle/server/ShuffleFlushManagerTest.java | 40 +++++++
4 files changed, 148 insertions(+), 77 deletions(-)
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index 95f0bf8..367032b 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -93,10 +93,14 @@ public class ShuffleFlushManager {
try {
ShuffleDataFlushEvent event = flushQueue.take();
threadPoolExecutor.execute(() -> {
- ShuffleServerMetrics.gaugeEventQueueSize.set(flushQueue.size());
- ShuffleServerMetrics.gaugeWriteHandler.inc();
- flushToFile(event);
- ShuffleServerMetrics.gaugeWriteHandler.dec();
+ try {
+ ShuffleServerMetrics.gaugeEventQueueSize.set(flushQueue.size());
+ ShuffleServerMetrics.gaugeWriteHandler.inc();
+ flushToFile(event);
+ ShuffleServerMetrics.gaugeWriteHandler.dec();
+ } catch (Exception e) {
+ LOG.error("Exception happened when flush data for " + event, e);
+ }
});
} catch (Exception e) {
LOG.error("Exception happened when process event.", e);
@@ -136,7 +140,7 @@ public class ShuffleFlushManager {
private void flushToFile(ShuffleDataFlushEvent event) {
Storage storage = storageManager.selectStorage(event);
- if (!storage.canWrite()) {
+ if (storage != null && !storage.canWrite()) {
addPendingEvents(event);
return;
}
@@ -145,49 +149,52 @@ public class ShuffleFlushManager {
List<ShufflePartitionedBlock> blocks = event.getShuffleBlocks();
boolean writeSuccess = false;
try {
- if (blocks == null || blocks.isEmpty()) {
- LOG.info("There is no block to be flushed: " + event);
- } else if (!event.isValid()) {
- // avoid printing error log
- writeSuccess = true;
- LOG.warn("AppId {} was removed already, event {} should be dropped",
event.getAppId(), event);
- } else {
- ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(new
CreateShuffleWriteHandlerRequest(
- storageType,
- event.getAppId(),
- event.getShuffleId(),
- event.getStartPartition(),
- event.getEndPartition(),
- storageBasePaths,
- shuffleServerId,
- hadoopConf,
- storageDataReplica));
-
- do {
- if (event.getRetryTimes() > retryMax) {
- LOG.error("Failed to write data for " + event + " in " + retryMax
+ " times, shuffle data will be lost");
-
ShuffleServerMetrics.incStorageFailedCounter(storage.getStorageHost());
- break;
- }
- if (!event.isValid()) {
- LOG.warn("AppId {} was removed already, event {} should be
dropped, may leak one handler",
- event.getAppId(), event);
- // avoid printing error log
- writeSuccess = true;
- break;
- }
+ // storage info maybe null if the application cache was cleared already
+ if (storage != null) {
+ if (blocks == null || blocks.isEmpty()) {
+ LOG.info("There is no block to be flushed: " + event);
+ } else if (!event.isValid()) {
+ // avoid printing error log
+ writeSuccess = true;
+ LOG.warn("AppId {} was removed already, event {} should be dropped",
event.getAppId(), event);
+ } else {
+ ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(new
CreateShuffleWriteHandlerRequest(
+ storageType,
+ event.getAppId(),
+ event.getShuffleId(),
+ event.getStartPartition(),
+ event.getEndPartition(),
+ storageBasePaths,
+ shuffleServerId,
+ hadoopConf,
+ storageDataReplica));
- writeSuccess = storageManager.write(storage, handler, event);
+ do {
+ if (event.getRetryTimes() > retryMax) {
+ LOG.error("Failed to write data for " + event + " in " +
retryMax + " times, shuffle data will be lost");
+
ShuffleServerMetrics.incStorageFailedCounter(storage.getStorageHost());
+ break;
+ }
+ if (!event.isValid()) {
+ LOG.warn("AppId {} was removed already, event {} should be
dropped, may leak one handler",
+ event.getAppId(), event);
+ // avoid printing error log
+ writeSuccess = true;
+ break;
+ }
- if (writeSuccess) {
- updateCommittedBlockIds(event.getAppId(), event.getShuffleId(),
blocks);
-
ShuffleServerMetrics.incStorageSuccessCounter(storage.getStorageHost());
- break;
- } else {
- event.increaseRetryTimes();
-
ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
- }
- } while (event.getRetryTimes() <= retryMax);
+ writeSuccess = storageManager.write(storage, handler, event);
+
+ if (writeSuccess) {
+ updateCommittedBlockIds(event.getAppId(), event.getShuffleId(),
blocks);
+
ShuffleServerMetrics.incStorageSuccessCounter(storage.getStorageHost());
+ break;
+ } else {
+ event.increaseRetryTimes();
+
ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
+ }
+ } while (event.getRetryTimes() <= retryMax);
+ }
}
} catch (Exception e) {
// just log the error, don't throw the exception and stop the flush
thread
@@ -275,16 +282,19 @@ public class ShuffleFlushManager {
void processPendingEvents() throws Exception {
PendingShuffleFlushEvent event = pendingEvents.take();
Storage storage = storageManager.selectStorage(event.getEvent());
+ if (storage == null) {
+ dropPendingEvent(event);
+ LOG.error("Flush event cannot be flushed because of application related
was cleared, {}", event.getEvent());
+ return;
+ }
if (System.currentTimeMillis() - event.getCreateTimeStamp() >
pendingEventTimeoutSec * 1000L) {
- ShuffleServerMetrics.counterTotalDroppedEventNum.inc();
- if (shuffleServer != null) {
- shuffleServer.getShuffleBufferManager().releaseMemory(
- event.getEvent().getSize(), true, false);
- }
+ dropPendingEvent(event);
LOG.error("Flush event cannot be flushed for {} sec, the event {} is
dropped",
pendingEventTimeoutSec, event.getEvent());
return;
}
+ // storage maybe null if the application cache was cleared already
+ // add event to flush queue, and it will be released
if (storage.canWrite()) {
addToFlushQueue(event.getEvent());
return;
@@ -292,6 +302,14 @@ public class ShuffleFlushManager {
addPendingEventsInternal(event);
}
+ private void dropPendingEvent(PendingShuffleFlushEvent event) {
+ ShuffleServerMetrics.counterTotalDroppedEventNum.inc();
+ if (shuffleServer != null) {
+ shuffleServer.getShuffleBufferManager().releaseMemory(
+ event.getEvent().getSize(), true, false);
+ }
+ }
+
@VisibleForTesting
void addPendingEvents(ShuffleDataFlushEvent event) {
addPendingEventsInternal(new PendingShuffleFlushEvent(event));
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
index c850d5c..04d65b1 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
@@ -71,11 +71,14 @@ public class HdfsStorageManager extends
SingleStorageManager {
@Override
public void removeResources(String appId, Set<Integer> shuffleSet) {
HdfsStorage storage = getStorageByAppId(appId);
- storage.removeHandlers(appId);
- appIdToStorages.remove(appId);
- ShuffleDeleteHandler deleteHandler = ShuffleHandlerFactory.getInstance()
- .createShuffleDeleteHandler(new
CreateShuffleDeleteHandlerRequest(StorageType.HDFS.name(), storage.getConf()));
- deleteHandler.delete(new String[] {storage.getStoragePath()}, appId);
+ if (storage != null) {
+ storage.removeHandlers(appId);
+ appIdToStorages.remove(appId);
+ ShuffleDeleteHandler deleteHandler = ShuffleHandlerFactory.getInstance()
+ .createShuffleDeleteHandler(
+ new CreateShuffleDeleteHandlerRequest(StorageType.HDFS.name(),
storage.getConf()));
+ deleteHandler.delete(new String[] {storage.getStoragePath()}, appId);
+ }
}
@Override
@@ -109,7 +112,9 @@ public class HdfsStorageManager extends
SingleStorageManager {
if (!appIdToStorages.containsKey(appId)) {
String msg = "Can't find HDFS storage for appId[" + appId + "]";
LOG.error(msg);
- throw new RuntimeException(msg);
+ // outside should deal with null situation
+ // todo: it's better to have a fake storage for null situation
+ return null;
}
return appIdToStorages.get(appId);
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
index cf9ff9d..299b6a4 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
@@ -82,27 +82,35 @@ public abstract class SingleStorageManager implements
StorageManager {
@Override
public void updateWriteMetrics(ShuffleDataFlushEvent event, long writeTime) {
- // update shuffle server metrics, these metrics belong to server module
- // we can't update them in storage module
- StorageWriteMetrics metrics = createStorageWriteMetrics(event, writeTime);
- ShuffleServerMetrics.counterTotalWriteTime.inc(metrics.getWriteTime());
- ShuffleServerMetrics.counterWriteTotal.inc();
- if (metrics.getWriteTime() > writeSlowThreshold) {
- ShuffleServerMetrics.counterWriteSlow.inc();
- }
- ShuffleServerMetrics.counterTotalWriteDataSize.inc(metrics.getEventSize());
-
ShuffleServerMetrics.counterTotalWriteBlockSize.inc(metrics.getWriteBlocks());
- if (metrics.getEventSize() < eventSizeThresholdL1) {
- ShuffleServerMetrics.counterEventSizeThresholdLevel1.inc();
- } else if (metrics.getEventSize() < eventSizeThresholdL2) {
- ShuffleServerMetrics.counterEventSizeThresholdLevel2.inc();
- } else if (metrics.getEventSize() < eventSizeThresholdL3) {
- ShuffleServerMetrics.counterEventSizeThresholdLevel3.inc();
- } else {
- ShuffleServerMetrics.counterEventSizeThresholdLevel4.inc();
+ // the metrics update shouldn't block normal process
+ // log the exception if error happen
+ try {
+ // update shuffle server metrics, these metrics belong to server module
+ // we can't update them in storage module
+ StorageWriteMetrics metrics = createStorageWriteMetrics(event,
writeTime);
+ ShuffleServerMetrics.counterTotalWriteTime.inc(metrics.getWriteTime());
+ ShuffleServerMetrics.counterWriteTotal.inc();
+ if (metrics.getWriteTime() > writeSlowThreshold) {
+ ShuffleServerMetrics.counterWriteSlow.inc();
+ }
+
ShuffleServerMetrics.counterTotalWriteDataSize.inc(metrics.getEventSize());
+
ShuffleServerMetrics.counterTotalWriteBlockSize.inc(metrics.getWriteBlocks());
+ if (metrics.getEventSize() < eventSizeThresholdL1) {
+ ShuffleServerMetrics.counterEventSizeThresholdLevel1.inc();
+ } else if (metrics.getEventSize() < eventSizeThresholdL2) {
+ ShuffleServerMetrics.counterEventSizeThresholdLevel2.inc();
+ } else if (metrics.getEventSize() < eventSizeThresholdL3) {
+ ShuffleServerMetrics.counterEventSizeThresholdLevel3.inc();
+ } else {
+ ShuffleServerMetrics.counterEventSizeThresholdLevel4.inc();
+ }
+ Storage storage = selectStorage(event);
+ if (storage != null) {
+ storage.updateWriteMetrics(metrics);
+ }
+ } catch (Exception e) {
+ LOG.warn("Exception happened when update write metrics for " + event, e);
}
- Storage storage = selectStorage(event);
- storage.updateWriteMetrics(metrics);
}
public StorageWriteMetrics createStorageWriteMetrics(ShuffleDataFlushEvent
event, long writeTime) {
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index e799d80..5bcdf05 100644
---
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -29,6 +29,7 @@ import java.util.function.Supplier;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import io.prometheus.client.Gauge;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
@@ -130,6 +131,14 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
assertEquals(3.0,
ShuffleServerMetrics.counterRemoteStorageTotalWrite.get(storageHost).get(),
0.5);
assertEquals(3.0,
ShuffleServerMetrics.counterRemoteStorageSuccessWrite.get(storageHost).get(),
0.5);
+
+ // test case for process event whose related app was cleared already
+ assertEquals(0, ShuffleServerMetrics.gaugeWriteHandler.get(), 0.5);
+ ShuffleDataFlushEvent fakeEvent =
+ createShuffleDataFlushEvent("fakeAppId", 1, 1, 1, null);
+ manager.addToFlushQueue(fakeEvent);
+ waitForQueueClear(manager);
+ waitForMetrics(ShuffleServerMetrics.gaugeWriteHandler, 0, 0.5);
}
@Test
@@ -265,6 +274,37 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
assertEquals(0, storage.getHandlerSize());
}
+ private void waitForMetrics(Gauge gauge, double expected, double delta)
throws Exception {
+ int retry = 0;
+ boolean match = false;
+ do {
+ Thread.sleep(500);
+ if (retry > 10) {
+ fail("Unexpected flush process when waitForMetrics");
+ }
+ retry++;
+ try {
+ assertEquals(0, gauge.get(), delta);
+ match = true;
+ } catch(Exception e) {
+ // ignore
+ }
+ } while (!match);
+ }
+
+ private void waitForQueueClear(ShuffleFlushManager manager) throws Exception
{
+ int retry = 0;
+ int size = 0;
+ do {
+ Thread.sleep(500);
+ if (retry > 10) {
+ fail("Unexpected flush process when waitForQueueClear");
+ }
+ retry++;
+ size = manager.getEventNumInFlush();
+ } while (size > 0);
+ }
+
private void waitForFlush(ShuffleFlushManager manager,
String appId, int shuffleId, int expectedBlockNum) throws Exception {
int retry = 0;