[ https://issues.apache.org/jira/browse/FLINK-2462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14698718#comment-14698718 ]
ASF GitHub Bot commented on FLINK-2462: --------------------------------------- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/1017#discussion_r37146808 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java --- @@ -27,114 +26,65 @@ import org.apache.flink.streaming.api.graph.StreamEdge; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputStreamOperator<IN1, IN2, OUT>> { - private static final Logger LOG = LoggerFactory.getLogger(TwoInputStreamTask.class); - private StreamTwoInputProcessor<IN1, IN2> inputProcessor; + + private volatile boolean running = true; @Override - public void registerInputOutput() { - try { - super.registerInputOutput(); + public void init() throws Exception { + TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader); + TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader); - TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader); - TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader); + int numberOfInputs = configuration.getNumberOfInputs(); - int numberOfInputs = configuration.getNumberOfInputs(); + ArrayList<InputGate> inputList1 = new ArrayList<InputGate>(); + ArrayList<InputGate> inputList2 = new ArrayList<InputGate>(); - ArrayList<InputGate> inputList1 = new ArrayList<InputGate>(); - ArrayList<InputGate> inputList2 = new ArrayList<InputGate>(); + List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader); - List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader); - - for (int i = 0; i < numberOfInputs; i++) { - int inputType = inEdges.get(i).getTypeNumber(); - InputGate reader = getEnvironment().getInputGate(i); - switch (inputType) { - case 1: - inputList1.add(reader); - break; - case 2: - inputList2.add(reader); - break; - default: - throw new RuntimeException("Invalid input type number: " + inputType); - } + for (int i = 0; i < numberOfInputs; i++) { + int inputType = inEdges.get(i).getTypeNumber(); + InputGate reader = getEnvironment().getInputGate(i); + switch (inputType) { + case 1: + inputList1.add(reader); + break; + case 2: --- End diff -- This was actually part of the original code - I did not modify it as part of this pull request. As far as I see it, the `StreamEdge` code is part of the API, not the runtime. It may be adjusted as part of #988 > Wrong exception reporting in streaming jobs > ------------------------------------------- > > Key: FLINK-2462 > URL: https://issues.apache.org/jira/browse/FLINK-2462 > Project: Flink > Issue Type: Bug > Components: Streaming > Affects Versions: 0.10 > Reporter: Stephan Ewen > Assignee: Stephan Ewen > Priority: Blocker > Fix For: 0.10 > > > When streaming tasks are fail and are canceled, they report a plethora of > followup exceptions. > The batch operators have a clear model that makes sure that root causes are > reported, and followup exceptions are not reported. That makes debugging much > easier. > A big part of that is to have a single consistent place that logs exceptions, > and that has a view of whether the operation is still running, or whether it > has been canceled. -- This message was sent by Atlassian JIRA (v6.3.4#6332)