This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ec4d155  [FLINK-17769] Wrong order of log events on a task failure
ec4d155 is described below

commit ec4d155101faa2c1979c4981457e63d657068169
Author: Yuan Mei <[email protected]>
AuthorDate: Wed Jun 17 13:04:56 2020 +0800

    [FLINK-17769] Wrong order of log events on a task failure
    
    When a task failure occurs, the error from disposing of an operator is logged
    before the real root cause is printed, which is confusing.
    
    This fix suppresses exceptions that occur while disposing of an operator and
    attaches them to the root cause as suppressed exceptions.
---
 .../flink/streaming/runtime/tasks/StreamTask.java  | 108 ++++++++++-----------
 .../streaming/runtime/tasks/StreamTaskTest.java    |  30 ++++--
 2 files changed, 76 insertions(+), 62 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index a7d04d2..e65537f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -541,8 +541,10 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                        try {
                                cleanUpInvoke();
                        }
+                       // TODO: investigate why Throwable instead of Exception 
is used here.
                        catch (Throwable cleanUpException) {
-                               throw (Exception) 
ExceptionUtils.firstOrSuppressed(cleanUpException, invokeException);
+                               Throwable throwable = 
ExceptionUtils.firstOrSuppressed(cleanUpException, invokeException);
+                               throw (throwable instanceof Exception ? 
(Exception) throwable : new Exception(throwable));
                        }
                        throw invokeException;
                }
@@ -593,8 +595,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
 
                // make an attempt to dispose the operators such that failures 
in the dispose call
                // still let the computation fail
-               disposeAllOperators(false);
-               disposedOperators = true;
+               disposeAllOperators();
        }
 
        protected void cleanUpInvoke() throws Exception {
@@ -612,45 +613,28 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                Thread.interrupted();
 
                // stop all timers and threads
-               tryShutdownTimerService();
+               Exception suppressedException = 
runAndSuppressThrowable(this::tryShutdownTimerService, null);
 
                // stop all asynchronous checkpoint threads
-               try {
-                       cancelables.close();
-                       shutdownAsyncThreads();
-               } catch (Throwable t) {
-                       // catch and log the exception to not replace the 
original exception
-                       LOG.error("Could not shut down async checkpoint 
threads", t);
-               }
+               suppressedException = 
runAndSuppressThrowable(cancelables::close, suppressedException);
+               suppressedException = 
runAndSuppressThrowable(this::shutdownAsyncThreads, suppressedException);
 
                // we must! perform this cleanup
-               try {
-                       cleanup();
-               } catch (Throwable t) {
-                       // catch and log the exception to not replace the 
original exception
-                       LOG.error("Error during cleanup of stream task", t);
-               }
+               suppressedException = runAndSuppressThrowable(this::cleanup, 
suppressedException);
 
                // if the operators were not disposed before, do a hard dispose
-               disposeAllOperators(true);
+               suppressedException = 
runAndSuppressThrowable(this::disposeAllOperators, suppressedException);
 
                // release the output resources. this method should never fail.
-               if (operatorChain != null) {
-                       // beware: without synchronization, 
#performCheckpoint() may run in
-                       //         parallel and this call is not thread-safe
-                       actionExecutor.run(() -> 
operatorChain.releaseOutputs());
-               } else {
-                       // failed to allocate operatorChain, clean up record 
writers
-                       recordWriter.close();
-               }
+               suppressedException = 
runAndSuppressThrowable(this::releaseOutputResources, suppressedException);
 
-               try {
-                       channelIOExecutor.shutdown();
-               } catch (Throwable t) {
-                       LOG.error("Error during shutdown the channel state 
unspill executor", t);
-               }
+               suppressedException = 
runAndSuppressThrowable(channelIOExecutor::shutdown, suppressedException);
 
-               mailboxProcessor.close();
+               suppressedException = 
runAndSuppressThrowable(mailboxProcessor::close, suppressedException);
+
+               if (suppressedException != null) {
+                       throw suppressedException;
+               }
        }
 
        @Override
@@ -687,27 +671,49 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                }
        }
 
+       private void releaseOutputResources() throws Exception {
+               if (operatorChain != null) {
+                       // beware: without synchronization, 
#performCheckpoint() may run in
+                       //         parallel and this call is not thread-safe
+                       actionExecutor.run(() -> 
operatorChain.releaseOutputs());
+               } else {
+                       // failed to allocate operatorChain, clean up record 
writers
+                       recordWriter.close();
+               }
+       }
+
+       private Exception runAndSuppressThrowable(ThrowingRunnable<?> runnable, 
@Nullable Exception originalException) {
+               try {
+                       runnable.run();
+               } catch (Throwable t) {
+                       // TODO: investigate why Throwable instead of Exception 
is used here.
+                       Exception e = t instanceof Exception ? (Exception) t : 
new Exception(t);
+                       return ExceptionUtils.firstOrSuppressed(e, 
originalException);
+               }
+
+               return originalException;
+       }
+
        /**
         * Execute @link StreamOperator#dispose()} of each operator in the 
chain of this
         * {@link StreamTask}. Disposing happens from <b>tail to head</b> 
operator in the chain.
         */
