This is an automated email from the ASF dual-hosted git repository.
quapaw pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new e6ecc4f3e44 YARN-11188. Only files belong to the first file controller
are removed even if multiple log aggregation file controllers are configured.
Contributed by Szilard Nemeth.
e6ecc4f3e44 is described below
commit e6ecc4f3e4433ae23fd745f6e0c641a019664253
Author: 9uapaw <[email protected]>
AuthorDate: Wed Jun 22 14:40:00 2022 +0200
YARN-11188. Only files belong to the first file controller are removed even
if multiple log aggregation file controllers are configured. Contributed by
Szilard Nemeth.
---
.../AggregatedLogDeletionService.java | 55 ++++++++++++------
.../TestAggregatedLogDeletionService.java | 67 ++++++++++++++++++++--
.../AggregatedLogDeletionServiceForTest.java | 13 +++--
.../testutils/LogAggregationTestcase.java | 35 +++++++++--
.../testutils/MockRMClientUtils.java | 4 +-
5 files changed, 141 insertions(+), 33 deletions(-)
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
index eb6466a3a02..4f10b2fd4ce 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.logaggregation;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
@@ -57,7 +59,7 @@ public class AggregatedLogDeletionService extends
AbstractService {
private Timer timer = null;
private long checkIntervalMsecs;
- private LogDeletionTask task;
+ private List<LogDeletionTask> tasks = new ArrayList<>(); // never null: stopRMClient() may run before any task was scheduled
public static class LogDeletionTask extends TimerTask {
private Configuration conf;
@@ -66,14 +68,12 @@ public class AggregatedLogDeletionService extends
AbstractService {
private Path remoteRootLogDir = null;
private ApplicationClientProtocol rmClient = null;
- public LogDeletionTask(Configuration conf, long retentionSecs,
ApplicationClientProtocol rmClient) {
+ public LogDeletionTask(Configuration conf, long retentionSecs,
+ ApplicationClientProtocol rmClient,
+ LogAggregationFileController fileController) {
this.conf = conf;
this.retentionMillis = retentionSecs * 1000;
this.suffix = LogAggregationUtils.getBucketSuffix();
- LogAggregationFileControllerFactory factory =
- new LogAggregationFileControllerFactory(conf);
- LogAggregationFileController fileController =
- factory.getFileControllerForWrite();
this.remoteRootLogDir = fileController.getRemoteRootLogDir();
this.rmClient = rmClient;
}
@@ -220,7 +220,7 @@ public class AggregatedLogDeletionService extends
AbstractService {
@Override
protected void serviceStart() throws Exception {
- scheduleLogDeletionTask();
+ scheduleLogDeletionTasks();
super.serviceStart();
}
@@ -249,13 +249,13 @@ public class AggregatedLogDeletionService extends
AbstractService {
setConfig(conf);
stopRMClient();
stopTimer();
- scheduleLogDeletionTask();
+ scheduleLogDeletionTasks();
} else {
LOG.warn("Failed to execute refreshLogRetentionSettings : Aggregated Log
Deletion Service is not started");
}
}
- private void scheduleLogDeletionTask() throws IOException {
+ private void scheduleLogDeletionTasks() throws IOException {
Configuration conf = getConfig();
if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
@@ -271,9 +271,28 @@ public class AggregatedLogDeletionService extends
AbstractService {
return;
}
setLogAggCheckIntervalMsecs(retentionSecs);
- task = new LogDeletionTask(conf, retentionSecs, createRMClient());
- timer = new Timer();
- timer.scheduleAtFixedRate(task, 0, checkIntervalMsecs);
+
+ tasks = createLogDeletionTasks(conf, retentionSecs, createRMClient());
+ timer = new Timer(); // one Timer for all tasks: a new Timer per task would orphan every Timer but the last one kept in the field
+ for (LogDeletionTask task : tasks) {
+ timer.scheduleAtFixedRate(task, 0, checkIntervalMsecs);
+ }
+ }
+
+ @VisibleForTesting
+ public List<LogDeletionTask> createLogDeletionTasks(Configuration conf, long
retentionSecs,
+
ApplicationClientProtocol rmClient)
+ throws IOException {
+ List<LogDeletionTask> tasks = new ArrayList<>();
+ LogAggregationFileControllerFactory factory = new
LogAggregationFileControllerFactory(conf);
+ List<LogAggregationFileController> fileControllers =
+ factory.getConfiguredLogAggregationFileControllerList();
+ for (LogAggregationFileController fileController : fileControllers) {
+ LogDeletionTask task = new LogDeletionTask(conf, retentionSecs, rmClient,
+ fileController);
+ tasks.add(task);
+ }
+ return tasks;
}
private void stopTimer() {
@@ -295,14 +314,18 @@ public class AggregatedLogDeletionService extends
AbstractService {
// as @Idempotent, it will automatically take care of RM restart/failover.
@VisibleForTesting
protected ApplicationClientProtocol createRMClient() throws IOException {
- return ClientRMProxy.createRMProxy(getConfig(),
- ApplicationClientProtocol.class);
+ return ClientRMProxy.createRMProxy(getConfig(),
ApplicationClientProtocol.class);
}
@VisibleForTesting
protected void stopRMClient() {
- if (task != null && task.getRMClient() != null) {
- RPC.stopProxy(task.getRMClient());
+ for (LogDeletionTask task : tasks) {
+ if (task != null && task.getRMClient() != null) {
+ RPC.stopProxy(task.getRMClient());
+ //The RMClient instance is the same for all deletion tasks.
+ //It is enough to close the RM client once
+ break;
+ }
}
}
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
index 13a9afa84e0..285ac43322a 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
@@ -42,6 +42,7 @@ import java.util.Arrays;
import java.util.List;
import static
org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_FILE_CONTROLLER_FMT;
+import static
org.apache.hadoop.yarn.logaggregation.LogAggregationTestUtils.enableFileControllers;
import static
org.apache.hadoop.yarn.logaggregation.testutils.LogAggregationTestcaseBuilder.NO_TIMEOUT;
import static org.mockito.Mockito.mock;
@@ -118,12 +119,12 @@ public class TestAggregatedLogDeletionService {
.withRunningApps(4)
.injectExceptionForAppDirDeletion(3)
.build()
- .setupAndRunDeletionService()
+ .startDeletionService()
.verifyAppDirsDeleted(timeout, 1, 3)
.verifyAppDirsNotDeleted(timeout, 2, 4)
.verifyAppFileDeleted(4, 1, timeout)
.verifyAppFileNotDeleted(4, 2, timeout)
- .teardown();
+ .teardown(1);
}
@Test
@@ -155,7 +156,7 @@ public class TestAggregatedLogDeletionService {
.build();
testcase
- .setupAndRunDeletionService()
+ .startDeletionService()
//app1Dir would be deleted since it is done above log retention
period
.verifyAppDirDeleted(1, 10000L)
//app2Dir is not expected to be deleted since it is below the
threshold
@@ -176,7 +177,8 @@ public class TestAggregatedLogDeletionService {
.verifyCheckIntervalMilliSecondsEqualTo(checkIntervalMilliSeconds)
//app2Dir should be deleted since it falls above the threshold
.verifyAppDirDeleted(2, 10000L)
- .teardown();
+ //Close expected 2 times: once for refresh and once for stopping
+ .teardown(2);
}
@Test
@@ -202,7 +204,7 @@ public class TestAggregatedLogDeletionService {
.withFinishedApps(1)
.withRunningApps()
.build()
- .setupAndRunDeletionService()
+ .startDeletionService()
.verifyAnyPathListedAtLeast(4, 10000L)
.verifyAppDirNotDeleted(1, NO_TIMEOUT)
// modify the timestamp of the logs and verify if it is picked up
quickly
@@ -211,7 +213,7 @@ public class TestAggregatedLogDeletionService {
.changeModTimeOfBucketDir(toDeleteTime)
.reinitAllPaths()
.verifyAppDirDeleted(1, 10000L)
- .teardown();
+ .teardown(1);
}
@Test
@@ -241,6 +243,59 @@ public class TestAggregatedLogDeletionService {
.verifyAppDirDeleted(3, NO_TIMEOUT);
}
+ @Test
+ public void testDeletionTwoControllers() throws IOException {
+ long now = System.currentTimeMillis();
+ long toDeleteTime = now - (2000 * 1000);
+ long toKeepTime = now - (1500 * 1000);
+
+
+ Configuration conf = setupConfiguration(1800, -1);
+ enableFileControllers(conf, REMOTE_ROOT_LOG_DIR, ALL_FILE_CONTROLLERS,
+ ALL_FILE_CONTROLLER_NAMES);
+ long timeout = 2000L;
+ LogAggregationTestcaseBuilder.create(conf)
+ .withRootPath(ROOT)
+ .withRemoteRootLogPath(REMOTE_ROOT_LOG_DIR)
+ .withBothFileControllers()
+ .withUserDir(USER_ME, toKeepTime)
+ .withSuffixDir(SUFFIX, toDeleteTime)
+ .withBucketDir(toDeleteTime)
+ .withApps(//Apps for TFile
+ Lists.newArrayList(
+ new AppDescriptor(T_FILE, toDeleteTime,
Lists.newArrayList()),
+ new AppDescriptor(T_FILE, toDeleteTime, Lists.newArrayList(
+ Pair.of(DIR_HOST1, toDeleteTime),
+ Pair.of(DIR_HOST2, toKeepTime))),
+ new AppDescriptor(T_FILE, toDeleteTime, Lists.newArrayList(
+ Pair.of(DIR_HOST1, toDeleteTime),
+ Pair.of(DIR_HOST2, toDeleteTime))),
+ new AppDescriptor(T_FILE, toDeleteTime, Lists.newArrayList(
+ Pair.of(DIR_HOST1, toDeleteTime),
+ Pair.of(DIR_HOST2, toKeepTime))),
+ //Apps for IFile
+ new AppDescriptor(I_FILE, toDeleteTime,
Lists.newArrayList()),
+ new AppDescriptor(I_FILE, toDeleteTime, Lists.newArrayList(
+ Pair.of(DIR_HOST1, toDeleteTime),
+ Pair.of(DIR_HOST2, toKeepTime))),
+ new AppDescriptor(I_FILE, toDeleteTime, Lists.newArrayList(
+ Pair.of(DIR_HOST1, toDeleteTime),
+ Pair.of(DIR_HOST2, toDeleteTime))),
+ new AppDescriptor(I_FILE, toDeleteTime, Lists.newArrayList(
+ Pair.of(DIR_HOST1, toDeleteTime),
+ Pair.of(DIR_HOST2, toKeepTime)))))
+ .withFinishedApps(1, 2, 3, 5, 6, 7)
+ .withRunningApps(4, 8)
+ .injectExceptionForAppDirDeletion(3, 7) // 3 and 7: each controller's fully-expired finished app, whose dir deletion is attempted
+ .build()
+ .startDeletionService()
+ .verifyAppDirsDeleted(timeout, 1, 3, 5, 7)
+ .verifyAppDirsNotDeleted(timeout, 2, 4, 6, 8)
+ .verifyAppFilesDeleted(timeout, Lists.newArrayList(Pair.of(4, 1),
Pair.of(8, 1)))
+ .verifyAppFilesNotDeleted(timeout, Lists.newArrayList(Pair.of(4,
2), Pair.of(8, 2)))
+ .teardown(1);
+ }
+
static class MockFileSystem extends FilterFileSystem {
MockFileSystem() {
super(mock(FileSystem.class));
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/AggregatedLogDeletionServiceForTest.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/AggregatedLogDeletionServiceForTest.java
index 49042cf458b..76ec8aab537 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/AggregatedLogDeletionServiceForTest.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/AggregatedLogDeletionServiceForTest.java
@@ -32,6 +32,7 @@ public class AggregatedLogDeletionServiceForTest extends
AggregatedLogDeletionSe
private final List<ApplicationId> finishedApplications;
private final List<ApplicationId> runningApplications;
private final Configuration conf;
+ private ApplicationClientProtocol mockRMClient; // created by the first createRMClient() call, then reused; null until then
public AggregatedLogDeletionServiceForTest(List<ApplicationId>
runningApplications,
List<ApplicationId>
finishedApplications) {
@@ -48,11 +49,16 @@ public class AggregatedLogDeletionServiceForTest extends
AggregatedLogDeletionSe
@Override
protected ApplicationClientProtocol createRMClient() throws IOException {
+ if (mockRMClient != null) { // same mock on every call (e.g. after a refresh), so its close() invocations can be counted in one place
+ return mockRMClient;
+ }
try {
- return createMockRMClient(finishedApplications, runningApplications);
+ mockRMClient =
+ createMockRMClient(finishedApplications, runningApplications);
} catch (Exception e) {
throw new IOException(e);
}
+ return mockRMClient;
}
@Override
@@ -60,8 +66,7 @@ public class AggregatedLogDeletionServiceForTest extends
AggregatedLogDeletionSe
return conf;
}
- @Override
- protected void stopRMClient() {
- // DO NOTHING
+ public ApplicationClientProtocol getMockRMClient() { // exposes the cached mock for close() verification; null before createRMClient() runs
+ return mockRMClient;
}
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/LogAggregationTestcase.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/LogAggregationTestcase.java
index 8f535d40714..f2074f8c8e6 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/LogAggregationTestcase.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/LogAggregationTestcase.java
@@ -28,10 +28,12 @@ import org.apache.hadoop.util.Sets;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogDeletionService;
+import
org.apache.hadoop.yarn.logaggregation.AggregatedLogDeletionService.LogDeletionTask;
import
org.apache.hadoop.yarn.logaggregation.testutils.LogAggregationTestcaseBuilder.AppDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
@@ -77,6 +79,7 @@ public class LogAggregationTestcase {
private List<PathWithFileStatus> appDirs;
private final List<AppDescriptor> appDescriptors;
private AggregatedLogDeletionServiceForTest deletionService;
+ private ApplicationClientProtocol rmClient; // mock RM client created by the most recent runDeletionTask() call
public LogAggregationTestcase(LogAggregationTestcaseBuilder builder) throws
IOException {
conf = builder.conf;
@@ -102,6 +105,8 @@ public class LogAggregationTestcase {
mockFs = ((FilterFileSystem) builder.rootFs).getRawFileSystem();
validateAppControllers();
setupMocks();
+
+ setupDeletionService(); // built eagerly: runDeletionTask() calls deletionService.createLogDeletionTasks() without starting the service
}
private void validateAppControllers() {
@@ -241,10 +246,13 @@ public class LogAggregationTestcase {
when(mockFs.listStatus(dir.path)).thenReturn(fileStatuses);
}
- public LogAggregationTestcase setupAndRunDeletionService() {
+ private void setupDeletionService() {
List<ApplicationId> finishedApps = createFinishedAppsList();
List<ApplicationId> runningApps = createRunningAppsList();
deletionService = new AggregatedLogDeletionServiceForTest(runningApps,
finishedApps, conf);
+ }
+
+ public LogAggregationTestcase startDeletionService() { // init + start the service built in the constructor; returns this for chaining
deletionService.init(conf);
deletionService.start();
return this;
@@ -271,10 +279,13 @@ public class LogAggregationTestcase {
public LogAggregationTestcase runDeletionTask(long retentionSeconds) throws
Exception {
List<ApplicationId> finishedApps = createFinishedAppsList();
List<ApplicationId> runningApps = createRunningAppsList();
- ApplicationClientProtocol rmClient = createMockRMClient(finishedApps,
runningApps);
- AggregatedLogDeletionService.LogDeletionTask deletionTask =
- new AggregatedLogDeletionService.LogDeletionTask(conf,
retentionSeconds, rmClient);
- deletionTask.run();
+ rmClient = createMockRMClient(finishedApps, runningApps);
+ List<LogDeletionTask> tasks = deletionService.createLogDeletionTasks(conf,
retentionSeconds,
+ rmClient);
+ for (LogDeletionTask deletionTask : tasks) { // one task per configured file controller, run synchronously on this thread
+ deletionTask.run();
+ }
+
return this;
}
@@ -359,8 +370,20 @@ public class LogAggregationTestcase {
verify(mockFs, timeout(timeout).times(times)).delete(file.path, true);
}
- public void teardown() {
+ private void verifyMockRmClientWasClosedNTimes(int expectedRmClientCloses)
+ throws IOException { // NOTE(review): the constructor always assigns deletionService, so the rmClient fallback below looks unreachable
+ ApplicationClientProtocol mockRMClient;
+ if (deletionService != null) {
+ mockRMClient = deletionService.getMockRMClient();
+ } else {
+ mockRMClient = rmClient;
+ }
+ verify((Closeable)mockRMClient, times(expectedRmClientCloses)).close(); // assumes the mock RM client also implements Closeable — TODO confirm
+ }
+
+ public void teardown(int expectedRmClientCloses) throws IOException {
deletionService.stop();
+ verifyMockRmClientWasClosedNTimes(expectedRmClientCloses);
}
public LogAggregationTestcase refreshLogRetentionSettings() throws
IOException {
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/MockRMClientUtils.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/MockRMClientUtils.java
index c3f69c2a67d..6eb1eb1ecbe 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/MockRMClientUtils.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/MockRMClientUtils.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.logaggregation.testutils;
+import org.apache.hadoop.test.MockitoUtil;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
@@ -34,7 +35,8 @@ public class MockRMClientUtils {
public static ApplicationClientProtocol createMockRMClient(
List<ApplicationId> finishedApplications,
List<ApplicationId> runningApplications) throws Exception {
- final ApplicationClientProtocol mockProtocol =
mock(ApplicationClientProtocol.class);
+ final ApplicationClientProtocol mockProtocol =
+ MockitoUtil.mockProtocol(ApplicationClientProtocol.class); // presumably a mock that is also Closeable, so RPC.stopProxy()/close() calls can be verified — confirm
if (finishedApplications != null && !finishedApplications.isEmpty()) {
for (ApplicationId appId : finishedApplications) {
GetApplicationReportRequest request =
GetApplicationReportRequest.newInstance(appId);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]