[https://issues.apache.org/jira/browse/FLINK-34384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]
Rui Fan updated FLINK-34384:
----------------------------
Description:
Test suggestion:
# Prepare a DataStream job in which all tasks throw an exception directly.
## Set the parallelism to 5 or higher.
# Prepare the following configuration options:
** restart-strategy.type : exponential-delay
** restart-strategy.exponential-delay.attempts-before-reset-backoff : 7
# Start the cluster: ./bin/start-cluster.sh
# Run the job: ./bin/flink run -c className jarName
# Check the results:
** Check whether the job is retried 7 times.
** Check the exception history: the list should contain 7 exceptions.
** Each retry except the last one should show all 5 subtasks (they are concurrent exceptions).
!image-2024-02-06-15-05-05-331.png|width=1624,height=797!
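The checks in step 5 can be written as a small predicate over what the exception history shows. This is a hypothetical helper, not part of Flink: it assumes you have counted, for each exception history entry ordered from the first failure to the last, how many subtask exceptions it lists (the root exception plus its concurrent exceptions).
{code:java}
import java.util.List;

// Hypothetical helper for step 5; not a Flink API.
public class ExceptionHistoryCheck {

    /**
     * @param subtasksPerEntry subtask exceptions shown per history entry, first failure first
     * @param expectedEntries  expected number of entries (7 in this test)
     * @param parallelism      job parallelism (5 in this test)
     */
    public static boolean matchesExpectation(
            List<Integer> subtasksPerEntry, int expectedEntries, int parallelism) {
        // The exception history should contain exactly the expected number of entries.
        if (subtasksPerEntry.size() != expectedEntries) {
            return false;
        }
        // Every entry except the last one should list all subtasks as concurrent exceptions.
        for (int i = 0; i < subtasksPerEntry.size() - 1; i++) {
            if (subtasksPerEntry.get(i) != parallelism) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Hypothetical observation: 7 entries, each showing 5 subtasks.
        List<Integer> observed = List.of(5, 5, 5, 5, 5, 5, 5);
        System.out.println(matchesExpectation(observed, 7, 5)); // prints true
    }
}
{code}
The last entry is deliberately not checked, matching the "except the last one" wording of step 5.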
Note: Test the options from step 2 at each of the two levels separately:
* Cluster level: set them in config.yaml.
* Job level: set them in the job code.
Job level demo:
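{code:java}
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// The class name is arbitrary; pass it as className to ./bin/flink run -c className jarName.
public class ExponentialDelayRestartTest {

    public static void main(String[] args) throws Exception {
        // Job-level restart strategy: the same keys and values as in step 2.
        Configuration conf = new Configuration();
        conf.setString("restart-strategy.type", "exponential-delay");
        conf.setString("restart-strategy.exponential-delay.attempts-before-reset-backoff", "7");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(5);

        // Generates 300 Long values, rate-limited to 10 records per second.
        DataGeneratorSource<Long> generatorSource =
                new DataGeneratorSource<>(
                        value -> value,
                        300,
                        RateLimiterStrategy.perSecond(10),
                        Types.LONG);

        env.fromSource(generatorSource, WatermarkStrategy.noWatermarks(), "Data Generator")
                .map(new RichMapFunction<Long, Long>() {
                    @Override
                    public Long map(Long value) {
                        // Every subtask fails as soon as it receives a record.
                        throw new RuntimeException(
                                "Expected testing exception, subtaskIndex: "
                                        + getRuntimeContext().getIndexOfThisSubtask());
                    }
                })
                .print();

        env.execute();
    }
}
{code}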
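To estimate how long one test run lasts before the job finally fails, the retry schedule can be approximated with a simplified model. This is only a sketch, not Flink's implementation: it assumes the delay starts at an initial backoff, is multiplied by a fixed factor after each attempt, and is capped at a maximum, and it ignores jitter and the backoff reset threshold. The class name and the backoff values (1 s initial, 60 s max, multiplier 2.0) are hypothetical; substitute the exponential-delay values from your own configuration.
{code:java}
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of exponential-delay backoff (no jitter, no reset).
public class ExponentialDelayModel {

    /** Returns one restart delay in milliseconds per allowed restart attempt. */
    public static List<Long> restartDelays(
            long initialBackoffMs, long maxBackoffMs, double multiplier, int attemptsBeforeReset) {
        List<Long> delays = new ArrayList<>();
        double current = initialBackoffMs;
        for (int attempt = 0; attempt < attemptsBeforeReset; attempt++) {
            // Each delay is capped at the maximum backoff.
            delays.add((long) Math.min(current, maxBackoffMs));
            // The next delay grows by the multiplier.
            current *= multiplier;
        }
        return delays;
    }

    public static void main(String[] args) {
        // Hypothetical values: 1 s initial, 60 s max, multiplier 2.0, 7 attempts.
        List<Long> delays = restartDelays(1_000, 60_000, 2.0, 7);
        long total = delays.stream().mapToLong(Long::longValue).sum();
        System.out.println("restart delays (ms): " + delays);
        System.out.println("total backoff (ms): " + total);
    }
}
{code}
With these hypothetical values the model yields 7 delays (1, 2, 4, 8, 16, 32 and a capped 60 seconds), about two minutes of backoff in total.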
was:
The previous description differed only in step 5, which expected the exception history to contain 2 exceptions instead of 7.
> Release Testing: Verify FLINK-33735 Improve the exponential-delay restart-strategy
> -----------------------------------------------------------------------------------
>
> Key: FLINK-34384
> URL: https://issues.apache.org/jira/browse/FLINK-34384
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Coordination
> Affects Versions: 1.19.0
> Reporter: lincoln lee
> Priority: Blocker
> Labels: release-testing
> Fix For: 1.19.0
>
> Attachments: image-2024-02-06-15-05-05-331.png, screenshot-1.png