Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5746#discussion_r177721959
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingFatalErrorHandler.java
---
@@ -19,45 +19,81 @@
package org.apache.flink.runtime.util;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
/**
* Testing fatal error handler which records the occurred exceptions
during the execution of the
* tests. Captured exceptions are thrown as a {@link TestingException}.
*/
public class TestingFatalErrorHandler implements FatalErrorHandler {
private static final Logger LOG =
LoggerFactory.getLogger(TestingFatalErrorHandler.class);
- private final AtomicReference<Throwable> atomicThrowable;
+ private CompletableFuture<Throwable> errorFuture;
public TestingFatalErrorHandler() {
- atomicThrowable = new AtomicReference<>(null);
+ errorFuture = new CompletableFuture<>();
}
- public void rethrowError() throws TestingException {
- Throwable throwable = atomicThrowable.get();
+ public synchronized void rethrowError() throws TestingException {
+ final Throwable throwable = getException();
if (throwable != null) {
- throw new TestingException(throwable);
+ throw new TestingException(throwable);
+ }
+ }
+
+ public synchronized boolean hasExceptionOccurred() {
+ return errorFuture.isDone();
+ }
+
+ @Nullable
+ public synchronized Throwable getException() {
+ if (errorFuture.isDone()) {
+ Throwable throwable = null;
+
+ try {
+ throwable = errorFuture.get();
+ } catch (InterruptedException ie) {
+ Thread.interrupted();
--- End diff --
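It does not make sense to call `Thread.interrupted()` here. If the intent is to restore the interrupt status, that would be `Thread.currentThread().interrupt()`; `Thread.interrupted()` does the opposite: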
>Tests whether the current thread has been interrupted. The interrupted
status of the thread is cleared by this method.
However, the interrupted flag should already be cleared by the time the `InterruptedException` is caught:
> By convention, any method that exits by throwing an InterruptedException
clears interrupt status when it does so.
https://docs.oracle.com/javase/tutorial/essential/concurrency/interrupt.html
---