wangyang0918 commented on a change in pull request #16395:
URL: https://github.com/apache/flink/pull/16395#discussion_r665264491



##########
File path: flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
##########
@@ -74,22 +75,27 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;

Review comment:
       Out of scope for this PR, but maybe we could replace the deprecated 
`org.junit.Assert.assertThat` with `org.hamcrest.MatcherAssert.assertThat`.

##########
File path: flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
##########
@@ -372,10 +378,28 @@ private JobID submitJob(RestClusterClient<ApplicationId> restClusterClient)
         return restClusterClient.submitJob(job).get();
     }
 
-    private void killApplicationMaster(final String processName)
-            throws IOException, InterruptedException {
-        final Process exec = Runtime.getRuntime().exec("pkill -f " + processName);
-        assertThat(exec.waitFor(), is(0));
+    private void killApplicationMaster(final String processName) throws Exception {
+        final Set<Integer> origPids = getApplicationMasterPids(processName);
+        assertThat(origPids, not(empty()));
+
+        final Process pkillExec = Runtime.getRuntime().exec("pkill -f " + processName);
+        assertThat(pkillExec.waitFor(), is(0));
+
+        CommonTestUtils.waitUntilCondition(
+                () -> {
+                    final Set<Integer> curPids = getApplicationMasterPids(processName);
+                    return origPids.stream().noneMatch(curPids::contains);
+                },
+                Deadline.fromNow(TIMEOUT));
+    }
+
+    private Set<Integer> getApplicationMasterPids(final String processName) throws IOException {
+        final Process exec = Runtime.getRuntime().exec("pgrep -f " + processName);
+        return Arrays.stream(

Review comment:
       We will get the following exception when the result of `pgrep -f` is 
empty. Right?
   
   ```
   java.lang.NumberFormatException: For input string: ""
   
        at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
        at java.lang.Integer.parseInt(Integer.java:592)
        at java.lang.Integer.valueOf(Integer.java:766)
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
        at org.apache.flink.yarn.YARNHighAvailabilityITCase.getApplicationMasterPids(YARNHighAvailabilityITCase.java:402)
        at org.apache.flink.yarn.YARNHighAvailabilityITCase.lambda$killApplicationMaster$5(YARNHighAvailabilityITCase.java:390)
        at org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:148)
        at org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:132)
        at org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:124)
        at org.apache.flink.yarn.YARNHighAvailabilityITCase.killApplicationMaster(YARNHighAvailabilityITCase.java:388)
        at org.apache.flink.yarn.YARNHighAvailabilityITCase.lambda$testKillYarnSessionClusterEntrypoint$0(YARNHighAvailabilityITCase.java:182)
        at org.apache.flink.yarn.YarnTestBase.runTest(YarnTestBase.java:291)
        at org.apache.flink.yarn.YARNHighAvailabilityITCase.testKillYarnSessionClusterEntrypoint(YARNHighAvailabilityITCase.java:162)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
        at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.lang.Thread.run(Thread.java:748)
   ```
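
   One possible way to avoid this, as a rough sketch only: the diff above is cut off at `Arrays.stream(`, so I am assuming the method splits the `pgrep` output into tokens and maps each one with `Integer::valueOf` (which matches the `Integer.valueOf` frame in the trace). The `PidParser` class and `parsePids` method are hypothetical names for illustration, not code from the PR. Dropping empty tokens before parsing would make an empty `pgrep -f` result yield an empty set instead of throwing:

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the PR: parses the text printed by `pgrep`.
final class PidParser {
    private PidParser() {}

    // Returns the PIDs found in the given output; blank output gives an empty set.
    static Set<Integer> parsePids(final String pgrepOutput) {
        return Arrays.stream(pgrepOutput.split("\\s+"))
                // "".split(...) yields one empty token, which Integer.valueOf rejects,
                // so empty tokens are dropped before parsing.
                .filter(token -> !token.isEmpty())
                .map(Integer::valueOf)
                .collect(Collectors.toSet());
    }
}
```

   With something like this, the `noneMatch` check inside `waitUntilCondition` would see an empty set once all original processes are gone, which is the state the test is waiting for.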





