+1
Very good proposal. The current configuration file structure is as follows:
flink:
  deployment:
    option:
      $key: $value
    property:
      $key: $value
  checkpoints:
    ...
  watermark:
    ...
  state:
    ...
  restart-strategy:
    ...
  table:
    ...
All Flink parameters can be placed directly under
flink.deployment.property. Because that section has so many parameters,
users who are not familiar with Flink's configuration may not know how to
set them (many users do not read the documentation). So I extracted the
most commonly used settings (checkpoints, watermark, state, and restart
strategy) and redefined their format to make the parameters clearer. In
fact, all of these parameters can be defined under
flink.deployment.property.
With this proposal, the configuration changes to the following:
flink:
  deployment:
    option:
      $key: $value
    property:
      $key: $value
      # Including (checkpoints, watermark, state, restart-strategy, table)
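
To illustrate how nested entries under flink.deployment.property could line
up with Flink's dotted option names, here is a minimal Python sketch. It is
only an illustration under stated assumptions: the `flatten` helper and the
`property_section` dictionary are hypothetical names, the dictionary stands
in for the already-parsed YAML, and this is not StreamPark's actual loading
code.

```python
def flatten(section, prefix=""):
    """Flatten a nested mapping into dotted keys, skipping unset (None) values."""
    flat = {}
    for key, value in section.items():
        dotted = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            # Recurse into nested sections, carrying the dotted prefix along.
            flat.update(flatten(value, dotted))
        elif value is not None:
            flat[dotted] = value
    return flat


# Hypothetical stand-in for the parsed `flink.deployment.property` section.
property_section = {
    "parallelism.default": 2,
    "jobmanager.memory": {"flink.size": None},  # left empty, so it is skipped
    "pipeline": {"auto-watermark-interval": "200ms"},
    "execution": {
        "checkpointing": {"mode": "EXACTLY_ONCE", "interval": "30s"},
    },
    "state": {
        "backend": "hashmap",
        "backend.incremental": True,
        "checkpoint-storage": "filesystem",
    },
    "restart-strategy": "fixed-delay",
    "restart-strategy.fixed-delay": {"attempts": 3, "delay": 5000},
}

flink_conf = flatten(property_section)

for name, value in sorted(flink_conf.items()):
    print(f"{name}: {value}")
```

Every key this sketch produces, for example execution.checkpointing.interval
or restart-strategy.fixed-delay.attempts, is already an official Flink option
name, so no StreamPark-specific translation step is needed in it.
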
This change would indeed bring many benefits.
Best,
Huajie Wang
On Thu, Oct 6, 2022 at 02:36, Rui Fan <[email protected]> wrote:
> Hi all,
>
> Recently, I created issue-1739[1] to optimize the configuration inside
> flink-application.conf.
>
> Background:
>
> StreamPark defines many configurations that have the same meaning as Flink
> configurations, but with different configuration names.
>
> For example, Flink has `pipeline.auto-watermark-interval`[2][3] to
> configure the interval of automatic watermark emission, but StreamPark
> defines its own configuration for this, `flink.watermark.interval`.
>
> This causes some problems:
> 1. The configuration is duplicated, which adds learning cost for users.
> 2. Every time a new Flink version is released, StreamPark has to adapt
> its own configuration to it, which leads to a lot of repetitive
> code.
>
> So I propose using Flink's official configuration names to avoid these
> problems. This brings some benefits:
>
> - Unified configuration, which is more user-friendly.
> - Less redundant code and a lower cost of adapting to new Flink versions
> (the code can be reduced by 25+%).
>
> Task List:
>
> According to the parameter type, I divided the parameters into three
> categories (watermark, checkpoint/state, and restart-strategy) and created
> three tasks[4][5][6] to handle them.
> I listed all the configuration mappings in the corresponding issues. Each
> mapping contains four items:
> - StreamPark Configuration Name
> - Flink Configuration
> - Flink 1.12 doc
> - Flink 1.15 doc
>
> From this information, you can see how Flink officially supports these
> parameters. Detailed information is available in the issues.
>
> Configuration demo:
>
> Old configuration:
> ```
> flink:
>   deployment:
>     property:
>       $internal.application.main:
>       pipeline.name:
>       yarn.application.queue:
>       taskmanager.numberOfTaskSlots: 1
>       parallelism.default: 2
>       jobmanager.memory:
>         flink.size:
>         heap.size:
>         jvm-metaspace.size:
>         jvm-overhead.max:
>         off-heap.size:
>         process.size:
>       taskmanager.memory:
>         flink.size:
>         framework.heap.size:
>         framework.off-heap.size:
>         managed.size:
>         process.size:
>         task.heap.size:
>         task.off-heap.size:
>         jvm-metaspace.size:
>         jvm-overhead.max:
>         jvm-overhead.min:
>         managed.fraction: 0.4
>   checkpoints:
>     enable: true
>     interval: 30000
>     mode: EXACTLY_ONCE
>     timeout: 300000
>     unaligned: true
>   watermark:
>     interval: 10000
>   # state backend
>   state:
>     backend: # see https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/state/state_backends.html
>       # Special note: flink1.12 optional configuration ('jobmanager', 'filesystem', 'rocksdb'),
>       # flink1.12+ optional configuration ('hashmap', 'rocksdb')
>       value: filesystem
>       memory: 5242880 # Effective for jobmanager, maximum memory
>       async: false # Valid for (jobmanager, filesystem), whether to enable asynchronous
>       incremental: true # Valid for rocksdb, whether to enable incremental
>       # Configuration reference of rocksdb:
>       # https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#rocksdb-state-backend
>       # Remove the prefix of rocksdb configuration key: state.backend
>       #rocksdb.block.blocksize:
>     # Special note: This parameter is only valid in flink 1.12+, and the
>     # state.backend.value is hashmap, optional: (jobmanager | filesystem)
>     checkpoint-storage: filesystem
>     checkpoints.dir: file:///tmp/chkdir
>     savepoints.dir: file:///tmp/chkdir
>     checkpoints.num-retained: 1
>   # restart strategy
>   restart-strategy:
>     value: fixed-delay # Restart strategy [(fixed-delay|failure-rate|none) a total of 3 configurable strategies]
>     fixed-delay:
>       attempts: 3
>       delay: 5000
>     failure-rate:
>       max-failures-per-interval:
>       failure-rate-interval:
>       delay:
>   # table
>   table:
>     planner: blink # (blink|old|any)
>     mode: streaming # (batch|streaming)
> ```
>
> New Configuration:
>
> ```
> flink:
>   deployment:
>     property: #@see: https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html
>       $internal.application.main:
>       pipeline.name:
>       yarn.application.queue:
>       taskmanager.numberOfTaskSlots: 1
>       parallelism.default: 2
>       jobmanager.memory:
>         flink.size:
>         heap.size:
>         jvm-metaspace.size:
>         jvm-overhead.max:
>         off-heap.size:
>         process.size:
>       taskmanager.memory:
>         flink.size:
>         framework.heap.size:
>         framework.off-heap.size:
>         managed.size:
>         process.size:
>         task.heap.size:
>         task.off-heap.size:
>         jvm-metaspace.size:
>         jvm-overhead.max:
>         jvm-overhead.min:
>         managed.fraction: 0.4
>       pipeline:
>         auto-watermark-interval: 200ms
>       # checkpoint
>       execution:
>         checkpointing:
>           mode: EXACTLY_ONCE
>           interval: 30s
>           timeout: 10min
>           unaligned: true
>           externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
>       # state backend
>       state:
>         # Special note: flink1.12 optional configuration ('jobmanager', 'filesystem', 'rocksdb'),
>         # flink1.12+ optional configuration ('hashmap', 'rocksdb')
>         backend: hashmap
>         backend.incremental: true
>         checkpoint-storage: filesystem
>         savepoints.dir: file:///tmp/chkdir
>         checkpoints.dir: file:///tmp/chkdir
>       # restart strategy
>       restart-strategy: fixed-delay # Restart strategy [(fixed-delay|failure-rate|none) a total of 3 configurable strategies]
>       restart-strategy.fixed-delay:
>         attempts: 3
>         delay: 5000
>       restart-strategy.failure-rate:
>         max-failures-per-interval:
>         failure-rate-interval:
>         delay:
>   # table
>   table:
>     planner: blink # (blink|old|any)
>     mode: streaming # (batch|streaming)
> ```
>
> If you have any suggestions, you are welcome to join the discussion on
> the mailing list or in the issue. I look forward to your feedback.
>
> [1] https://github.com/apache/incubator-streampark/issues/1739
> [2] https://nightlies.apache.org/flink/flink-docs-release-1.12/deployment/config.html#pipeline-auto-watermark-interval
> [3] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#pipeline-auto-watermark-interval
> [4] https://github.com/apache/incubator-streampark/issues/1742
> [5] https://github.com/apache/incubator-streampark/issues/1743
> [6] https://github.com/apache/incubator-streampark/issues/1744
>
> Best wishes
> fanrui
>