[
https://issues.apache.org/jira/browse/FLINK-20126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17235481#comment-17235481
]
Matthias commented on FLINK-20126:
----------------------------------
The test code used was copied from
[GitHub|https://github.com/rmetzger/scratch/blob/flink-1.12-test/src/main/java/de/robertmetzger/HangingOnSubmission.java]:
{code:java}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.mapohl;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
/**
 * Batch job for manually exercising slow, failing and hanging job submissions.
 *
 * <p>The behaviour is selected through program arguments that are parsed with
 * ParameterTool.fromArgs:
 *
 * <ul>
 *   <li>failAfterSecs: split creation sleeps for the given number of seconds (30 if the value
 *       is omitted) and then throws a RuntimeException.
 *   <li>hangMins: split creation sleeps for the given number of minutes (1 if the value is
 *       omitted) and then completes normally.
 *   <li>failAtRuntime: every call to nextRecord throws a RuntimeException.
 * </ul>
 *
 * <p>Without any of these arguments the source emits one constant record roughly once per
 * second until the reading thread is interrupted.
 */
public class HangingJobSubmission {

    private static final Logger LOG = LoggerFactory.getLogger(HangingJobSubmission.class);

    // Declared as long so that sleep durations are computed in 64-bit arithmetic and cannot
    // overflow int for large argument values.
    private static final long MILLIS_PER_SECOND = 1000L;
    private static final long MILLIS_PER_MINUTE = 60L * MILLIS_PER_SECOND;

    /**
     * Builds and executes the test job.
     *
     * @param args program arguments; see the class description for the recognised keys
     * @throws Exception if the job execution fails
     */
    public static void main(String[] args) throws Exception {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        LOG.info("Welcome");

        // set up the batch execution environment
        ExecutionEnvironment batch = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> input = batch.createInput(new HangingInputFormat(parameterTool));
        input.printOnTaskManager("yolo");
        batch.execute("this is my test");
    }

    /**
     * Sleeps for the given duration and keeps the thread's interrupt status intact.
     *
     * @param millis time to sleep in milliseconds
     * @return true if the full duration elapsed, false if the sleep was interrupted
     */
    private static boolean sleepPreservingInterrupt(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            // Thread.sleep clears the flag when it throws; set it again so that code further
            // up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Input format whose split creation and record reading can be made to hang or fail,
     * depending on the parameters handed to the constructor.
     */
    private static class HangingInputFormat implements InputFormat<String, MyInputSplits> {

        private final ParameterTool params;

        // Flipped to false once the reading thread has been interrupted; reachedEnd() then
        // reports the end of the input.
        private boolean running = true;

        /**
         * @param parameterTool parsed program arguments that select the behaviour
         */
        public HangingInputFormat(ParameterTool parameterTool) {
            this.params = parameterTool;
        }

        /** Nothing to configure. */
        @Override
        public void configure(Configuration parameters) {
        }

        /**
         * @return always null, i.e. no statistics are provided
         */
        @Override
        public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
            return null;
        }

        /**
         * Creates a single split, optionally after a delay or failing instead.
         *
         * <p>NOTE(review): presumably Flink calls this while the job is being set up, which is
         * what makes the submission slow or erroneous - confirm against the Flink version under
         * test.
         *
         * @param minNumSplits ignored
         * @return an array containing exactly one split
         * @throws RuntimeException if failAfterSecs is set, after the configured delay (or as
         *     soon as the waiting thread is interrupted)
         */
        @Override
        public MyInputSplits[] createInputSplits(int minNumSplits) throws IOException {
            if (params.has("failAfterSecs")) {
                int failSecs = params.getInt("failAfterSecs", 30);
                LOG.info("Waiting for {} seconds, then failing", failSecs);
                sleepPreservingInterrupt(MILLIS_PER_SECOND * failSecs);
                throw new RuntimeException("Oh my god, I failed");
            }
            if (params.has("hangMins")) {
                int mins = params.getInt("hangMins", 1);
                // The trailing exception is not bound to a placeholder; it is passed so that
                // the logger prints the stack trace of the calling thread.
                LOG.info("where am I, Hanging for {}", mins, new RuntimeException("Here"));
                sleepPreservingInterrupt(MILLIS_PER_MINUTE * mins);
            }
            return new MyInputSplits[] {new MyInputSplits()};
        }

        /**
         * @param inputSplits ignored
         * @return an assigner that hands out a fresh split on every request, so it never runs
         *     out of splits, and that ignores returned splits
         */
        @Override
        public InputSplitAssigner getInputSplitAssigner(MyInputSplits[] inputSplits) {
            return new InputSplitAssigner() {
                @Override
                public InputSplit getNextInputSplit(String s, int i) {
                    return new MyInputSplits();
                }

                @Override
                public void returnInputSplit(List<InputSplit> list, int i) {
                    // returned splits are intentionally dropped
                }
            };
        }

        /** Nothing to open. */
        @Override
        public void open(MyInputSplits split) throws IOException {
        }

        /**
         * @return true once reading has been interrupted, false before that
         */
        @Override
        public boolean reachedEnd() throws IOException {
            return !running;
        }

        /**
         * Waits about one second and then returns a constant record.
         *
         * @param reuse ignored
         * @return the constant record text
         * @throws RuntimeException if failAtRuntime is set
         */
        @Override
        public String nextRecord(String reuse) throws IOException {
            if (params.has("failAtRuntime")) {
                throw new RuntimeException("Now I failed at runtime");
            }
            if (!sleepPreservingInterrupt(MILLIS_PER_SECOND)) {
                // Interrupted: stop producing records after this one.
                running = false;
            }
            return "this is all I got";
        }

        /** Nothing to close. */
        @Override
        public void close() throws IOException {
        }
    }

    /** Split without any payload; every instance reports split number 0. */
    private static class MyInputSplits implements InputSplit {
        @Override
        public int getSplitNumber() {
            return 0;
        }
    }
} {code}
> Test non-blocking job submission
> --------------------------------
>
> Key: FLINK-20126
> URL: https://issues.apache.org/jira/browse/FLINK-20126
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Coordination
> Affects Versions: 1.12.0
> Reporter: Robert Metzger
> Assignee: Matthias
> Priority: Critical
> Fix For: 1.12.0
>
> Attachments: HangingJobSubmission.jar
>
>
> Introduced in https://issues.apache.org/jira/browse/FLINK-16866
> Test:
> - job submission lasting longer than the web submit timeout (pre 1.12.0 would
> have failed with a timeout there)
> - proper error reporting in case of submission errors (exceptions during
> submission, not operations)
> - proper behavior of the web ui during hanging, erroneous and successful
> submission.
> ----
> [General Information about the Flink 1.12 release
> testing|https://cwiki.apache.org/confluence/display/FLINK/1.12+Release+-+Community+Testing]
> When testing a feature, consider the following aspects:
> - Is the documentation easy to understand
> - Are the error messages, log messages, APIs etc. easy to understand
> - Is the feature working as expected under normal conditions
> - Is the feature working / failing as expected with invalid input, induced
> errors etc.
> If you find a problem during testing, please file a ticket
> (Priority=Critical; Fix Version = 1.12.0), and link it in this testing ticket.
> During the testing keep us updated on tests conducted, or please write a
> short summary of all things you have tested in the end.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)