GJL commented on a change in pull request #7509: [FLINK-10558][Yarn tests] Port 
YARNHighAvailabilityITCase to new codebase
URL: https://github.com/apache/flink/pull/7509#discussion_r248476938
 
 

 ##########
 File path: 
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
 ##########
 @@ -18,198 +18,205 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.client.deployment.ClusterDeploymentException;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-
-import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
-import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint;
+import org.apache.flink.yarn.testjob.YarnTestJob;
+import org.apache.flink.yarn.util.YarnTestUtils;
+
 import org.apache.curator.test.TestingServer;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
-import org.junit.Rule;
+import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import java.io.File;
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
-
-import scala.concurrent.duration.FiniteDuration;
+import javax.annotation.Nonnull;
 
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assume.assumeTrue;
 
 /**
  * Tests that verify correct HA behavior.
  */
 public class YARNHighAvailabilityITCase extends YarnTestBase {
 
-       private static TestingServer zkServer;
-
-       private static ActorSystem actorSystem;
+       @ClassRule
+       public static final TemporaryFolder FOLDER = new TemporaryFolder();
 
-       private static final int numberApplicationAttempts = 3;
+       private static final String LOG_DIR = "flink-yarn-tests-ha";
+       private static final Duration TIMEOUT = Duration.ofSeconds(200L);
 
-       @Rule
-       public TemporaryFolder temp = new TemporaryFolder();
+       private static TestingServer zkServer;
+       private static String storageDir;
 
        @BeforeClass
-       public static void setup() {
-               actorSystem = AkkaUtils.createDefaultActorSystem();
-
-               try {
-                       zkServer = new TestingServer();
-                       zkServer.start();
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("Could not start ZooKeeper testing 
cluster.");
-               }
+       public static void setup() throws Exception {
+               zkServer = new TestingServer();
 
-               YARN_CONFIGURATION.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, 
"flink-yarn-tests-ha");
-               YARN_CONFIGURATION.set(YarnConfiguration.RM_AM_MAX_ATTEMPTS, "" 
+ numberApplicationAttempts);
+               storageDir = FOLDER.newFolder().getAbsolutePath();
 
+               // startYARNWithConfig should be implemented by subclass
+               YARN_CONFIGURATION.setClass(YarnConfiguration.RM_SCHEDULER, 
CapacityScheduler.class, ResourceScheduler.class);
+               YARN_CONFIGURATION.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, 
LOG_DIR);
+               YARN_CONFIGURATION.setInt(YarnConfiguration.NM_PMEM_MB, 4096);
                startYARNWithConfig(YARN_CONFIGURATION);
        }
 
        @AfterClass
        public static void teardown() throws Exception {
                if (zkServer != null) {
                        zkServer.stop();
+                       zkServer = null;
                }
-
-               JavaTestKit.shutdownActorSystem(actorSystem);
-               actorSystem = null;
        }
 
        /**
-        * Tests that the application master can be killed multiple times and 
that the surviving
-        * TaskManager successfully reconnects to the newly started JobManager.
-        * @throws Exception
+        * Tests that Yarn will restart a killed {@link 
YarnSessionClusterEntrypoint} which will then resume
+        * a persisted {@link JobGraph}.
         */
        @Test
-       public void testMultipleAMKill() throws Exception {
-               assumeTrue("This test only works with the old actor based 
code.", !isNewMode);
-               final int numberKillingAttempts = numberApplicationAttempts - 1;
-               String confDirPath = 
System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
-               final Configuration configuration = 
GlobalConfiguration.loadConfiguration();
-               TestingYarnClusterDescriptor flinkYarnClient = new 
TestingYarnClusterDescriptor(
-                       configuration,
-                       getYarnConfiguration(),
-                       confDirPath,
-                       getYarnClient(),
-                       true);
-
-               Assert.assertNotNull("unable to get yarn client", 
flinkYarnClient);
-               flinkYarnClient.setLocalJarPath(new 
Path(flinkUberjar.getAbsolutePath()));
-               
flinkYarnClient.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
-
-               String fsStateHandlePath = temp.getRoot().getPath();
-
-               // load the configuration
-               File configDirectory = new File(confDirPath);
-               
GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
-
-               
flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@recovery.zookeeper.quorum="
 +
-                       zkServer.getConnectString() + 
"@@yarn.application-attempts=" + numberApplicationAttempts +
-                       "@@" + CheckpointingOptions.STATE_BACKEND.key() + 
"=FILESYSTEM" +
-                       "@@" + CheckpointingOptions.CHECKPOINTS_DIRECTORY + "=" 
+ fsStateHandlePath + "/checkpoints" +
-                       "@@" + HighAvailabilityOptions.HA_STORAGE_PATH.key() + 
"=" + fsStateHandlePath + "/recovery");
-
-               ClusterClient<ApplicationId> yarnClusterClient = null;
-
-               final FiniteDuration timeout = new FiniteDuration(2, 
TimeUnit.MINUTES);
-
-               HighAvailabilityServices highAvailabilityServices = null;
-
-               final ClusterSpecification clusterSpecification = new 
ClusterSpecification.ClusterSpecificationBuilder()
-                       .setMasterMemoryMB(768)
-                       .setTaskManagerMemoryMB(1024)
-                       .setNumberTaskManagers(1)
-                       .setSlotsPerTaskManager(1)
-                       .createClusterSpecification();
-
-               try {
-                       yarnClusterClient = 
flinkYarnClient.deploySessionCluster(clusterSpecification);
-
-                       highAvailabilityServices = 
HighAvailabilityServicesUtils.createHighAvailabilityServices(
-                               yarnClusterClient.getFlinkConfiguration(),
-                               Executors.directExecutor(),
-                               
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
-
-                       final HighAvailabilityServices 
finalHighAvailabilityServices = highAvailabilityServices;
-
-                       new JavaTestKit(actorSystem) {{
-                               for (int attempt = 0; attempt < 
numberKillingAttempts; attempt++) {
-                                       new Within(timeout) {
-                                               @Override
-                                               protected void run() {
-                                                       try {
-                                                               ActorGateway 
gateway = LeaderRetrievalUtils.retrieveLeaderGateway(
-                                                                       
finalHighAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
-                                                                       
actorSystem,
-                                                                       
timeout);
-                                                               ActorGateway 
selfGateway = new AkkaActorGateway(getRef(), gateway.leaderSessionID());
-
-                                                               
gateway.tell(new 
TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered(1), 
selfGateway);
-
-                                                               
expectMsgEquals(Acknowledge.get());
-
-                                                               
gateway.tell(PoisonPill.getInstance());
-                                                       } catch (Exception e) {
-                                                               throw new 
AssertionError("Could not complete test.", e);
-                                                       }
-                                               }
-                                       };
-                               }
-
-                               new Within(timeout) {
-                                       @Override
-                                       protected void run() {
-                                               try {
-                                                       ActorGateway gateway = 
LeaderRetrievalUtils.retrieveLeaderGateway(
-                                                               
finalHighAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
-                                                               actorSystem,
-                                                               timeout);
-                                                       ActorGateway 
selfGateway = new AkkaActorGateway(getRef(), gateway.leaderSessionID());
-
-                                                       gateway.tell(new 
TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered(1), 
selfGateway);
-
-                                                       
expectMsgEquals(Acknowledge.get());
-                                               } catch (Exception e) {
-                                                       throw new 
AssertionError("Could not complete test.", e);
-                                               }
-                                       }
-                               };
-
-                       }};
-               } finally {
-                       if (yarnClusterClient != null) {
-                               log.info("Shutting down the Flink Yarn 
application.");
-                               yarnClusterClient.shutDownCluster();
-                               yarnClusterClient.shutdown();
-                       }
-
-                       if (highAvailabilityServices != null) {
-                               
highAvailabilityServices.closeAndCleanupAllData();
-                       }
+       public void testKillYarnSessionClusterEntrypoint() throws Exception {
+               assumeTrue(
+                       "This test kills processes via the pkill command. Thus, 
it only runs on Linux, Mac OS, Free BSD and Solaris.",
+                       OperatingSystem.isLinux() || OperatingSystem.isMac() || 
OperatingSystem.isFreeBSD() || OperatingSystem.isSolaris());
+
+               final YarnClusterDescriptor yarnClusterDescriptor = 
setupYarnClusterDescriptor();
+
+               final RestClusterClient<ApplicationId> restClusterClient = 
deploySessionCluster(yarnClusterDescriptor);
+
+               final JobGraph job = createJobGraph();
+
+               final JobID jobId = submitJob(restClusterClient, job);
+
+               final ApplicationId id = restClusterClient.getClusterId();
+
+               final long retryTimeout = 100L;
+               waitUntilJobIsRunning(restClusterClient, jobId, retryTimeout);
+
+               
killApplicationMaster(yarnClusterDescriptor.getYarnSessionClusterEntrypoint());
+
+               final YarnClient yarnClient = getYarnClient();
+               Assert.assertNotNull(yarnClient);
+
+               while 
(yarnClient.getApplicationReport(id).getCurrentApplicationAttemptId().getAttemptId()
 < 2) {
+                       Thread.sleep(retryTimeout);
+               }
+
+               waitUntilJobIsRunning(restClusterClient, jobId, retryTimeout);
+
+               yarnClient.killApplication(id);
+
+               while 
(yarnClient.getApplications(EnumSet.of(YarnApplicationState.KILLED)).size() == 
0 &&
 
 Review comment:
   Wouldn't this be semantically equivalent?
   ```suggestion
                while 
(yarnClient.getApplications(EnumSet.of(YarnApplicationState.KILLED, 
YarnApplicationState.FINISHED)).isEmpty()) {
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to