This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 928b6897b7a0c609ab27f679b31852f18fa40684
Author: Xintong Song <[email protected]>
AuthorDate: Tue Jul 6 18:19:14 2021 +0800

    [FLINK-22662][yarn][test] Stabilize YARNHighAvailabilityITCase
    
    This closes #16395
---
 .../flink/yarn/YARNHighAvailabilityITCase.java     | 34 ++++++++++++++++++++--
 1 file changed, 32 insertions(+), 2 deletions(-)

diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index d6c34af..5e3725f 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -48,6 +48,7 @@ import org.apache.flink.yarn.util.TestUtils;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.curator.test.TestingServer;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -74,23 +75,28 @@ import javax.annotation.Nonnull;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assume.assumeTrue;
@@ -372,10 +378,34 @@ public class YARNHighAvailabilityITCase extends 
YarnTestBase {
         return restClusterClient.submitJob(job).get();
     }
 
-    private void killApplicationMaster(final String processName)
-            throws IOException, InterruptedException {
+    private void killApplicationMaster(final String processName) throws 
Exception {
+        final Set<Integer> origPids = getApplicationMasterPids(processName);
+        assertThat(origPids, not(empty()));
+
         final Process exec = Runtime.getRuntime().exec("pkill -f " + 
processName);
         assertThat(exec.waitFor(), is(0));
+
+        CommonTestUtils.waitUntilCondition(
+                () -> {
+                    final Set<Integer> curPids = 
getApplicationMasterPids(processName);
+                    return origPids.stream().noneMatch(curPids::contains);
+                },
+                Deadline.fromNow(TIMEOUT));
+    }
+
+    private Set<Integer> getApplicationMasterPids(final String processName)
+            throws IOException, InterruptedException {
+        final Process exec = Runtime.getRuntime().exec("pgrep -f " + 
processName);
+
+        if (exec.waitFor() != 0) {
+            return Collections.emptySet();
+        }
+
+        return Arrays.stream(
+                        IOUtils.toString(exec.getInputStream(), 
StandardCharsets.UTF_8)
+                                .split("\\s+"))
+                .map(Integer::valueOf)
+                .collect(Collectors.toSet());
     }
 
     private static void waitUntilJobIsRunning(

Reply via email to