This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 2d20d88d2 [#2073] fix(server): Throws EventRetryException when error
happens on creating write handler (#2074)
2d20d88d2 is described below
commit 2d20d88d2da5ce57dce389d37cd54790be64bf82
Author: xianjingfeng <[email protected]>
AuthorDate: Mon Sep 2 17:37:14 2024 +0800
[#2073] fix(server): Throws EventRetryException when error happens on
creating write handler (#2074)
### What changes were proposed in this pull request?
Throws EventRetryException if an exception is encountered while creating the
write handler.
### Why are the changes needed?
Fix: #2073
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI
---
.../uniffle/server/DefaultFlushEventHandler.java | 4 +-
.../apache/uniffle/server/ShuffleFlushManager.java | 9 ++-
.../uniffle/server/ShuffleFlushManagerTest.java | 64 ++++++++++++++++++++++
3 files changed, 74 insertions(+), 3 deletions(-)
diff --git
a/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
b/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
index e0a47526f..024fc6b5d 100644
---
a/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
+++
b/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
@@ -122,9 +122,9 @@ public class DefaultFlushEventHandler implements
FlushEventHandler {
return;
}
+ ShuffleServerMetrics.counterTotalDroppedEventNum.inc();
+ ShuffleServerMetrics.counterTotalFailedWrittenEventNum.inc();
if (e instanceof EventDiscardException) {
- ShuffleServerMetrics.counterTotalDroppedEventNum.inc();
- ShuffleServerMetrics.counterTotalFailedWrittenEventNum.inc();
if (storage != null) {
ShuffleServerMetrics.incStorageFailedCounter(storage.getStorageHost());
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index a80267546..c08c358c0 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -169,7 +169,14 @@ public class ShuffleFlushManager {
storageDataReplica,
user,
maxConcurrencyPerPartitionToWrite);
- ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(request);
+ ShuffleWriteHandler handler;
+ try {
+ handler = storage.getOrCreateWriteHandler(request);
+ } catch (Exception e) {
+ LOG.warn("Failed to create write handler for event: {}", event, e);
+ throw new EventRetryException(e);
+ }
+
long startTime = System.currentTimeMillis();
boolean writeSuccess = storageManager.write(storage, handler, event);
if (!writeSuccess) {
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index 2b21445e3..59e92e320 100644
---
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -19,8 +19,13 @@ package org.apache.uniffle.server;
import java.io.File;
import java.io.FileNotFoundException;
+import java.nio.file.Files;
+import java.nio.file.attribute.PosixFileAttributeView;
+import java.nio.file.attribute.PosixFilePermission;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
@@ -40,6 +45,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
+import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
@@ -58,9 +64,11 @@ import org.apache.uniffle.server.buffer.ShuffleBufferManager;
import org.apache.uniffle.server.event.AppPurgeEvent;
import org.apache.uniffle.server.event.ShufflePurgeEvent;
import org.apache.uniffle.server.storage.HadoopStorageManager;
+import org.apache.uniffle.server.storage.HadoopStorageManagerFallbackStrategy;
import org.apache.uniffle.server.storage.HybridStorageManager;
import org.apache.uniffle.server.storage.LocalStorageManager;
import org.apache.uniffle.server.storage.LocalStorageManagerFallbackStrategy;
+import org.apache.uniffle.server.storage.RotateStorageManagerFallbackStrategy;
import org.apache.uniffle.server.storage.StorageManager;
import org.apache.uniffle.server.storage.StorageManagerFactory;
import org.apache.uniffle.storage.HadoopTestBase;
@@ -280,6 +288,62 @@ public class ShuffleFlushManagerTest extends
HadoopTestBase {
waitForMetrics(ShuffleServerMetrics.gaugeWriteHandler, 0, 0.5);
}
+ @Test
+ public void testCreateWriteHandlerFailed(@TempDir File tmpDir) throws
Exception {
+ String appId = "testCreateWriteHandlerFailed_appId";
+ File localStorageDirectory = new File(tmpDir, appId + "_rssdata");
+ localStorageDirectory.mkdirs();
+ EnumSet<PosixFilePermission> perms =
+ EnumSet.of(
+ PosixFilePermission.OWNER_READ,
+ PosixFilePermission.OWNER_EXECUTE,
+ PosixFilePermission.GROUP_EXECUTE,
+ PosixFilePermission.OTHERS_READ,
+ PosixFilePermission.OTHERS_EXECUTE);
+ Files.getFileAttributeView(localStorageDirectory.toPath(),
PosixFileAttributeView.class)
+ .setPermissions(perms);
+
+ ShuffleServerConf shuffleServerConf = new ShuffleServerConf();
+ shuffleServerConf.set(
+ ShuffleServerConf.HYBRID_STORAGE_FALLBACK_STRATEGY_CLASS,
+ RotateStorageManagerFallbackStrategy.class.getCanonicalName());
+ shuffleServerConf.set(
+ ShuffleServerConf.RSS_STORAGE_BASE_PATH,
+ Lists.newArrayList(localStorageDirectory.getAbsolutePath()));
+ shuffleServerConf.setString(
+ shuffleServerConf.RSS_STORAGE_TYPE.key(),
StorageType.LOCALFILE_HDFS.name());
+ ReentrantReadWriteLock rsLock = new ReentrantReadWriteLock();
+ when(mockShuffleServer.getShuffleTaskManager().getAppReadLock(appId))
+ .thenReturn(rsLock.readLock());
+
+ StorageManager storageManager =
+
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
+ storageManager.registerRemoteStorage(appId, remoteStorage);
+ ShuffleFlushManager manager =
+ new ShuffleFlushManager(shuffleServerConf, mockShuffleServer,
storageManager);
+ ShuffleDataFlushEvent event1 = createShuffleDataFlushEvent(appId, 1, 1, 1,
null);
+
+ manager.addToFlushQueue(event1);
+ waitForFlush(manager, appId, 1, 5);
+ assertEquals(1, event1.getRetryTimes());
+ List<ShufflePartitionedBlock> blocks1 = event1.getShuffleBlocks();
+ assertEquals(blocks1.size(), manager.getCommittedBlockIds(appId,
1).getLongCardinality());
+
+ int maxRetryTimes = 5;
+ shuffleServerConf.set(ShuffleServerConf.SERVER_WRITE_RETRY_MAX,
maxRetryTimes);
+ shuffleServerConf.set(
+ ShuffleServerConf.HYBRID_STORAGE_FALLBACK_STRATEGY_CLASS,
+ HadoopStorageManagerFallbackStrategy.class.getCanonicalName());
+ storageManager =
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
+ storageManager.registerRemoteStorage(appId, remoteStorage);
+ manager = new ShuffleFlushManager(shuffleServerConf, mockShuffleServer,
storageManager);
+ ShuffleDataFlushEvent event2 = createShuffleDataFlushEvent(appId, 1, 1, 1,
null);
+ manager.addToFlushQueue(event2);
+ Awaitility.await()
+ .timeout(Duration.ofSeconds(20))
+ .until(() -> event2.getRetryTimes() == maxRetryTimes + 1);
+ }
+
@Test
public void localMetricsTest(@TempDir File tempDir) throws Exception {
shuffleServerConf.set(