This is an automated email from the ASF dual-hosted git repository.
inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 62e44761020 YARN-10122. Support signalToContainer API for Federation. (#4421)
62e44761020 is described below
commit 62e447610208919a00ecdf8eb99ad498689bbb05
Author: slfan1989 <[email protected]>
AuthorDate: Fri Jun 17 16:38:36 2022 -0700
YARN-10122. Support signalToContainer API for Federation. (#4421)
---
.../hadoop/yarn/server/router/RouterMetrics.java | 33 ++++++++++++++++
.../clientrm/FederationClientInterceptor.java | 38 +++++++++++++++++-
.../yarn/server/router/TestRouterMetrics.java | 33 ++++++++++++++++
.../clientrm/TestFederationClientInterceptor.java | 45 ++++++++++++++++++++++
.../TestableFederationClientInterceptor.java | 11 +++++-
5 files changed, 158 insertions(+), 2 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
index b02b3e155fa..ac37c4ed1b9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
@@ -81,6 +81,8 @@ public final class RouterMetrics {
private MutableGaugeInt numUpdateAppPriorityFailedRetrieved;
@Metric("# of updateApplicationPriority failed to be retrieved")
private MutableGaugeInt numUpdateAppTimeoutsFailedRetrieved;
+ @Metric("# of signalToContainer failed to be retrieved")
+ private MutableGaugeInt numSignalToContainerFailedRetrieved;
// Aggregate metrics are shared, and don't have to be looked up per call
@Metric("Total number of successful Submitted apps and latency(ms)")
@@ -126,6 +128,8 @@ public final class RouterMetrics {
private MutableRate totalSucceededUpdateAppPriorityRetrieved;
@Metric("Total number of successful Retrieved updateApplicationTimeouts and latency(ms)")
private MutableRate totalSucceededUpdateAppTimeoutsRetrieved;
+ @Metric("Total number of successful Retrieved signalToContainer and latency(ms)")
+ private MutableRate totalSucceededSignalToContainerRetrieved;
/**
* Provide quantile counters for all latencies.
@@ -150,6 +154,7 @@ public final class RouterMetrics {
private MutableQuantiles failAppAttemptLatency;
private MutableQuantiles updateAppPriorityLatency;
private MutableQuantiles updateAppTimeoutsLatency;
+ private MutableQuantiles signalToContainerLatency;
private static volatile RouterMetrics instance = null;
private static MetricsRegistry registry;
@@ -228,6 +233,10 @@ public final class RouterMetrics {
updateAppTimeoutsLatency =
registry.newQuantiles("updateApplicationTimeoutsLatency",
"latency of update application timeouts", "ops", "latency", 10);
+
+ signalToContainerLatency =
+ registry.newQuantiles("signalToContainerLatency",
+ "latency of signal to container timeouts", "ops", "latency", 10);
}
public static RouterMetrics getMetrics() {
@@ -349,6 +358,11 @@ public final class RouterMetrics {
return totalSucceededUpdateAppTimeoutsRetrieved.lastStat().numSamples();
}
+ @VisibleForTesting
+ public long getNumSucceededSignalToContainerRetrieved() {
+ return totalSucceededSignalToContainerRetrieved.lastStat().numSamples();
+ }
+
@VisibleForTesting
public double getLatencySucceededAppsCreated() {
return totalSucceededAppsCreated.lastStat().mean();
@@ -449,6 +463,11 @@ public final class RouterMetrics {
return totalSucceededUpdateAppTimeoutsRetrieved.lastStat().mean();
}
+ @VisibleForTesting
+ public double getLatencySucceededSignalToContainerRetrieved() {
+ return totalSucceededSignalToContainerRetrieved.lastStat().mean();
+ }
+
@VisibleForTesting
public int getAppsFailedCreated() {
return numAppsFailedCreated.value();
@@ -549,6 +568,11 @@ public final class RouterMetrics {
return numUpdateAppTimeoutsFailedRetrieved.value();
}
+ @VisibleForTesting
+ public int getSignalToContainerFailedRetrieved() {
+ return numSignalToContainerFailedRetrieved.value();
+ }
+
public void succeededAppsCreated(long duration) {
totalSucceededAppsCreated.add(duration);
getNewApplicationLatency.add(duration);
@@ -649,6 +673,11 @@ public final class RouterMetrics {
updateAppTimeoutsLatency.add(duration);
}
+ public void succeededSignalToContainerRetrieved(long duration) {
+ totalSucceededSignalToContainerRetrieved.add(duration);
+ signalToContainerLatency.add(duration);
+ }
+
public void incrAppsFailedCreated() {
numAppsFailedCreated.incr();
}
@@ -728,4 +757,8 @@ public final class RouterMetrics {
public void incrUpdateApplicationTimeoutsRetrieved() {
numUpdateAppTimeoutsFailedRetrieved.incr();
}
+
+ public void incrSignalToContainerFailedRetrieved() {
+ numSignalToContainerFailedRetrieved.incr();
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
index fec62d4b080..6cc317242cd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
@@ -1304,7 +1304,43 @@ public class FederationClientInterceptor
@Override
public SignalContainerResponse signalToContainer(
SignalContainerRequest request) throws YarnException, IOException {
- throw new NotImplementedException("Code is not implemented");
+ if (request == null || request.getContainerId() == null
+ || request.getCommand() == null) {
+ routerMetrics.incrSignalToContainerFailedRetrieved();
+ RouterServerUtil.logAndThrowException(
+ "Missing signalToContainer request or containerId " +
+ "or command information.", null);
+ }
+
+ long startTime = clock.getTime();
+ SubClusterId subClusterId = null;
+ ApplicationId applicationId =
+ request.getContainerId().getApplicationAttemptId().getApplicationId();
+ try {
+ subClusterId = getApplicationHomeSubCluster(applicationId);
+ } catch (YarnException ex) {
+ routerMetrics.incrSignalToContainerFailedRetrieved();
+ RouterServerUtil.logAndThrowException("Application " + applicationId +
+ " does not exist in FederationStateStore.", ex);
+ }
+
+ ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId);
+ SignalContainerResponse response = null;
+ try {
+ response = clientRMProxy.signalToContainer(request);
+ } catch (Exception ex) {
+ RouterServerUtil.logAndThrowException("Unable to signal to container for " +
+ applicationId + " from SubCluster " + subClusterId.getId(), ex);
+ }
+
+ if (response == null) {
+ LOG.error("No response when signal to container of " +
+ "the applicationId {} to SubCluster {}.", applicationId, subClusterId.getId());
+ }
+
+ long stopTime = clock.getTime();
+ routerMetrics.succeededSignalToContainerRetrieved(stopTime - startTime);
+ return response;
}
@Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
index 4b1049e8b64..eddd2a0ab48 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
@@ -413,6 +413,11 @@ public class TestRouterMetrics {
LOG.info("Mocked: failed updateApplicationTimeouts call");
metrics.incrUpdateApplicationTimeoutsRetrieved();
}
+
+ public void getSignalContainer() {
+ LOG.info("Mocked: failed signalContainer call");
+ metrics.incrSignalToContainerFailedRetrieved();
+ }
}
// Records successes for all calls
@@ -523,6 +528,11 @@ public class TestRouterMetrics {
LOG.info("Mocked: successful updateApplicationTimeouts call with duration {}", duration);
metrics.succeededUpdateAppTimeoutsRetrieved(duration);
}
+
+ public void getSignalToContainerTimeouts(long duration) {
+ LOG.info("Mocked: successful signalToContainer call with duration {}", duration);
+ metrics.succeededSignalToContainerRetrieved(duration);
+ }
}
@Test
@@ -806,4 +816,27 @@ public class TestRouterMetrics {
metrics.getUpdateApplicationTimeoutsFailedRetrieved());
}
+ @Test
+ public void testSucceededSignalToContainerRetrieved() {
+ long totalGoodBefore = metrics.getNumSucceededSignalToContainerRetrieved();
+ goodSubCluster.getSignalToContainerTimeouts(150);
+ Assert.assertEquals(totalGoodBefore + 1,
+ metrics.getNumSucceededSignalToContainerRetrieved());
+ Assert.assertEquals(150,
+ metrics.getLatencySucceededSignalToContainerRetrieved(), ASSERT_DOUBLE_DELTA);
+ goodSubCluster.getSignalToContainerTimeouts(300);
+ Assert.assertEquals(totalGoodBefore + 2,
+ metrics.getNumSucceededSignalToContainerRetrieved());
+ Assert.assertEquals(225,
+ metrics.getLatencySucceededSignalToContainerRetrieved(), ASSERT_DOUBLE_DELTA);
+ }
+
+ @Test
+ public void testSignalToContainerFailed() {
+ long totalBadBefore = metrics.getSignalToContainerFailedRetrieved();
+ badSubCluster.getSignalContainer();
+ Assert.assertEquals(totalBadBefore + 1,
+ metrics.getSignalToContainerFailedRetrieved());
+ }
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
index 9ead9fbe721..30377382402 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
@@ -72,6 +72,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityReque
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -83,6 +85,7 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
+import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
@@ -91,6 +94,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@@ -1056,4 +1060,45 @@ public class TestFederationClientInterceptor extends BaseRouterClientRMTest {
Assert.assertNotNull(timeoutsResponse);
Assert.assertEquals(appTimeout, responseTimeOut);
}
+
+ @Test
+ public void testSignalContainer() throws Exception {
+ LOG.info("Test FederationClientInterceptor : Signal Container request.");
+
+ // null request
+ LambdaTestUtils.intercept(YarnException.class, "Missing signalToContainer request " +
+ "or containerId or command information.", () -> interceptor.signalToContainer(null));
+
+ // normal request
+ ApplicationId appId =
+ ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
+
+ // Submit the application
+ SubmitApplicationResponse response = interceptor.submitApplication(request);
+ Assert.assertNotNull(response);
+ Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
+
+ SubClusterId subClusterId = interceptor.getApplicationHomeSubCluster(appId);
+ Assert.assertNotNull(subClusterId);
+
+ MockRM mockRM = interceptor.getMockRMs().get(subClusterId);
+ mockRM.waitForState(appId, RMAppState.ACCEPTED);
+ RMApp rmApp = mockRM.getRMContext().getRMApps().get(appId);
+ mockRM.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(),
+ RMAppAttemptState.SCHEDULED);
+ MockNM nm = interceptor.getMockNMs().get(subClusterId);
+ nm.nodeHeartbeat(true);
+ mockRM.waitForState(rmApp.getCurrentAppAttempt(), RMAppAttemptState.ALLOCATED);
+ mockRM.sendAMLaunched(rmApp.getCurrentAppAttempt().getAppAttemptId());
+
+ ContainerId containerId = rmApp.getCurrentAppAttempt().getMasterContainer().getId();
+
+ SignalContainerRequest signalContainerRequest =
+ SignalContainerRequest.newInstance(containerId, SignalContainerCommand.GRACEFUL_SHUTDOWN);
+ SignalContainerResponse signalContainerResponse =
+ interceptor.signalToContainer(signalContainerRequest);
+
+ Assert.assertNotNull(signalContainerResponse);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java
index 202a286696a..af1f45924c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -51,6 +52,9 @@ public class TestableFederationClientInterceptor
private ConcurrentHashMap<SubClusterId, MockRM> mockRMs =
new ConcurrentHashMap<>();
+ private ConcurrentHashMap<SubClusterId, MockNM> mockNMs =
+ new ConcurrentHashMap<>();
+
private List<SubClusterId> badSubCluster = new ArrayList<SubClusterId>();
@Override
@@ -71,7 +75,8 @@ public class TestableFederationClientInterceptor
mockRM.init(super.getConf());
mockRM.start();
try {
- mockRM.registerNode("h1:1234", 1024);
+ MockNM nm = mockRM.registerNode("127.0.0.1:1234", 8*1024, 4);
+ mockNMs.put(subClusterId, nm);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
@@ -118,4 +123,8 @@ public class TestableFederationClientInterceptor
public ConcurrentHashMap<SubClusterId, MockRM> getMockRMs() {
return mockRMs;
}
+
+ public ConcurrentHashMap<SubClusterId, MockNM> getMockNMs() {
+ return mockNMs;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]