[ 
https://issues.apache.org/jira/browse/FLINK-20126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17235481#comment-17235481
 ] 

Matthias commented on FLINK-20126:
----------------------------------

The test code I used was copied from 
[GitHub|https://github.com/rmetzger/scratch/blob/flink-1.12-test/src/main/java/de/robertmetzger/HangingOnSubmission.java]:
{code:java}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.mapohl;

import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;

public class HangingJobSubmission {

   // SLF4J logger used by main() and by the nested HangingInputFormat.
   private static final Logger LOG = 
LoggerFactory.getLogger(HangingJobSubmission.class);

   /**
    * Entry point of the test job.
    *
    * <p>Parses the command-line arguments, builds a job whose only source is a
    * {@link HangingInputFormat} configured from those arguments, routes the source's
    * records to {@code printOnTaskManager}, and executes the job.
    *
    * @param args command-line arguments, handed to {@link ParameterTool#fromArgs(String[])}
    * @throws Exception if argument parsing or job execution fails
    */
   public static void main(String[] args) throws Exception {
      final ParameterTool cliParams = ParameterTool.fromArgs(args);
      LOG.info("Welcome");

      // Obtain the execution environment and attach the custom source to a print sink.
      final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
      final HangingInputFormat sourceFormat = new HangingInputFormat(cliParams);
      env.createInput(sourceFormat).printOnTaskManager("yolo");

      env.execute("this is my test");
   }

   /**
    * Input format whose behavior is steered by command-line parameters so that a job can
    * be made to fail or stall on purpose.
    *
    * <p>Recognized parameters (all optional):
    * <ul>
    *   <li>{@code failAfterSecs} - {@link #createInputSplits(int)} sleeps for that many
    *       seconds (default 30) and then throws a {@link RuntimeException}.</li>
    *   <li>{@code hangMins} - {@link #createInputSplits(int)} sleeps for that many minutes
    *       (default 1) before returning a single split.</li>
    *   <li>{@code failAtRuntime} - {@link #nextRecord(String)} throws a
    *       {@link RuntimeException} on every call.</li>
    * </ul>
    *
    * <p>NOTE(review): split creation presumably runs as part of job submission, which is
    * the scenario the surrounding ticket tests - confirm against the Flink version in use.
    */
   private static class HangingInputFormat implements InputFormat<String, MyInputSplits> {
      /** Parsed command-line parameters selecting the failure/hang mode. */
      private final ParameterTool params;
      /** Cleared when the record loop is interrupted; drives {@link #reachedEnd()}. */
      private boolean running = true;

      /**
       * @param parameterTool parameters that select the failure/hang behavior
       */
      public HangingInputFormat(ParameterTool parameterTool) {
         this.params = parameterTool;
      }

      /** No configuration is needed; intentionally empty. */
      @Override
      public void configure(Configuration parameters) {

      }

      /**
       * @return always {@code null}; this format provides no statistics
       */
      @Override
      public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
         return null;
      }

      /**
       * Creates a single split, optionally failing or stalling first depending on the
       * {@code failAfterSecs} / {@code hangMins} parameters.
       *
       * @param minNumSplits ignored
       * @return an array holding exactly one {@link MyInputSplits}
       * @throws RuntimeException after the configured delay when {@code failAfterSecs} is set
       */
      @Override
      public MyInputSplits[] createInputSplits(int minNumSplits) throws IOException {
         if (params.has("failAfterSecs")) {
            int failSecs = params.getInt("failAfterSecs", 30);
            LOG.info("Waiting for {} seconds, then failing", failSecs);
            try {
               // Multiply as long: an int product overflows for large second counts.
               Thread.sleep(1000L * failSecs);
            } catch (InterruptedException e) {
               // Re-assert the interrupt so callers can still observe it;
               // Thread.interrupted() would only clear the flag.
               Thread.currentThread().interrupt();
            }
            throw new RuntimeException("Oh my god, I failed");
         }
         if (params.has("hangMins")) {
            int mins = params.getInt("hangMins", 1);
            // The exception is never thrown; it is passed only so the log shows the call stack.
            LOG.info("where am I, Hanging for " + mins, new RuntimeException("Here"));
            try {
               // Multiply as long: an int product overflows for large minute counts.
               Thread.sleep(1000L * 60 * mins);
            } catch (InterruptedException e) {
               // Re-assert the interrupt instead of silently dropping it.
               Thread.currentThread().interrupt();
            }
         }
         return new MyInputSplits[]{new MyInputSplits()};
      }

      /**
       * Returns an assigner that hands out a fresh {@link MyInputSplits} on every request
       * and ignores returned splits; the {@code inputSplits} argument is not used.
       */
      @Override
      public InputSplitAssigner getInputSplitAssigner(MyInputSplits[] inputSplits) {
         return new InputSplitAssigner() {
            @Override
            public InputSplit getNextInputSplit(String s, int i) {
               return new MyInputSplits();
            }

            @Override
            public void returnInputSplit(List<InputSplit> list, int i) {
               // Returned splits are deliberately discarded.
            }
         };
      }

      /** Nothing to open; intentionally empty. */
      @Override
      public void open(MyInputSplits split) throws IOException {

      }

      /**
       * @return {@code true} once {@link #nextRecord(String)} has been interrupted
       */
      @Override
      public boolean reachedEnd() throws IOException {
         return !running;
      }

      /**
       * Emits one constant record per second, or fails immediately when
       * {@code failAtRuntime} is set.
       *
       * @param reuse ignored
       * @return the constant string {@code "this is all I got"}
       * @throws RuntimeException if the {@code failAtRuntime} parameter is present
       */
      @Override
      public String nextRecord(String reuse) throws IOException {
         if (params.has("failAtRuntime")) {
            throw new RuntimeException("Now I failed at runtime");
         }
         try {
            Thread.sleep(1000);
         } catch (InterruptedException e) {
            running = false;
            // Keep the interrupt visible to the caller in addition to ending the input.
            Thread.currentThread().interrupt();
         }
         return "this is all I got";
      }

      /** Nothing to release; intentionally empty. */
      @Override
      public void close() throws IOException {

      }
   }

   /**
    * Minimal {@link InputSplit} implementation carrying no data; every instance reports
    * the same split number.
    */
   private static class MyInputSplits implements InputSplit {

      /** Split number reported by every instance. */
      private static final int SPLIT_NUMBER = 0;

      /**
       * @return always {@code 0}
       */
      @Override
      public int getSplitNumber() {
         return SPLIT_NUMBER;
      }
   }
} {code}

