[ https://issues.apache.org/jira/browse/FLINK-7430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16154991#comment-16154991 ]
Aljoscha Krettek commented on FLINK-7430: ----------------------------------------- I think the issue lies here: https://github.com/apache/flink/blob/6642768ad8f8c5d1856742a6d148f7724c20666c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L280. Both {{ContinuousFileReaderOperator}} and {{AsyncWaitOperator}} have asynchronous computation going on and they wait in {{close()}} for those to finish. However, {{StreamTask}} will set {{isRunning}} to {{false}} to early and therefore swallow exceptions that occur in the asynchronous parts while waiting for {{close()}} to finish. I think the solution is setting {{isRunning = true}} after closing all operators. > ContinuousFileReaderOperator swallows exceptions > ------------------------------------------------ > > Key: FLINK-7430 > URL: https://issues.apache.org/jira/browse/FLINK-7430 > Project: Flink > Issue Type: Bug > Components: DataStream API, filesystem-connector > Affects Versions: 1.4.0, 1.3.2 > Environment: - macOS 10.12.6 > - Oracle JDK 1.8.0_144 > - Flink 1.3.2 > Reporter: Peter Ertl > Priority: Critical > > The class *ContinuousFileReaderOperator* is swallowing exceptions as the > following example demonstrates: > {code:java} > package org.apache.flink.streaming.examples; > import java.io.File; > import java.io.IOException; > import org.apache.flink.api.common.io.OutputFormat; > import org.apache.flink.configuration.Configuration; > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > public class FormatExceptionSwallowed { > public static void main(String[] args) throws Exception { > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > File bla = File.createTempFile("foo", "baz"); > try(PrintWriter w = new PrintWriter(bla)) { > w.println("one"); > w.println("two"); > w.println("three"); > } > env.readTextFile(bla.getCanonicalPath()) > .writeUsingOutputFormat(new OutputFormat<String>() { > @Override > public void configure(final Configuration > parameters) { > } > @Override > public void open(final int taskNumber, final > int numTasks) throws IOException { > } > @Override > public void writeRecord(final String record) > throws IOException { > throw new > IllegalArgumentException("bla"); > } > @Override > public void close() throws IOException { > } > }); > env.execute("go"); > > // JOB TERMINATES WITH NO EXCEPTION / ERROR whatsoever ... > } > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)