[ 
https://issues.apache.org/jira/browse/FLINK-12302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16840352#comment-16840352
 ] 

lamber-ken commented on FLINK-12302:
------------------------------------

Thanks for your comment, [~gjy]. Follow the steps below to reproduce the problem.

*1. Download flink-1.8*
{code:java}
wget http://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.8.0/flink-1.8.0-bin-scala_2.11.tgz
wget http://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop2-uber/2.4.1-1.8.0/flink-shaded-hadoop2-uber-2.4.1-1.8.0.jar
{code}

*2. Configure Flink*
{code:java}
jobmanager.archive.fs.dir: hdfs:///flink/dev/flink-completed-jobs

yarn.application-attempts: 10

high-availability: zookeeper
high-availability.zookeeper.quorum: spslave.bigdata.ly:2205
high-availability.zookeeper.path.root: /flink-dev
high-availability.zookeeper.storageDir: hdfs:///flink/dev/recovery
{code}

*3. Submit the Flink job*
{code:java}
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class TestDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Allow up to 30 restarts, waiting 1 second between attempts.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(30, Time.seconds(1)));

        DataStream<String> text = env.addSource(new SourceFunction<String>() {
            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                while (true) {
                    ctx.collect("aaaa");
                    Thread.sleep(100);
                }
            }
            @Override
            public void cancel() {

            }
        });

        text.addSink(new SinkFunction<String>() {
            @Override
            public void invoke(String value, Context context) throws Exception {
                // Deliberately throw an ArithmeticException so that the job fails on every attempt.
                int a = 1 / 0;
            }
        });

        env.execute();
    }


}
{code}
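
For reference, the effect of the fixed-delay restart setting in the job above can be sketched in plain Java. This is only a stand-alone simulation, not Flink code: the class {{FixedDelayRestartSketch}} and the method {{countAttempts}} are hypothetical names, and it assumes the restart count excludes the first run, so a task that always fails is executed 31 times with a limit of 30 restarts.

```java
// Hypothetical stand-alone simulation; it does not use any Flink classes.
final class FixedDelayRestartSketch {

    /**
     * Runs the task and restarts it after each failure until maxRestarts
     * restarts have been used up. Returns the total number of attempts made.
     */
    static int countAttempts(Runnable task, int maxRestarts, long delayMillis) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                task.run();
                return attempts; // the task finished normally
            } catch (RuntimeException e) {
                if (attempts > maxRestarts) {
                    return attempts; // restarts exhausted: terminal failure
                }
                try {
                    Thread.sleep(delayMillis); // fixed delay before the next restart
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return attempts;
                }
            }
        }
    }

    public static void main(String[] args) {
        int zero = 0;
        // Same failure as the sink in the job above: division by zero.
        Runnable alwaysFails = () -> { int a = 1 / zero; };
        // A 1 ms delay keeps the demo fast; the job above waits 1 second.
        int attempts = countAttempts(alwaysFails, 30, 1);
        System.out.println("attempts = " + attempts); // prints "attempts = 31"
    }
}
```

Once the restarts are exhausted, the job reaches a terminal failed state, which is the situation the following steps look at.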

*4. After 40s, you'll see*
 !screenshot-1.png! 


*5. Now kill the AM (ApplicationMaster) by hand*
 !screenshot-2.png! 

> Fixed the wrong finalStatus of yarn application when application finished
> -------------------------------------------------------------------------
>
>                 Key: FLINK-12302
>                 URL: https://issues.apache.org/jira/browse/FLINK-12302
>             Project: Flink
>          Issue Type: Improvement
>          Components: Deployment / YARN
>    Affects Versions: 1.8.0
>            Reporter: lamber-ken
>            Assignee: lamber-ken
>            Priority: Minor
>              Labels: pull-request-available
>             Fix For: 1.9.0
>
>         Attachments: fix-bad-finalStatus.patch, 
> image-2019-04-23-19-56-49-933.png, screenshot-1.png, screenshot-2.png, 
> spslave4.bigdata.ly_23951, spslave5.bigdata.ly_20271
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> A Flink job (flink-1.6.3) failed in per-job YARN cluster mode, and the 
> YARN ResourceManager reran the job.
> When the job fails again, the application will finish, but the finalStatus 
> is +UNDEFINED+. It would be better to show the state +FAILED+.
> !image-2019-04-23-19-56-49-933.png!
>  
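
The behaviour requested in the issue description above can be sketched as a simple status mapping. This is a minimal illustration under stated assumptions, not the actual patch: {{FinalStatusSketch}}, {{JobOutcome}} and {{YarnFinalStatus}} are hypothetical stand-ins (the latter only mirrors the status names YARN displays), and the point is merely that a failed job should be reported as FAILED rather than fall through to UNDEFINED.

```java
// Hypothetical sketch; these enums are stand-ins, not Flink or YARN classes.
final class FinalStatusSketch {

    /** Terminal outcome of the job as seen by the cluster entrypoint (assumed names). */
    enum JobOutcome { FINISHED, FAILED, CANCELED, UNKNOWN }

    /** Final status reported to YARN (names mirror those shown in the YARN UI). */
    enum YarnFinalStatus { SUCCEEDED, FAILED, KILLED, UNDEFINED }

    /** Maps a job outcome to the final status reported when the application ends. */
    static YarnFinalStatus toYarnFinalStatus(JobOutcome outcome) {
        switch (outcome) {
            case FINISHED:
                return YarnFinalStatus.SUCCEEDED;
            case FAILED:
                return YarnFinalStatus.FAILED; // the case this issue asks for
            case CANCELED:
                return YarnFinalStatus.KILLED;
            default:
                return YarnFinalStatus.UNDEFINED; // only when the outcome is truly unknown
        }
    }

    public static void main(String[] args) {
        System.out.println(toYarnFinalStatus(JobOutcome.FAILED)); // prints "FAILED"
    }
}
```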



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
