GJL commented on a change in pull request #7509: [FLINK-10558][Yarn tests] Port
YARNHighAvailabilityITCase to new codebase
URL: https://github.com/apache/flink/pull/7509#discussion_r248477826
##########
File path:
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
##########
@@ -18,198 +18,205 @@
package org.apache.flink.yarn;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-
-import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
-import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint;
+import org.apache.flink.yarn.testjob.YarnTestJob;
+import org.apache.flink.yarn.util.YarnTestUtils;
+
import org.apache.curator.test.TestingServer;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
-import org.junit.Rule;
+import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import java.io.File;
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
-
-import scala.concurrent.duration.FiniteDuration;
+import javax.annotation.Nonnull;
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
import static org.junit.Assume.assumeTrue;
/**
* Tests that verify correct HA behavior.
*/
public class YARNHighAvailabilityITCase extends YarnTestBase {
- private static TestingServer zkServer;
-
- private static ActorSystem actorSystem;
+ @ClassRule
+ public static final TemporaryFolder FOLDER = new TemporaryFolder();
- private static final int numberApplicationAttempts = 3;
+ private static final String LOG_DIR = "flink-yarn-tests-ha";
+ private static final Duration TIMEOUT = Duration.ofSeconds(200L);
- @Rule
- public TemporaryFolder temp = new TemporaryFolder();
+ private static TestingServer zkServer;
+ private static String storageDir;
@BeforeClass
- public static void setup() {
- actorSystem = AkkaUtils.createDefaultActorSystem();
-
- try {
- zkServer = new TestingServer();
- zkServer.start();
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("Could not start ZooKeeper testing
cluster.");
- }
+ public static void setup() throws Exception {
+ zkServer = new TestingServer();
- YARN_CONFIGURATION.set(YarnTestBase.TEST_CLUSTER_NAME_KEY,
"flink-yarn-tests-ha");
- YARN_CONFIGURATION.set(YarnConfiguration.RM_AM_MAX_ATTEMPTS, ""
+ numberApplicationAttempts);
+ storageDir = FOLDER.newFolder().getAbsolutePath();
+ // startYARNWithConfig should be implemented by subclass
+ YARN_CONFIGURATION.setClass(YarnConfiguration.RM_SCHEDULER,
CapacityScheduler.class, ResourceScheduler.class);
+ YARN_CONFIGURATION.set(YarnTestBase.TEST_CLUSTER_NAME_KEY,
LOG_DIR);
+ YARN_CONFIGURATION.setInt(YarnConfiguration.NM_PMEM_MB, 4096);
startYARNWithConfig(YARN_CONFIGURATION);
}
@AfterClass
public static void teardown() throws Exception {
if (zkServer != null) {
zkServer.stop();
+ zkServer = null;
}
-
- JavaTestKit.shutdownActorSystem(actorSystem);
- actorSystem = null;
}
/**
- * Tests that the application master can be killed multiple times and
that the surviving
- * TaskManager successfully reconnects to the newly started JobManager.
- * @throws Exception
+ * Tests that Yarn will restart a killed {@link
YarnSessionClusterEntrypoint} which will then resume
+ * a persisted {@link JobGraph}.
*/
@Test
- public void testMultipleAMKill() throws Exception {
- assumeTrue("This test only works with the old actor based
code.", !isNewMode);
- final int numberKillingAttempts = numberApplicationAttempts - 1;
- String confDirPath =
System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
- final Configuration configuration =
GlobalConfiguration.loadConfiguration();
- TestingYarnClusterDescriptor flinkYarnClient = new
TestingYarnClusterDescriptor(
- configuration,
- getYarnConfiguration(),
- confDirPath,
- getYarnClient(),
- true);
-
- Assert.assertNotNull("unable to get yarn client",
flinkYarnClient);
- flinkYarnClient.setLocalJarPath(new
Path(flinkUberjar.getAbsolutePath()));
-
flinkYarnClient.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
-
- String fsStateHandlePath = temp.getRoot().getPath();
-
- // load the configuration
- File configDirectory = new File(confDirPath);
-
GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
-
-
flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@recovery.zookeeper.quorum="
+
- zkServer.getConnectString() +
"@@yarn.application-attempts=" + numberApplicationAttempts +
- "@@" + CheckpointingOptions.STATE_BACKEND.key() +
"=FILESYSTEM" +
- "@@" + CheckpointingOptions.CHECKPOINTS_DIRECTORY + "="
+ fsStateHandlePath + "/checkpoints" +
- "@@" + HighAvailabilityOptions.HA_STORAGE_PATH.key() +
"=" + fsStateHandlePath + "/recovery");
-
- ClusterClient<ApplicationId> yarnClusterClient = null;
-
- final FiniteDuration timeout = new FiniteDuration(2,
TimeUnit.MINUTES);
-
- HighAvailabilityServices highAvailabilityServices = null;
-
- final ClusterSpecification clusterSpecification = new
ClusterSpecification.ClusterSpecificationBuilder()
- .setMasterMemoryMB(768)
- .setTaskManagerMemoryMB(1024)
- .setNumberTaskManagers(1)
- .setSlotsPerTaskManager(1)
- .createClusterSpecification();
-
- try {
- yarnClusterClient =
flinkYarnClient.deploySessionCluster(clusterSpecification);
-
- highAvailabilityServices =
HighAvailabilityServicesUtils.createHighAvailabilityServices(
- yarnClusterClient.getFlinkConfiguration(),
- Executors.directExecutor(),
-
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
-
- final HighAvailabilityServices
finalHighAvailabilityServices = highAvailabilityServices;
-
- new JavaTestKit(actorSystem) {{
- for (int attempt = 0; attempt <
numberKillingAttempts; attempt++) {
- new Within(timeout) {
- @Override
- protected void run() {
- try {
- ActorGateway
gateway = LeaderRetrievalUtils.retrieveLeaderGateway(
-
finalHighAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
-
actorSystem,
-
timeout);
- ActorGateway
selfGateway = new AkkaActorGateway(getRef(), gateway.leaderSessionID());
-
-
gateway.tell(new
TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered(1),
selfGateway);
-
-
expectMsgEquals(Acknowledge.get());
-
-
gateway.tell(PoisonPill.getInstance());
- } catch (Exception e) {
- throw new
AssertionError("Could not complete test.", e);
- }
- }
- };
- }
-
- new Within(timeout) {
- @Override
- protected void run() {
- try {
- ActorGateway gateway =
LeaderRetrievalUtils.retrieveLeaderGateway(
-
finalHighAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
- actorSystem,
- timeout);
- ActorGateway
selfGateway = new AkkaActorGateway(getRef(), gateway.leaderSessionID());
-
- gateway.tell(new
TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered(1),
selfGateway);
-
-
expectMsgEquals(Acknowledge.get());
- } catch (Exception e) {
- throw new
AssertionError("Could not complete test.", e);
- }
- }
- };
-
- }};
- } finally {
- if (yarnClusterClient != null) {
- log.info("Shutting down the Flink Yarn
application.");
- yarnClusterClient.shutDownCluster();
- yarnClusterClient.shutdown();
- }
-
- if (highAvailabilityServices != null) {
-
highAvailabilityServices.closeAndCleanupAllData();
- }
+ public void testKillYarnSessionClusterEntrypoint() throws Exception {
+ assumeTrue(
+ "This test kills processes via the pkill command. Thus,
it only runs on Linux, Mac OS, Free BSD and Solaris.",
+ OperatingSystem.isLinux() || OperatingSystem.isMac() ||
OperatingSystem.isFreeBSD() || OperatingSystem.isSolaris());
+
+ final YarnClusterDescriptor yarnClusterDescriptor =
setupYarnClusterDescriptor();
+
+ final RestClusterClient<ApplicationId> restClusterClient =
deploySessionCluster(yarnClusterDescriptor);
+
+ final JobGraph job = createJobGraph();
+
+ final JobID jobId = submitJob(restClusterClient, job);
+
+ final ApplicationId id = restClusterClient.getClusterId();
+
+ final long retryTimeout = 100L;
+ waitUntilJobIsRunning(restClusterClient, jobId, retryTimeout);
+
+
killApplicationMaster(yarnClusterDescriptor.getYarnSessionClusterEntrypoint());
+
+ final YarnClient yarnClient = getYarnClient();
+ Assert.assertNotNull(yarnClient);
+
+ while
(yarnClient.getApplicationReport(id).getCurrentApplicationAttemptId().getAttemptId()
< 2) {
+ Thread.sleep(retryTimeout);
+ }
+
+ waitUntilJobIsRunning(restClusterClient, jobId, retryTimeout);
+
+ yarnClient.killApplication(id);
+
+ while
(yarnClient.getApplications(EnumSet.of(YarnApplicationState.KILLED)).size() ==
0 &&
+
yarnClient.getApplications(EnumSet.of(YarnApplicationState.FINISHED)).size() ==
0) {
+ Thread.sleep(retryTimeout);
+ }
+ }
+
+ @Nonnull
+ private YarnClusterDescriptor setupYarnClusterDescriptor() {
+ final Configuration flinkConfiguration = new Configuration();
+
flinkConfiguration.setString(YarnConfigOptions.APPLICATION_ATTEMPTS, "10");
+ flinkConfiguration.setString(HighAvailabilityOptions.HA_MODE,
"zookeeper");
+
flinkConfiguration.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
storageDir);
+
flinkConfiguration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
zkServer.getConnectString());
+
flinkConfiguration.setInteger(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT,
1000);
+
+ final int minMemory = 100;
+
flinkConfiguration.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN,
minMemory);
+
+ return createYarnClusterDescriptor(flinkConfiguration);
+ }
+
+ private RestClusterClient<ApplicationId>
deploySessionCluster(YarnClusterDescriptor yarnClusterDescriptor) throws
ClusterDeploymentException {
+ final int containerMemory = 256;
+ final ClusterClient<ApplicationId> yarnClusterClient =
yarnClusterDescriptor.deploySessionCluster(
+ new ClusterSpecification.ClusterSpecificationBuilder()
+ .setMasterMemoryMB(containerMemory)
+ .setTaskManagerMemoryMB(containerMemory)
+ .setSlotsPerTaskManager(1)
+ .createClusterSpecification());
+
+ assertThat(yarnClusterClient,
is(instanceOf(RestClusterClient.class)));
+ return (RestClusterClient<ApplicationId>) yarnClusterClient;
+ }
+
+ private JobID submitJob(RestClusterClient<ApplicationId>
restClusterClient, JobGraph job) throws InterruptedException,
java.util.concurrent.ExecutionException {
+ final CompletableFuture<JobSubmissionResult>
jobSubmissionResultCompletableFuture = restClusterClient.submitJob(job);
+
+ final JobSubmissionResult jobSubmissionResult =
jobSubmissionResultCompletableFuture.get();
+ return jobSubmissionResult.getJobID();
+ }
+
+ private void killApplicationMaster(final String processName) throws IOException, InterruptedException {
+ final Process exec = Runtime.getRuntime().exec("pkill -f " + processName);
Review comment:
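Instead of killing the process via `pkill` (which only works on some operating systems), the job could conditionally call `System.exit(...)` depending on the presence of a marker file in a temporary folder. That would be a platform-independent strategy.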
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services