[ 
https://issues.apache.org/jira/browse/FLINK-2462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14698718#comment-14698718
 ] 

ASF GitHub Bot commented on FLINK-2462:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1017#discussion_r37146808
  
    --- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
 ---
    @@ -27,114 +26,65 @@
     import org.apache.flink.streaming.api.graph.StreamEdge;
     import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
     import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor;
    -import org.slf4j.Logger;
    -import org.slf4j.LoggerFactory;
     
     public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, 
TwoInputStreamOperator<IN1, IN2, OUT>> {
     
    -   private static final Logger LOG = 
LoggerFactory.getLogger(TwoInputStreamTask.class);
    -
        private StreamTwoInputProcessor<IN1, IN2> inputProcessor;
    +   
    +   private volatile boolean running = true;
     
        @Override
    -   public void registerInputOutput() {
    -           try {
    -                   super.registerInputOutput();
    +   public void init() throws Exception {
    +           TypeSerializer<IN1> inputDeserializer1 = 
configuration.getTypeSerializerIn1(userClassLoader);
    +           TypeSerializer<IN2> inputDeserializer2 = 
configuration.getTypeSerializerIn2(userClassLoader);
        
    -                   TypeSerializer<IN1> inputDeserializer1 = 
configuration.getTypeSerializerIn1(userClassLoader);
    -                   TypeSerializer<IN2> inputDeserializer2 = 
configuration.getTypeSerializerIn2(userClassLoader);
    +           int numberOfInputs = configuration.getNumberOfInputs();
        
    -                   int numberOfInputs = configuration.getNumberOfInputs();
    +           ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
    +           ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
        
    -                   ArrayList<InputGate> inputList1 = new 
ArrayList<InputGate>();
    -                   ArrayList<InputGate> inputList2 = new 
ArrayList<InputGate>();
    +           List<StreamEdge> inEdges = 
configuration.getInPhysicalEdges(userClassLoader);
        
    -                   List<StreamEdge> inEdges = 
configuration.getInPhysicalEdges(userClassLoader);
    -   
    -                   for (int i = 0; i < numberOfInputs; i++) {
    -                           int inputType = inEdges.get(i).getTypeNumber();
    -                           InputGate reader = 
getEnvironment().getInputGate(i);
    -                           switch (inputType) {
    -                                   case 1:
    -                                           inputList1.add(reader);
    -                                           break;
    -                                   case 2:
    -                                           inputList2.add(reader);
    -                                           break;
    -                                   default:
    -                                           throw new 
RuntimeException("Invalid input type number: " + inputType);
    -                           }
    +           for (int i = 0; i < numberOfInputs; i++) {
    +                   int inputType = inEdges.get(i).getTypeNumber();
    +                   InputGate reader = getEnvironment().getInputGate(i);
    +                   switch (inputType) {
    +                           case 1:
    +                                   inputList1.add(reader);
    +                                   break;
    +                           case 2:
    --- End diff --
    
    This was actually part of the original code - I did not modify it as part 
of this pull request.
    As far as I see it, the `StreamEdge` code is part of the API, not the 
runtime. It may be adjusted as part of  #988 


> Wrong exception reporting in streaming jobs
> -------------------------------------------
>
>                 Key: FLINK-2462
>                 URL: https://issues.apache.org/jira/browse/FLINK-2462
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 0.10
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>            Priority: Blocker
>             Fix For: 0.10
>
>
> When streaming tasks are fail and are canceled, they report a plethora of 
> followup exceptions.
> The batch operators have a clear model that makes sure that root causes are 
> reported, and followup exceptions are not reported. That makes debugging much 
> easier.
> A big part of that is to have a single consistent place that logs exceptions, 
> and that has a view of whether the operation is still running, or whether it 
> has been canceled.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to