dmvk opened a new pull request #16695:
URL: https://github.com/apache/flink/pull/16695


   ## What is the purpose of the change
   
   Fixes a flaky test. The test could get stuck due to a hard-to-reproduce 
memory-visibility issue in TestingJobManagerRunnerFactory and a race condition 
between job recovery and dispatcher runner termination.
   
   The issue can be reproduced with the following change, which forces the blocking branch the first time each additional thread calls the factory.
   
   ```diff
   diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
   index 7b195f09294..47f9ac61734 100644
   --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
   +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
   @@ -30,6 +30,9 @@ import org.apache.flink.runtime.rpc.RpcService;
    
    import javax.annotation.Nonnull;
    
   +import java.util.Collections;
   +import java.util.HashSet;
   +import java.util.Set;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    
   @@ -71,11 +74,22 @@ public class TestingJobManagerRunnerFactory implements JobManagerRunnerFactory {
            return testingJobManagerRunner;
        }
    
   +    private final Set<String> threads = Collections.synchronizedSet(new HashSet<>());
   +
        @Nonnull
        private TestingJobManagerRunner createTestingJobManagerRunner(JobGraph jobGraph) {
            final boolean blockingTermination;
    
   -        if (numBlockingJobManagerRunners > 0) {
   +        final boolean memoryFlap = threads.add(Thread.currentThread().getName()) && threads.size() > 1;
   +        System.out.println(
   +                "FLAP: "
   +                        + memoryFlap
   +                        + ", THREAD: "
   +                        + Thread.currentThread()
   +                        + ", INSTANCE: "
   +                        + System.identityHashCode(this)
   +                        + threads);
   +        if (numBlockingJobManagerRunners > 0 || memoryFlap) {
                numBlockingJobManagerRunners--;
                blockingTermination = true;
            } else {
   ```
   
   ## Brief change log
   
   - TestingJobManagerRunnerFactory is now thread-safe
   - We now wait for job recovery to finish before closing the DispatcherRunner
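
The two changes above can be illustrated with a minimal, self-contained sketch. This is not the code from this pull request: `BlockingRunnerBudget` and `tryClaimBlocking` are hypothetical names, and the use of `AtomicInteger` is an assumption (the actual fix may use `synchronized` or another mechanism). It only shows the general idea of making the check-and-decrement of a counter like `numBlockingJobManagerRunners` atomic and visible across threads, and of waiting for work on another thread to complete before shutting down.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the factory's "blocking runners left" counter; not Flink code. */
final class BlockingRunnerBudget {
    private final AtomicInteger remaining;

    BlockingRunnerBudget(int numBlockingRunners) {
        this.remaining = new AtomicInteger(numBlockingRunners);
    }

    /** Atomically claims one blocking slot; returns false once the budget is used up. */
    boolean tryClaimBlocking() {
        // getAndUpdate returns the previous value; the update never goes below zero.
        return remaining.getAndUpdate(n -> n > 0 ? n - 1 : 0) > 0;
    }

    int remaining() {
        return remaining.get();
    }

    public static void main(String[] args) throws Exception {
        BlockingRunnerBudget budget = new BlockingRunnerBudget(1);
        ExecutorService recoveryThread = Executors.newSingleThreadExecutor();
        try {
            // Simulated "job recovery" that creates a runner on another thread.
            CompletableFuture<Boolean> recovery =
                    CompletableFuture.supplyAsync(budget::tryClaimBlocking, recoveryThread);

            // Wait for recovery to finish before "closing", instead of racing with it.
            boolean firstRunnerBlocks = recovery.get(10, TimeUnit.SECONDS);
            System.out.println("first runner blocks: " + firstRunnerBlocks);
            System.out.println("second runner blocks: " + budget.tryClaimBlocking());
        } finally {
            recoveryThread.shutdown();
        }
    }
}
```

With a budget of one, the runner created during the simulated recovery claims the blocking slot and the later call does not, regardless of which thread performs each call.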

