[ 
https://issues.apache.org/jira/browse/FLINK-34384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-34384:
----------------------------
    Description: 
Test suggestion:
 # Prepare a DataStream job in which every task throws an exception immediately.
 ## Set the parallelism to 5 or above
 # Prepare the following configuration options:
 ** restart-strategy.type : exponential-delay
 ** restart-strategy.exponential-delay.attempts-before-reset-backoff : 7
 # Start the cluster: ./bin/start-cluster.sh
 # Run the job: ./bin/flink run -c className jarName
 # Check the result
 ** Check whether the job is retried 7 times
 ** Check the exception history: the list has 2 exceptions
 ** Each retry except the last one shows all 5 subtasks (they are concurrent exceptions).

!image-2024-02-06-14-57-51-386.png!
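
To judge how long the retries in step 5 should take, it can help to list the nominal delay before each restart. The following is only a rough sketch, assuming the strategy multiplies the delay by a fixed factor after every failure and caps it at a maximum. The class BackoffSketch, the method nominalDelays and the values used (1 s initial backoff, multiplier 2.0, 60 s cap) are hypothetical and are not taken from Flink; jitter is ignored, so check the real option values of the version under test.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of Flink: lists the nominal delay before each restart attempt. */
public class BackoffSketch {

    /**
     * Returns the nominal delay in milliseconds before each restart attempt,
     * ignoring jitter: min(initial * multiplier^i, max) for i = 0 .. attempts - 1.
     */
    public static List<Long> nominalDelays(
            long initialBackoffMs, double multiplier, long maxBackoffMs, int attempts) {
        List<Long> delays = new ArrayList<>();
        double current = initialBackoffMs;
        for (int i = 0; i < attempts; i++) {
            // Record the delay for this attempt, never exceeding the cap.
            delays.add(Math.min(Math.round(current), maxBackoffMs));
            // Grow the delay for the next attempt, again bounded by the cap.
            current = Math.min(current * multiplier, maxBackoffMs);
        }
        return delays;
    }

    public static void main(String[] args) {
        // Assumed values for illustration only; they are not Flink defaults.
        List<Long> delays = nominalDelays(1_000L, 2.0, 60_000L, 7);
        System.out.println(delays);
    }
}
```

With these assumed values the seventh delay is already limited by the cap, so the total waiting time across 7 attempts stays close to two minutes.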

 

Note: Test the options from step 2 at two levels, one level at a time:
 * Cluster level: set them in config.yaml
 * Job level: set them in the code

 

Job level demo:
{code:java}
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Job-level restart strategy options (step 2).
    conf.setString("restart-strategy", "exponential-delay");
    conf.setString("restart-strategy.exponential-delay.attempts-before-reset-backoff", "6");
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(5);

    DataGeneratorSource<Long> generatorSource =
            new DataGeneratorSource<>(
                    value -> value,
                    300,
                    RateLimiterStrategy.perSecond(10),
                    Types.LONG);

    env.fromSource(generatorSource, WatermarkStrategy.noWatermarks(), "Data Generator")
            .map(new RichMapFunction<Long, Long>() {
                @Override
                public Long map(Long value) {
                    // Fail on every record so that all subtasks throw.
                    throw new RuntimeException(
                            "Expected testing exception, subtaskIndex: "
                                    + getRuntimeContext().getIndexOfThisSubtask());
                }
            })
            .print();

    env.execute();
}
{code}

  was:
Test suggestion:
 # Prepare a datastream job that all tasks throw exception directly.
 ## Set the parallelism to 5 or above
 # Prepare some configuration options:
 ** restart-strategy.type : exponential-delay
 ** restart-strategy.exponential-delay.attempts-before-reset-backoff : 7
 # Start the cluster: ./bin/start-cluster.sh
 # Run the job: ./bin/flink run -c className jarName
 # Check the result
 ** Check whether job will be retried 7 times
 ** Check the exception history, the list has 2 exceptions
 ** Each retries except the last one can see the 5 subtasks(They are concurrent exceptions).

!image-2024-02-06-14-51-11-256.png!

 

Note: Set these options mentioned at step2 at 2 level separately
 * Cluster level: set them in the config.yaml
 * Job level: Set them in the code

 

Job level demo:
{code:java}
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    conf.setString("restart-strategy", "exponential-delay");
    
    conf.setString("restart-strategy.exponential-delay.attempts-before-reset-backoff", "6");
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(5);

    DataGeneratorSource<Long> generatorSource =
            new DataGeneratorSource<>(
                    value -> value,
                    300,
                    RateLimiterStrategy.perSecond(10),
                    Types.LONG);

    env.fromSource(generatorSource, WatermarkStrategy.noWatermarks(), "Data Generator")
            .map(new RichMapFunction<Long, Long>() {
                @Override
                public Long map(Long value) {
                    throw new RuntimeException(
                            "Excepted testing exception, subtaskIndex: "
                                    + getRuntimeContext().getIndexOfThisSubtask());
                }
            })
            .print();

    env.execute();
} {code}


> Release Testing: Verify FLINK-33735 Improve the exponential-delay restart-strategy
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-34384
>                 URL: https://issues.apache.org/jira/browse/FLINK-34384
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Coordination
>    Affects Versions: 1.19.0
>            Reporter: lincoln lee
>            Priority: Blocker
>              Labels: release-testing
>             Fix For: 1.19.0
>
>         Attachments: image-2024-02-06-14-57-51-386.png



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