> Test non-blocking job submission
> --------------------------------
>
>                 Key: FLINK-20126
>                 URL: https://issues.apache.org/jira/browse/FLINK-20126
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Coordination
>    Affects Versions: 1.12.0
>            Reporter: Robert Metzger
>            Assignee: Matthias
>            Priority: Critical
>             Fix For: 1.12.0
>
>         Attachments: HangingJobSubmission.jar
>
>
> Introduced in https://issues.apache.org/jira/browse/FLINK-16866
> Test:
> - job submission lasting longer than the web submit timeout (pre 1.12.0 would 
> have failed with a timeout there)
> - proper error reporting in case of submission errors (exceptions during 
> submission, not operations)
> - proper behavior of the web ui during hanging, erroneous and successful 
> submission.
> ----
> [General Information about the Flink 1.12 release 
> testing|https://cwiki.apache.org/confluence/display/FLINK/1.12+Release+-+Community+Testing]
> When testing a feature, consider the following aspects:
> - Is the documentation easy to understand
> - Are the error messages, log messages, APIs etc. easy to understand
> - Is the feature working as expected under normal conditions
> - Is the feature working / failing as expected with invalid input, induced 
> errors etc.
> If you find a problem during testing, please file a ticket 
> (Priority=Critical; Fix Version = 1.12.0), and link it in this testing ticket.
> During the testing keep us updated on tests conducted, or please write a 
> short summary of all things you have tested in the end.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to