This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 0f9d9bcb5 [#1972] fix(server): Fix clear leak shuffle data
accidentally remove data of new coming appId issue (#1971)
0f9d9bcb5 is described below
commit 0f9d9bcb5648c95051c9f8c4831943f126d463b9
Author: maobaolong <[email protected]>
AuthorDate: Thu Aug 29 17:33:48 2024 +0800
[#1972] fix(server): Fix clear leak shuffle data accidentally remove data
of new coming appId issue (#1971)
### What changes were proposed in this pull request?
Fix an issue where clearing leaked shuffle data could accidentally remove
the data of a newly registered appId.
### Why are the changes needed?
Fix: #1972
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
No need.
---
.../uniffle/common/log/TestLoggerExtension.java | 141 +++++++++++++++++++++
.../common/log/TestLoggerParamResolver.java | 40 ++++++
.../apache/uniffle/server/ShuffleTaskManager.java | 4 +-
.../server/storage/HadoopStorageManager.java | 3 +-
.../server/storage/HybridStorageManager.java | 5 +-
.../server/storage/LocalStorageManager.java | 4 +-
.../uniffle/server/storage/StorageManager.java | 3 +-
.../server/storage/LocalStorageManagerTest.java | 46 +++++++
8 files changed, 239 insertions(+), 7 deletions(-)
diff --git
a/common/src/test/java/org/apache/uniffle/common/log/TestLoggerExtension.java
b/common/src/test/java/org/apache/uniffle/common/log/TestLoggerExtension.java
new file mode 100644
index 000000000..df0e815eb
--- /dev/null
+++
b/common/src/test/java/org/apache/uniffle/common/log/TestLoggerExtension.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.log;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+import javax.annotation.concurrent.GuardedBy;
+
+import org.apache.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.Logger;
+import org.apache.logging.log4j.core.appender.AbstractAppender;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+/**
+ * JUnit 5 extension that attaches an in-memory appender to the log4j2 root logger around each
+ * test so the test can assert on what was logged.
+ *
+ * <p>{@link #beforeEach} creates a fresh {@link TestAppender} and stores this extension in the
+ * {@link ExtensionContext} store; {@link #afterEach} detaches the appender again. The appender is
+ * only attached when the root logger is a log4j-core {@link Logger}.
+ */
+public class TestLoggerExtension implements BeforeEachCallback, AfterEachCallback {
+  private static final ExtensionContext.Namespace NAMESPACE =
+      ExtensionContext.Namespace.create(TestLoggerExtension.class);
+  private static final String LOG_COLLECTOR_KEY = "TestLogAppender";
+  private TestAppender appender;
+
+  @Override
+  public void beforeEach(ExtensionContext context) {
+    appender = new TestAppender();
+    Logger rootLogger = rootCoreLogger();
+    if (rootLogger != null) {
+      appender.start();
+      rootLogger.addAppender(appender);
+    }
+    context.getStore(NAMESPACE).put(LOG_COLLECTOR_KEY, this);
+  }
+
+  @Override
+  public void afterEach(ExtensionContext context) {
+    if (appender == null) {
+      // beforeEach never ran (or failed early); there is nothing to detach.
+      return;
+    }
+    Logger rootLogger = rootCoreLogger();
+    if (rootLogger != null) {
+      // Detach before stopping so no event is routed to an already stopped appender.
+      rootLogger.removeAppender(appender);
+      appender.stop();
+    }
+  }
+
+  /**
+   * Looks up the extension instance that {@link #beforeEach} stored for the given context.
+   *
+   * @param context the extension context of the running test
+   * @return the stored extension, or null if none was stored under this namespace
+   */
+  public static TestLoggerExtension getTestLogger(ExtensionContext context) {
+    return context.getStore(NAMESPACE).get(LOG_COLLECTOR_KEY, TestLoggerExtension.class);
+  }
+
+  /**
+   * Determine if a specific pattern appears in log output.
+   *
+   * <p>The pattern is wrapped in {@code .*} on both sides and must match the whole formatted
+   * message. Since {@code .} does not match line terminators by default, text before or after the
+   * pattern must not contain a line break; {@link #logCount(String)} has no such restriction.
+   *
+   * @param pattern a regular expression to search for in log events
+   * @return true if a log message containing the pattern exists, false otherwise
+   */
+  public boolean wasLogged(String pattern) {
+    return appender.wasLogged(Pattern.compile(".*" + pattern + ".*"));
+  }
+
+  /**
+   * Determine if a specific pattern appears in log output with the specified level.
+   *
+   * @param pattern a regular expression to search for in log events
+   * @param level the level the matching event must have been logged at
+   * @return true if a log message containing the pattern exists at that level, false otherwise
+   */
+  public boolean wasLoggedWithLevel(String pattern, Level level) {
+    return appender.wasLoggedWithLevel(Pattern.compile(".*" + pattern + ".*"), level);
+  }
+
+  /**
+   * Count the number of times a specific pattern appears in log messages.
+   *
+   * @param pattern Pattern to search for in log events
+   * @return The number of log messages which match the pattern
+   */
+  public int logCount(String pattern) {
+    // [\s\S] will match all character include line break
+    return appender.logCount(Pattern.compile("[\\s\\S]*" + pattern + "[\\s\\S]*"));
+  }
+
+  /** Returns the root logger as a log4j-core {@link Logger}, or null if it is another type. */
+  private static Logger rootCoreLogger() {
+    org.apache.logging.log4j.Logger root = LogManager.getLogger(LogManager.ROOT_LOGGER_NAME);
+    return root instanceof Logger ? (Logger) root : null;
+  }
+
+  /** Appender that keeps every received event in memory; all access is guarded by its monitor. */
+  public class TestAppender extends AbstractAppender {
+    @GuardedBy("this") private final List<LogEvent> events = new ArrayList<>();
+
+    protected TestAppender() {
+      super("", null, null, false, null);
+    }
+
+    /** Determines whether a message with the given pattern was logged. */
+    public synchronized boolean wasLogged(Pattern pattern) {
+      for (LogEvent e : events) {
+        if (messageMatches(e, pattern)) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    /** Determines whether a message with the given pattern was logged at the given level. */
+    public synchronized boolean wasLoggedWithLevel(Pattern pattern, Level level) {
+      // `level` is a log4j 1.x Level while LogEvent#getLevel() returns a log4j2 Level. equals()
+      // between the two types is never true, so the levels are compared by name instead.
+      String levelName = String.valueOf(level);
+      for (LogEvent e : events) {
+        if (e.getLevel().name().equals(levelName) && messageMatches(e, pattern)) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    /** Counts the number of log message with a given pattern. */
+    public synchronized int logCount(Pattern pattern) {
+      int logCount = 0;
+      for (LogEvent e : events) {
+        if (messageMatches(e, pattern)) {
+          logCount++;
+        }
+      }
+      return logCount;
+    }
+
+    /**
+     * Records the event. Synchronized because logging threads may append while a test thread is
+     * reading {@code events} through one of the synchronized query methods.
+     */
+    @Override
+    public synchronized void append(LogEvent event) {
+      // NOTE(review): log4j2 can reuse mutable event instances; if stale messages show up,
+      // storing event.toImmutable() may be needed - confirm against the log4j-core version in use.
+      events.add(event);
+    }
+
+    /** Returns true if the whole formatted message of the event matches the pattern. */
+    private boolean messageMatches(LogEvent event, Pattern pattern) {
+      return pattern.matcher(event.getMessage().getFormattedMessage()).matches();
+    }
+  }
+}
diff --git
a/common/src/test/java/org/apache/uniffle/common/log/TestLoggerParamResolver.java
b/common/src/test/java/org/apache/uniffle/common/log/TestLoggerParamResolver.java
new file mode 100644
index 000000000..10fba0021
--- /dev/null
+++
b/common/src/test/java/org/apache/uniffle/common/log/TestLoggerParamResolver.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.log;
+
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.ParameterContext;
+import org.junit.jupiter.api.extension.ParameterResolutionException;
+import org.junit.jupiter.api.extension.ParameterResolver;
+
+/**
+ * JUnit 5 parameter resolver that passes the current {@link ExtensionContext} to a test method
+ * which declares it as its first parameter.
+ */
+public class TestLoggerParamResolver implements ParameterResolver {
+  /**
+   * Accepts only the parameter at index 0, and only when its declared type is {@link
+   * ExtensionContext} or a subtype of it.
+   */
+  @Override
+  public boolean supportsParameter(
+      final ParameterContext parameterContext, final ExtensionContext extensionContext)
+      throws ParameterResolutionException {
+    if (parameterContext.getIndex() != 0) {
+      return false;
+    }
+    final Class<?> declaredType = parameterContext.getParameter().getType();
+    return ExtensionContext.class.isAssignableFrom(declaredType);
+  }
+
+  /** Resolves the supported parameter to the extension context JUnit supplied. */
+  @Override
+  public Object resolveParameter(
+      final ParameterContext parameterContext, final ExtensionContext extensionContext)
+      throws ParameterResolutionException {
+    return extensionContext;
+  }
+}
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 226682e63..b48258423 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -832,8 +832,8 @@ public class ShuffleTaskManager {
public void checkLeakShuffleData() {
LOG.info("Start check leak shuffle data");
try {
- Set<String> appIds = Sets.newHashSet(shuffleTaskInfos.keySet());
- storageManager.checkAndClearLeakedShuffleData(appIds);
+ storageManager.checkAndClearLeakedShuffleData(
+ () -> Sets.newHashSet(shuffleTaskInfos.keySet()));
LOG.info("Finish check leak shuffle data");
} catch (Exception e) {
LOG.warn("Error happened in checkLeakShuffleData", e);
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
index 33d9b820b..99c055e5f 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
@@ -181,7 +182,7 @@ public class HadoopStorageManager extends
SingleStorageManager {
}
@Override
- public void checkAndClearLeakedShuffleData(Collection<String> appIds) {}
+ public void checkAndClearLeakedShuffleData(Supplier<Collection<String>>
appIdsSupplier) {}
@Override
public Map<String, StorageInfo> getStorageInfo() {
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/HybridStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/HybridStorageManager.java
index c1169f42f..05b83e558 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/HybridStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/HybridStorageManager.java
@@ -20,6 +20,7 @@ package org.apache.uniffle.server.storage;
import java.lang.reflect.Constructor;
import java.util.Collection;
import java.util.Map;
+import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -145,8 +146,8 @@ public class HybridStorageManager implements StorageManager
{
}
@Override
- public void checkAndClearLeakedShuffleData(Collection<String> appIds) {
- warmStorageManager.checkAndClearLeakedShuffleData(appIds);
+ public void checkAndClearLeakedShuffleData(Supplier<Collection<String>>
appIdsSupplier) {
+ warmStorageManager.checkAndClearLeakedShuffleData(appIdsSupplier);
}
@Override
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index 10f831ad1..6be2af740 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -34,6 +34,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -375,7 +376,7 @@ public class LocalStorageManager extends
SingleStorageManager {
}
@Override
- public void checkAndClearLeakedShuffleData(Collection<String> appIds) {
+ public void checkAndClearLeakedShuffleData(Supplier<Collection<String>>
appIdsSupplier) {
Set<String> appIdsOnStorages = new HashSet<>();
for (LocalStorage localStorage : localStorages) {
if (!localStorage.isCorrupted()) {
@@ -384,6 +385,7 @@ public class LocalStorageManager extends
SingleStorageManager {
}
}
+ Collection<String> appIds = appIdsSupplier.get();
for (String appId : appIdsOnStorages) {
if (!appIds.contains(appId)) {
ShuffleDeleteHandler deleteHandler =
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
index 402edc2cd..70425a22d 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.server.storage;
import java.util.Collection;
import java.util.Map;
+import java.util.function.Supplier;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.storage.StorageInfo;
@@ -55,7 +56,7 @@ public interface StorageManager {
// todo: add an interface that check storage isHealthy
- void checkAndClearLeakedShuffleData(Collection<String> appIds);
+ void checkAndClearLeakedShuffleData(Supplier<Collection<String>>
appIdsSupplier);
/**
* Report a map of storage mount point -> storage info mapping. For local
storages, the mount
diff --git
a/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java
index 5584e2609..63c5c12dd 100644
---
a/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java
@@ -24,6 +24,7 @@ import java.io.InputStreamReader;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -32,29 +33,40 @@ import org.apache.commons.lang3.SystemUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.ExtensionContext;
import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.common.log.TestLoggerExtension;
+import org.apache.uniffle.common.log.TestLoggerParamResolver;
import org.apache.uniffle.common.storage.StorageInfo;
import org.apache.uniffle.common.storage.StorageMedia;
import org.apache.uniffle.common.storage.StorageStatus;
+import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.server.ShuffleDataFlushEvent;
import org.apache.uniffle.server.ShuffleDataReadEvent;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.server.ShuffleServerMetrics;
+import org.apache.uniffle.server.ShuffleTaskInfo;
import org.apache.uniffle.storage.common.LocalStorage;
import org.apache.uniffle.storage.common.Storage;
import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static
uk.org.webcompere.systemstubs.SystemStubs.withEnvironmentVariables;
/** The class is to test the {@link LocalStorageManager} */
+@ExtendWith(TestLoggerExtension.class)
+@ExtendWith(TestLoggerParamResolver.class)
public class LocalStorageManagerTest {
@BeforeAll
@@ -332,4 +344,38 @@ public class LocalStorageManagerTest {
assertEquals(StorageMedia.SSD,
storageInfo.get(mountPoint).getType());
});
}
+
+  /**
+   * Runs the leak check of {@link LocalStorageManager} twice with a supplier backed by a live
+   * map: once with three registered apps, expecting no log line matching "app", and once after
+   * adding a mocked storage that reports "app3", expecting the delete log line for app3.
+   */
+  @Test
+  public void testNewAppWhileCheckLeak(ExtensionContext context) {
+    String[] storagePaths = {"/tmp/rss-data1"};
+
+    ShuffleServerConf conf = new ShuffleServerConf();
+    conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(storagePaths));
+    conf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L);
+    conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.LOCALFILE.name());
+    LocalStorageManager localStorageManager = new LocalStorageManager(conf);
+
+    List<LocalStorage> storages = localStorageManager.getStorages();
+    assertNotNull(storages);
+
+    // test normal case
+    Map<String, ShuffleTaskInfo> shuffleTaskInfos = JavaUtils.newConcurrentMap();
+    shuffleTaskInfos.put("app0", new ShuffleTaskInfo("app0"));
+    shuffleTaskInfos.put("app1", new ShuffleTaskInfo("app1"));
+    shuffleTaskInfos.put("app2", new ShuffleTaskInfo("app2"));
+    localStorageManager.checkAndClearLeakedShuffleData(shuffleTaskInfos::keySet);
+    TestLoggerExtension testLogger = TestLoggerExtension.getTestLogger(context);
+    assertFalse(testLogger.wasLogged("app"));
+
+    // test race condition case, app 3 is new app
+    // NOTE(review): the entry is registered under key "3", not "app3", so the supplied key set
+    // does not contain "app3" and the assertion below expects app3 to be deleted - confirm this
+    // is the intended scenario rather than a typo in the key.
+    shuffleTaskInfos.put("3", new ShuffleTaskInfo("app3"));
+    LocalStorage mockLocalStorage = mock(LocalStorage.class);
+    when(mockLocalStorage.getAppIds()).thenReturn(Collections.singleton("app3"));
+    storages.add(mockLocalStorage);
+    localStorageManager.checkAndClearLeakedShuffleData(shuffleTaskInfos::keySet);
+    assertTrue(testLogger.wasLogged("Delete shuffle data for appId\\[app3\\]"));
+  }
}