Repository: reef Updated Branches: refs/heads/master f78355489 -> c030e7366
[REEF-1710] Create a unit test that checks the YARN Unmanaged AM API JIRA: [REEF-1710](https://issues.apache.org/jira/browse/REEF-1710) Pull request: Closes #1224 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/c030e736 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/c030e736 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/c030e736 Branch: refs/heads/master Commit: c030e7366675bb44a735b7ffc5cf5fe5940c449d Parents: f783554 Author: Sergiy Matusevych <[email protected]> Authored: Wed Jan 11 18:17:34 2017 -0800 Committer: Mariia Mykhailova <[email protected]> Committed: Tue Jan 17 15:55:44 2017 -0800 ---------------------------------------------------------------------- .../yarn/driver/unmanaged/UnmanagedAmTest.java | 220 +++++++++++++++++++ .../yarn/driver/unmanaged/package-info.java | 22 ++ 2 files changed, 242 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/c030e736/lang/java/reef-runtime-yarn/src/test/java/org/apache/reef/runtime/yarn/driver/unmanaged/UnmanagedAmTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/test/java/org/apache/reef/runtime/yarn/driver/unmanaged/UnmanagedAmTest.java b/lang/java/reef-runtime-yarn/src/test/java/org/apache/reef/runtime/yarn/driver/unmanaged/UnmanagedAmTest.java new file mode 100644 index 0000000..130f148 --- /dev/null +++ b/lang/java/reef-runtime-yarn/src/test/java/org/apache/reef/runtime/yarn/driver/unmanaged/UnmanagedAmTest.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.yarn.driver.unmanaged; + +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.client.api.async.NMClientAsync; +import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.util.Records; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Test REEF Driver in Unmanaged AM mode on YARN. + */ +public final class UnmanagedAmTest implements AMRMClientAsync.CallbackHandler, NMClientAsync.CallbackHandler { + + private static final Logger LOG = Logger.getLogger(UnmanagedAmTest.class.getName()); + + @Test + public void testAmShutdown() throws IOException, YarnException { + + Assume.assumeTrue( + "This test requires a YARN Resource Manager to connect to", + Boolean.parseBoolean(System.getenv("REEF_TEST_YARN"))); + + final YarnConfiguration yarnConfig = new YarnConfiguration(); + + // Start YARN client and register the application + + final YarnClient yarnClient = YarnClient.createYarnClient(); + yarnClient.init(yarnConfig); + yarnClient.start(); + + final ContainerLaunchContext containerContext = Records.newRecord(ContainerLaunchContext.class); + containerContext.setCommands(Collections.<String>emptyList()); + containerContext.setLocalResources(Collections.<String, LocalResource>emptyMap()); + containerContext.setEnvironment(Collections.<String, String>emptyMap()); + containerContext.setTokens(getTokens()); + + final ApplicationSubmissionContext appContext = yarnClient.createApplication().getApplicationSubmissionContext(); + appContext.setApplicationName("REEF_Unmanaged_AM_Test"); + appContext.setAMContainerSpec(containerContext); + appContext.setUnmanagedAM(true); + appContext.setQueue("default"); + + final ApplicationId applicationId = appContext.getApplicationId(); + LOG.log(Level.INFO, "Registered YARN application: {0}", applicationId); + + yarnClient.submitApplication(appContext); + + LOG.log(Level.INFO, "YARN application submitted: {0}", applicationId); + + addToken(yarnClient.getAMRMToken(applicationId)); + + // Start the AM + + final AMRMClientAsync<AMRMClient.ContainerRequest> rmClient = AMRMClientAsync.createAMRMClientAsync(1000, this); + rmClient.init(yarnConfig); + rmClient.start(); + + final NMClientAsync nmClient = new NMClientAsyncImpl(this); + nmClient.init(yarnConfig); + nmClient.start(); + + final RegisterApplicationMasterResponse registration = + rmClient.registerApplicationMaster(NetUtils.getHostname(), -1, null); + + LOG.log(Level.INFO, "Unmanaged AM is running: {0}", registration); + + rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "Success!", null); + + LOG.log(Level.INFO, "Unregistering AM: state {0}", rmClient.getServiceState()); + + // Shutdown the AM + + rmClient.stop(); + nmClient.stop(); + + // Get the final application report + + final ApplicationReport appReport = yarnClient.getApplicationReport(applicationId); + final YarnApplicationState appState = appReport.getYarnApplicationState(); + final FinalApplicationStatus finalAttemptStatus = appReport.getFinalApplicationStatus(); + + LOG.log(Level.INFO, "Application {0} final attempt {1} status: {2}/{3}", new Object[] { + applicationId, appReport.getCurrentApplicationAttemptId(), appState, finalAttemptStatus}); + + Assert.assertEquals("Application must be in FINISHED state", YarnApplicationState.FINISHED, appState); + Assert.assertEquals("Final status must be SUCCEEDED", FinalApplicationStatus.SUCCEEDED, finalAttemptStatus); + + // Shutdown YARN client + + yarnClient.stop(); + } + + private static ByteBuffer getTokens() throws IOException { + + final UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + final Credentials credentials = ugi.getCredentials(); + + try (final DataOutputBuffer dob = new DataOutputBuffer()) { + credentials.writeTokenStorageToStream(dob); + return ByteBuffer.wrap(dob.getData()); + } + } + + private static void addToken(final Token<AMRMTokenIdentifier> token) throws IOException { + final UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + ugi.addToken(token); + } + + @Override + public void onShutdownRequest() { + LOG.log(Level.INFO, "Shutdown requested by YARN"); + } + + // Methods below are dummy implementations of the required callbacks. + + @Override + public void onContainersCompleted(final List<ContainerStatus> list) { + throw new RuntimeException("This method should never be invoked"); + } + + @Override + public void onContainersAllocated(final List<Container> list) { + throw new RuntimeException("This method should never be invoked"); + } + + @Override + public void onNodesUpdated(final List<NodeReport> list) { + throw new RuntimeException("This method should never be invoked"); + } + + @Override + public float getProgress() { + return 0; + } + + @Override + public void onError(final Throwable throwable) { + throw new RuntimeException("This method should never be invoked", throwable); + } + + @Override + public void onContainerStarted(final ContainerId containerId, final Map<String, ByteBuffer> map) { + throw new RuntimeException("This method should never be invoked"); + } + + @Override + public void onContainerStatusReceived(final ContainerId containerId, final ContainerStatus containerStatus) { + throw new RuntimeException("This method should never be invoked"); + } + + @Override + public void onContainerStopped(final ContainerId containerId) { + throw new RuntimeException("This method should never be invoked"); + } + + @Override + public void onStartContainerError(final ContainerId containerId, final Throwable throwable) { + throw new RuntimeException("This method should never be invoked"); + } + + @Override + public void onGetContainerStatusError(final ContainerId containerId, final Throwable throwable) { + throw new RuntimeException("This method should never be invoked"); + } + + @Override + public void onStopContainerError(final ContainerId containerId, final Throwable throwable) { + throw new RuntimeException("This method should never be invoked"); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/c030e736/lang/java/reef-runtime-yarn/src/test/java/org/apache/reef/runtime/yarn/driver/unmanaged/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/test/java/org/apache/reef/runtime/yarn/driver/unmanaged/package-info.java b/lang/java/reef-runtime-yarn/src/test/java/org/apache/reef/runtime/yarn/driver/unmanaged/package-info.java new file mode 100644 index 0000000..048fecf --- /dev/null +++ b/lang/java/reef-runtime-yarn/src/test/java/org/apache/reef/runtime/yarn/driver/unmanaged/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Unit tests for running REEF Driver in YARN Unmanaged AM mode. + */ +package org.apache.reef.runtime.yarn.driver.unmanaged;