-       private void disposeAllOperators(boolean logOnlyErrors) throws 
Exception {
+       private void disposeAllOperators() throws Exception {
                if (operatorChain != null && !disposedOperators) {
+                       Exception disposalException = null;
                        for (StreamOperatorWrapper<?, ?> operatorWrapper : 
operatorChain.getAllOperators(true)) {
                                StreamOperator<?> operator = 
operatorWrapper.getStreamOperator();
-                               if (!logOnlyErrors) {
+                               try {
                                        operator.dispose();
                                }
-                               else {
-                                       try {
-                                               operator.dispose();
-                                       }
-                                       catch (Exception e) {
-                                               LOG.error("Error during 
disposal of stream operator.", e);
-                                       }
+                               catch (Exception e) {
+                                       disposalException = 
ExceptionUtils.firstOrSuppressed(e, disposalException);
                                }
                        }
                        disposedOperators = true;
+                       if (disposalException != null) {
+                               throw disposalException;
+                       }
                }
        }
 
@@ -964,20 +970,14 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
        }
 
        private void tryShutdownTimerService() {
-
                if (!timerService.isTerminated()) {
-
-                       try {
-                               final long timeoutMs = 
getEnvironment().getTaskManagerInfo().getConfiguration().
-                                       
getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT_TIMERS);
-
-                               if 
(!timerService.shutdownServiceUninterruptible(timeoutMs)) {
-                                       LOG.warn("Timer service shutdown 
exceeded time limit of {} ms while waiting for pending " +
-                                               "timers. Will continue with 
shutdown procedure.", timeoutMs);
-                               }
-                       } catch (Throwable t) {
-                               // catch and log the exception to not replace 
the original exception
-                               LOG.error("Could not shut down timer service", 
t);
+                       final long timeoutMs = getEnvironment()
+                               .getTaskManagerInfo()
+                               .getConfiguration()
+                               
.getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT_TIMERS);
+                       if 
(!timerService.shutdownServiceUninterruptible(timeoutMs)) {
+                               LOG.warn("Timer service shutdown exceeded time 
limit of {} ms while waiting for pending " +
+                                       "timers. Will continue with shutdown 
procedure.", timeoutMs);
                        }
                }
        }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 833f07b..020aee4 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -135,7 +135,6 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.StreamCorruptedException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -152,6 +151,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 
+import static java.util.Arrays.asList;
 import static 
org.apache.flink.runtime.checkpoint.StateObjectCollection.singleton;
 import static org.apache.flink.streaming.util.StreamTaskUtil.waitTaskIsRunning;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -209,7 +209,15 @@ public class StreamTaskTest extends TestLogger {
                        testHarness.waitForTaskCompletion();
                }
                catch (Exception ex) {
-                       if (!ExceptionUtils.findThrowable(ex, 
ExpectedTestException.class).isPresent()) {
+                       // make sure the original exception is the cause and 
not wrapped
+                       if (!(ex.getCause() instanceof ExpectedTestException)) {
+                               throw ex;
+                       }
+                       // make sure DisposeException is the only suppressed 
exception
+                       if (ex.getCause().getSuppressed().length != 1) {
+                               throw ex;
+                       }
+                       if (!(ex.getCause().getSuppressed()[0] instanceof 
FailingTwiceOperator.DisposeException)) {
                                throw ex;
                        }
                }
@@ -225,7 +233,13 @@ public class StreamTaskTest extends TestLogger {
 
                @Override
                public void dispose() throws Exception {
-                       fail("This exception should be suppressed");
+                       throw new DisposeException();
+               }
+
+               class DisposeException extends Exception {
+                       public DisposeException() {
+                               super("Dispose Exception. This exception should 
be suppressed");
+                       }
                }
        }
 
@@ -818,7 +832,7 @@ public class StreamTaskTest extends TestLogger {
                task.streamTask.cancel();
 
                final FutureUtils.ConjunctFuture<Void> discardFuture = 
FutureUtils.waitForAll(
-                       Arrays.asList(
+                       asList(
                                managedKeyedStateHandle.getDiscardFuture(),
                                rawKeyedStateHandle.getDiscardFuture(),
                                managedOperatorStateHandle.getDiscardFuture(),
@@ -1059,8 +1073,8 @@ public class StreamTaskTest extends TestLogger {
                }
 
                MockEnvironment mockEnvironment = new 
MockEnvironmentBuilder().build();
-               mockEnvironment.addOutputs(Arrays.asList(partitions));
-               mockEnvironment.addInputs(Arrays.asList(gates));
+               mockEnvironment.addOutputs(asList(partitions));
+               mockEnvironment.addInputs(asList(gates));
                StreamTask task = new 
MockStreamTaskBuilder(mockEnvironment).build();
                try {
                        verifyResults(gates, partitions, false, false);
@@ -1095,8 +1109,8 @@ public class StreamTaskTest extends TestLogger {
                        NoOpCheckpointResponder.INSTANCE,
                        reader);
                MockEnvironment mockEnvironment = new 
MockEnvironmentBuilder().setTaskStateManager(taskStateManager).build();
-               mockEnvironment.addOutputs(Arrays.asList(partitions));
-               mockEnvironment.addInputs(Arrays.asList(gates));
+               mockEnvironment.addOutputs(asList(partitions));
+               mockEnvironment.addInputs(asList(gates));
                StreamTask task = new 
MockStreamTaskBuilder(mockEnvironment).build();
                try {
                        verifyResults(gates, partitions, false, false);

Reply via email to