Hi all,

Recently, I created issue-1739[1] to optimize the configuration inside
flink-application.conf.

Background:

StreamPark defines many configurations that have the same meaning as Flink
configurations, but with different configuration names.

For example, Flink has `pipeline.auto-watermark-interval`[2][3] to
configure the interval of automatic watermark emission, but StreamPark
defines its own option, `flink.watermark.interval`, for the same purpose.
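
To make the duplication concrete, here is a minimal sketch of the same
setting written both ways. The values and the nesting are taken from the
configuration demo later in this mail; they are illustrative only, and the
unit of the old value is an assumption.

```yaml
# Old: StreamPark-specific name (flink.watermark.interval).
# A plain number, assumed here to be milliseconds.
flink:
  watermark:
    interval: 10000
---
# New: Flink's own option name (pipeline.auto-watermark-interval),
# placed under deployment.property as in the demo below.
flink:
  deployment:
    property:
      pipeline:
        auto-watermark-interval: 200ms
```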

This causes some problems:
1. The configuration is duplicated, which adds learning cost for users.
2. StreamPark has to keep these options compatible with every new Flink
release, which produces a lot of repetitive code.

So I propose using Flink's official configuration names to avoid these
problems. This brings some benefits:

- Unified configuration that is more user-friendly.
- Less redundant code and a lower cost of adapting to new Flink versions
(it can reduce the code by 25+%).

Task List:

I divided the parameters by type into three categories: watermark,
checkpoint/state and restart-strategy, and created three tasks[4][5][6] to
cover them.
I collected all the configuration mappings in the corresponding issues. Each
mapping contains four items:
- StreamPark Configuration Name
- Flink Configuration
- Flink 1.12 doc
- Flink 1.15 doc

From this information, you can see how Flink officially supports these
parameters. The issues contain the detailed information.
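
As an illustration, the entry for the watermark option mentioned above could
be written out roughly like this. The field names are only shorthand for the
four items and are not the exact layout used in the issues; the two links
are references [2] and [3] of this mail.

```yaml
# Hypothetical rendering of one mapping entry; field names are illustrative.
streampark-name: flink.watermark.interval
flink-name: pipeline.auto-watermark-interval
flink-1.12-doc: https://nightlies.apache.org/flink/flink-docs-release-1.12/deployment/config.html#pipeline-auto-watermark-interval
flink-1.15-doc: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#pipeline-auto-watermark-interval
```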

Configuration demo:

Old configuration:
```
flink:
  deployment:
    property:
      $internal.application.main:
      pipeline.name:
      yarn.application.queue:
      taskmanager.numberOfTaskSlots: 1
      parallelism.default: 2
      jobmanager.memory:
        flink.size:
        heap.size:
        jvm-metaspace.size:
        jvm-overhead.max:
        off-heap.size:
        process.size:
      taskmanager.memory:
        flink.size:
        framework.heap.size:
        framework.off-heap.size:
        managed.size:
        process.size:
        task.heap.size:
        task.off-heap.size:
        jvm-metaspace.size:
        jvm-overhead.max:
        jvm-overhead.min:
        managed.fraction: 0.4
  checkpoints:
    enable: true
    interval: 30000
    mode: EXACTLY_ONCE
    timeout: 300000
    unaligned: true
  watermark:
    interval: 10000
  # state backend
  state:
    backend: # see https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/state/state_backends.html
      # Special note: flink1.12 optional configuration ('jobmanager', 'filesystem', 'rocksdb'),
      # flink1.12+ optional configuration ('hashmap', 'rocksdb')
      value: filesystem
      memory: 5242880 # Effective for jobmanager, maximum memory
      async: false    # Valid for (jobmanager, filesystem), whether to enable asynchronous
      incremental: true # Valid for rocksdb, whether to enable incremental
      # Configuration reference of rocksdb:
      # https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#rocksdb-state-backend
      # Remove the prefix of rocksdb configuration key: state.backend
      #rocksdb.block.blocksize:
    # Special note: This parameter is only valid in flink 1.12+, and the
    # state.backend.value is hashmap, optional: (jobmanager | filesystem)
    checkpoint-storage: filesystem
    checkpoints.dir: file:///tmp/chkdir
    savepoints.dir: file:///tmp/chkdir
    checkpoints.num-retained: 1
  # restart strategy
  restart-strategy:
    value: fixed-delay  # Restart strategy [(fixed-delay|failure-rate|none) a total of 3 configurable strategies]
    fixed-delay:
      attempts: 3
      delay: 5000
    failure-rate:
      max-failures-per-interval:
      failure-rate-interval:
      delay:
  # table
  table:
    planner: blink # (blink|old|any)
    mode: streaming #(batch|streaming)
```

New Configuration:

```
flink:
  deployment:
    property: # @see: https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html
      $internal.application.main:
      pipeline.name:
      yarn.application.queue:
      taskmanager.numberOfTaskSlots: 1
      parallelism.default: 2
      jobmanager.memory:
        flink.size:
        heap.size:
        jvm-metaspace.size:
        jvm-overhead.max:
        off-heap.size:
        process.size:
      taskmanager.memory:
        flink.size:
        framework.heap.size:
        framework.off-heap.size:
        managed.size:
        process.size:
        task.heap.size:
        task.off-heap.size:
        jvm-metaspace.size:
        jvm-overhead.max:
        jvm-overhead.min:
        managed.fraction: 0.4
      pipeline:
        auto-watermark-interval: 200ms
      # checkpoint
      execution:
        checkpointing:
          mode: EXACTLY_ONCE
          interval: 30s
          timeout: 10min
          unaligned: true
          externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
      # state backend
      state:
        # Special note: flink1.12 optional configuration ('jobmanager', 'filesystem', 'rocksdb'),
        # flink1.12+ optional configuration ('hashmap', 'rocksdb')
        backend: hashmap
        backend.incremental: true
        checkpoint-storage: filesystem
        savepoints.dir: file:///tmp/chkdir
        checkpoints.dir: file:///tmp/chkdir
      # restart strategy
      restart-strategy: fixed-delay  # Restart strategy [(fixed-delay|failure-rate|none) a total of 3 configurable strategies]
      restart-strategy.fixed-delay:
        attempts: 3
        delay: 5000
      restart-strategy.failure-rate:
        max-failures-per-interval:
        failure-rate-interval:
        delay:
  # table
  table:
    planner: blink # (blink|old|any)
    mode: streaming #(batch|streaming)
```
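
Comparing the two demos, the renamed keys line up roughly as follows. This
is read off the demos only, assuming the old keys flatten the same way as
`flink.watermark.interval`; the complete and authoritative mappings are in
the task issues [4][5][6].

```yaml
# legacy StreamPark key (flattened, assumed): Flink option used in the new demo
flink.watermark.interval: pipeline.auto-watermark-interval
flink.checkpoints.mode: execution.checkpointing.mode
flink.checkpoints.interval: execution.checkpointing.interval
flink.checkpoints.timeout: execution.checkpointing.timeout
flink.checkpoints.unaligned: execution.checkpointing.unaligned
flink.state.backend.value: state.backend
flink.restart-strategy.value: restart-strategy
```

Note that the time values change form as well: the old demo uses plain
numbers such as `30000`, while the new demo uses duration strings such as
`30s` and `10min`.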

If you have any suggestions, you are welcome to join the discussion on
the mailing list or in the issue. I look forward to your feedback.

[1] https://github.com/apache/incubator-streampark/issues/1739
[2] https://nightlies.apache.org/flink/flink-docs-release-1.12/deployment/config.html#pipeline-auto-watermark-interval
[3] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#pipeline-auto-watermark-interval
[4] https://github.com/apache/incubator-streampark/issues/1742
[5] https://github.com/apache/incubator-streampark/issues/1743
[6] https://github.com/apache/incubator-streampark/issues/1744

Best wishes
fanrui
