This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 928b6897b7a0c609ab27f679b31852f18fa40684 Author: Xintong Song <[email protected]> AuthorDate: Tue Jul 6 18:19:14 2021 +0800 [FLINK-22662][yarn][test] Stabilize YARNHighAvailabilityITCase This closes #16395 --- .../flink/yarn/YARNHighAvailabilityITCase.java | 34 ++++++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java index d6c34af..5e3725f 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java @@ -48,6 +48,7 @@ import org.apache.flink.yarn.util.TestUtils; import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; +import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.curator.test.TestingServer; import org.apache.hadoop.security.UserGroupInformation; @@ -74,23 +75,28 @@ import javax.annotation.Nonnull; import java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Predicate; +import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkState; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.junit.Assert.assertNotNull; import static org.junit.Assume.assumeTrue; @@ -372,10 +378,34 @@ public class YARNHighAvailabilityITCase extends YarnTestBase { return restClusterClient.submitJob(job).get(); } - private void killApplicationMaster(final String processName) - throws IOException, InterruptedException { + private void killApplicationMaster(final String processName) throws Exception { + final Set<Integer> origPids = getApplicationMasterPids(processName); + assertThat(origPids, not(empty())); + final Process exec = Runtime.getRuntime().exec("pkill -f " + processName); assertThat(exec.waitFor(), is(0)); + + CommonTestUtils.waitUntilCondition( + () -> { + final Set<Integer> curPids = getApplicationMasterPids(processName); + return origPids.stream().noneMatch(curPids::contains); + }, + Deadline.fromNow(TIMEOUT)); + } + + private Set<Integer> getApplicationMasterPids(final String processName) + throws IOException, InterruptedException { + final Process exec = Runtime.getRuntime().exec("pgrep -f " + processName); + + if (exec.waitFor() != 0) { + return Collections.emptySet(); + } + + return Arrays.stream( + IOUtils.toString(exec.getInputStream(), StandardCharsets.UTF_8) + .split("\\s+")) + .map(Integer::valueOf) + .collect(Collectors.toSet()); } private static void waitUntilJobIsRunning(
