Hi all,
Recently, I created issue #1739[1] to optimize the configuration in
flink-application.conf.
Background:
StreamPark defines many configuration options that mean the same thing as
existing Flink options but use different names.
For example, Flink provides `pipeline.auto-watermark-interval`[2][3] to
configure the interval of automatic watermark emission, but StreamPark
defines its own option, `flink.watermark.interval`, for the same purpose.
This causes two problems:
1. The configuration is duplicated, so users have to learn two names for
the same setting.
2. Every time a new Flink version is released, StreamPark has to keep these
options compatible with it, which leads to a lot of repetitive code.
So I propose using Flink's official configuration options directly to avoid
these problems. This brings some benefits:
- Unified configuration, which is more user-friendly.
- Less redundant code and a lower cost of adapting to new Flink versions
(the code can be reduced by 25+%).
Task List:
I grouped the parameters by type into three categories: watermark,
checkpoint/state and restart-strategy, and created three tasks[4][5][6] to
complete them.
I have listed the full configuration mapping in the corresponding issues.
Each configuration entry contains four items:
- StreamPark Configuration Name
- Flink Configuration
- Flink 1.12 doc
- Flink 1.15 doc
From this information, you can see how Flink officially supports these
parameters. See the issues for details.
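To make the idea concrete, the rename can be thought of as a lookup table from legacy keys to Flink keys. The Java sketch below is a hypothetical helper, not StreamPark code: the class and method names are made up, and the entries are read off the old and new demos in this mail rather than copied from the issues, which remain the authoritative mapping. Keys with no one-to-one counterpart in the demos (for example `flink.checkpoints.enable`, since Flink turns checkpointing on by setting `execution.checkpointing.interval`) are reported instead of guessed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical helper, not part of StreamPark: renames legacy StreamPark keys
 * to official Flink keys. The pairs are read off the old and new demos only.
 */
final class LegacyKeyMapping {

    // Legacy key (flattened path in the old demo) -> official Flink key.
    private static final Map<String, String> LEGACY_TO_FLINK = new LinkedHashMap<>();

    static {
        // Category 1: watermark
        LEGACY_TO_FLINK.put("flink.watermark.interval", "pipeline.auto-watermark-interval");
        // Category 2: checkpoint/state
        LEGACY_TO_FLINK.put("flink.checkpoints.mode", "execution.checkpointing.mode");
        LEGACY_TO_FLINK.put("flink.checkpoints.interval", "execution.checkpointing.interval");
        LEGACY_TO_FLINK.put("flink.checkpoints.timeout", "execution.checkpointing.timeout");
        LEGACY_TO_FLINK.put("flink.checkpoints.unaligned", "execution.checkpointing.unaligned");
        LEGACY_TO_FLINK.put("flink.state.backend.value", "state.backend");
        LEGACY_TO_FLINK.put("flink.state.backend.incremental", "state.backend.incremental");
        LEGACY_TO_FLINK.put("flink.state.checkpoint-storage", "state.checkpoint-storage");
        LEGACY_TO_FLINK.put("flink.state.checkpoints.dir", "state.checkpoints.dir");
        LEGACY_TO_FLINK.put("flink.state.savepoints.dir", "state.savepoints.dir");
        // Category 3: restart-strategy
        LEGACY_TO_FLINK.put("flink.restart-strategy.value", "restart-strategy");
        LEGACY_TO_FLINK.put("flink.restart-strategy.fixed-delay.attempts",
                "restart-strategy.fixed-delay.attempts");
        LEGACY_TO_FLINK.put("flink.restart-strategy.fixed-delay.delay",
                "restart-strategy.fixed-delay.delay");
        LEGACY_TO_FLINK.put("flink.restart-strategy.failure-rate.max-failures-per-interval",
                "restart-strategy.failure-rate.max-failures-per-interval");
        LEGACY_TO_FLINK.put("flink.restart-strategy.failure-rate.failure-rate-interval",
                "restart-strategy.failure-rate.failure-rate-interval");
        LEGACY_TO_FLINK.put("flink.restart-strategy.failure-rate.delay",
                "restart-strategy.failure-rate.delay");
    }

    /**
     * Returns the renamed entries; keys with no Flink counterpart in this table
     * are appended to {@code unmapped} instead of being copied.
     */
    static Map<String, String> translate(Map<String, String> legacy, List<String> unmapped) {
        Map<String, String> flink = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : legacy.entrySet()) {
            String flinkKey = LEGACY_TO_FLINK.get(e.getKey());
            if (flinkKey == null) {
                unmapped.add(e.getKey());
            } else {
                flink.put(flinkKey, e.getValue());
            }
        }
        return flink;
    }

    public static void main(String[] args) {
        Map<String, String> legacy = new LinkedHashMap<>();
        legacy.put("flink.watermark.interval", "10000");
        legacy.put("flink.checkpoints.enable", "true");
        legacy.put("flink.restart-strategy.value", "fixed-delay");

        List<String> unmapped = new ArrayList<>();
        // {pipeline.auto-watermark-interval=10000, restart-strategy=fixed-delay}
        System.out.println(translate(legacy, unmapped));
        // unmapped: [flink.checkpoints.enable]
        System.out.println("unmapped: " + unmapped);
    }
}
```

Reporting unmapped keys instead of passing them through keeps a legacy name from silently reaching Flink, where it would simply be ignored.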
Configuration demo:
Old configuration:
```
flink:
  deployment:
    property:
      $internal.application.main:
      pipeline.name:
      yarn.application.queue:
      taskmanager.numberOfTaskSlots: 1
      parallelism.default: 2
      jobmanager.memory:
        flink.size:
        heap.size:
        jvm-metaspace.size:
        jvm-overhead.max:
        off-heap.size:
        process.size:
      taskmanager.memory:
        flink.size:
        framework.heap.size:
        framework.off-heap.size:
        managed.size:
        process.size:
        task.heap.size:
        task.off-heap.size:
        jvm-metaspace.size:
        jvm-overhead.max:
        jvm-overhead.min:
        managed.fraction: 0.4
  checkpoints:
    enable: true
    interval: 30000
    mode: EXACTLY_ONCE
    timeout: 300000
    unaligned: true
  watermark:
    interval: 10000
  # state backend
  state:
    backend: # see https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/state/state_backends.html
      value: filesystem # Special note: options in Flink 1.12 are ('jobmanager', 'filesystem', 'rocksdb'); options in Flink 1.13+ are ('hashmap', 'rocksdb')
      memory: 5242880 # Only for jobmanager: maximum memory
      async: false # Only for (jobmanager, filesystem): whether to enable asynchronous snapshots
      incremental: true # Only for rocksdb: whether to enable incremental checkpoints
      # RocksDB configuration reference: https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#rocksdb-state-backend
      # Drop the "state.backend." prefix from RocksDB configuration keys, e.g.:
      #rocksdb.block.blocksize:
    checkpoint-storage: filesystem # Special note: only valid in Flink 1.13+ when state.backend.value is hashmap; options: (jobmanager | filesystem)
    checkpoints.dir: file:///tmp/chkdir
    savepoints.dir: file:///tmp/chkdir
    checkpoints.num-retained: 1
  # restart strategy
  restart-strategy:
    value: fixed-delay # Restart strategy, one of 3 options: (fixed-delay | failure-rate | none)
    fixed-delay:
      attempts: 3
      delay: 5000
    failure-rate:
      max-failures-per-interval:
      failure-rate-interval:
      delay:
  # table
  table:
    planner: blink # (blink|old|any)
    mode: streaming # (batch|streaming)
```
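The `property` block in the old demo already holds official Flink keys, written as nested YAML (for example `taskmanager.memory` followed by `managed.fraction`). Assuming StreamPark joins the nested path segments with dots to recover the flat Flink key (the dotted keys in both demos suggest this, but this mail does not spell it out), the rule can be sketched in Java as below. The class is hypothetical, not StreamPark's implementation, and it assumes the YAML has already been parsed into nested maps.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical sketch, not StreamPark's implementation: flattens a parsed,
 * nested `property` block into dotted keys. Assumes the YAML has already been
 * parsed into nested Maps (e.g. by a YAML library).
 */
final class PropertyFlattener {

    static Map<String, String> flatten(Map<String, Object> nested) {
        Map<String, String> flat = new LinkedHashMap<>();
        flattenInto("", nested, flat);
        return flat;
    }

    @SuppressWarnings("unchecked")
    private static void flattenInto(String prefix, Map<String, Object> node,
                                    Map<String, String> flat) {
        for (Map.Entry<String, Object> e : node.entrySet()) {
            // Join path segments with dots: "taskmanager.memory" + "managed.fraction".
            String key = prefix.isEmpty() ? e.getKey() : prefix + "." + e.getKey();
            Object value = e.getValue();
            if (value instanceof Map) {
                flattenInto(key, (Map<String, Object>) value, flat);
            } else if (value != null) {
                // Empty YAML entries such as "flink.size:" parse as null and are skipped.
                flat.put(key, String.valueOf(value));
            }
        }
    }

    public static void main(String[] args) {
        // A fragment of the `property` block from the old demo.
        Map<String, Object> tmMemory = new LinkedHashMap<>();
        tmMemory.put("flink.size", null);
        tmMemory.put("managed.fraction", 0.4);

        Map<String, Object> property = new LinkedHashMap<>();
        property.put("taskmanager.numberOfTaskSlots", 1);
        property.put("parallelism.default", 2);
        property.put("taskmanager.memory", tmMemory);

        // Prints:
        // taskmanager.numberOfTaskSlots: 1
        // parallelism.default: 2
        // taskmanager.memory.managed.fraction: 0.4
        flatten(property).forEach((k, v) -> System.out.println(k + ": " + v));
    }
}
```

Under this rule, a YAML key cannot hold both a scalar value and a nested mapping, which is presumably why the new demo writes `backend: hashmap` and `backend.incremental: true` as sibling keys under `state`, and `restart-strategy` next to `restart-strategy.fixed-delay`.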
New configuration:
```
flink:
  deployment:
    property: #@see: https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html
      $internal.application.main:
      pipeline.name:
      yarn.application.queue:
      taskmanager.numberOfTaskSlots: 1
      parallelism.default: 2
      jobmanager.memory:
        flink.size:
        heap.size:
        jvm-metaspace.size:
        jvm-overhead.max:
        off-heap.size:
        process.size:
      taskmanager.memory:
        flink.size:
        framework.heap.size:
        framework.off-heap.size:
        managed.size:
        process.size:
        task.heap.size:
        task.off-heap.size:
        jvm-metaspace.size:
        jvm-overhead.max:
        jvm-overhead.min:
        managed.fraction: 0.4
      pipeline:
        auto-watermark-interval: 200ms
      # checkpoint
      execution:
        checkpointing:
          mode: EXACTLY_ONCE
          interval: 30s
          timeout: 10min
          unaligned: true
          externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
      # state backend
      state:
        backend: hashmap # Special note: options in Flink 1.12 are ('jobmanager', 'filesystem', 'rocksdb'); options in Flink 1.13+ are ('hashmap', 'rocksdb')
        backend.incremental: true
        checkpoint-storage: filesystem
        savepoints.dir: file:///tmp/chkdir
        checkpoints.dir: file:///tmp/chkdir
      # restart strategy
      restart-strategy: fixed-delay # Restart strategy, one of 3 options: (fixed-delay | failure-rate | none)
      restart-strategy.fixed-delay:
        attempts: 3
        delay: 5000
      restart-strategy.failure-rate:
        max-failures-per-interval:
        failure-rate-interval:
        delay:
  # table
  table:
    planner: blink # (blink|old|any)
    mode: streaming # (batch|streaming)
```
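The old demo writes durations as bare millisecond counts (`30000`, `300000`, `10000`), while the new demo uses unit suffixes (`30s`, `10min`, `200ms`). Flink reads a bare number for a duration option as milliseconds, so renaming a key alone keeps its old meaning; note, however, that the two demos also use different values for some options. The Java sketch below checks this with a deliberately simplified parser: it is hypothetical, accepts only the `ms`, `s`, `min` and `h` labels, and is not Flink's own duration parser.

```java
import java.time.Duration;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Simplified, hypothetical parser for Flink-style duration strings.
 * Supports only the "ms", "s", "min" and "h" labels; a bare number is
 * treated as milliseconds. This is not Flink's own parser.
 */
final class DurationSketch {

    // A non-negative integer, optional whitespace, then an optional unit label.
    private static final Pattern FORMAT = Pattern.compile("(\\d+)\\s*([a-zA-Z]*)");

    static Duration parse(String text) {
        Matcher m = FORMAT.matcher(text.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a duration: " + text);
        }
        long amount = Long.parseLong(m.group(1));
        switch (m.group(2).toLowerCase(Locale.ROOT)) {
            case "":
            case "ms":
                return Duration.ofMillis(amount);
            case "s":
                return Duration.ofSeconds(amount);
            case "min":
                return Duration.ofMinutes(amount);
            case "h":
                return Duration.ofHours(amount);
            default:
                throw new IllegalArgumentException("Unsupported unit in: " + text);
        }
    }

    public static void main(String[] args) {
        // Checkpoint interval: old 30000 vs new 30s.
        System.out.println(parse("30000").equals(parse("30s")));    // true
        // Checkpoint timeout: old 300000 (5 min) vs new 10min.
        System.out.println(parse("300000").equals(parse("10min"))); // false
        // Watermark interval: old 10000 (10 s) vs new 200ms.
        System.out.println(parse("10000").equals(parse("200ms")));  // false
    }
}
```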
If you have any suggestions, you are welcome to join the discussion on the
mailing list or in the issues. I look forward to your feedback.
[1] https://github.com/apache/incubator-streampark/issues/1739
[2] https://nightlies.apache.org/flink/flink-docs-release-1.12/deployment/config.html#pipeline-auto-watermark-interval
[3] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#pipeline-auto-watermark-interval
[4] https://github.com/apache/incubator-streampark/issues/1742
[5] https://github.com/apache/incubator-streampark/issues/1743
[6] https://github.com/apache/incubator-streampark/issues/1744
Best wishes
fanrui