This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e27f8a3a0783d551457a2f424b01267bd1c8c2c2
Author: JunRuiLee <[email protected]>
AuthorDate: Mon Nov 20 20:17:22 2023 +0800

    [FLINK-33669][doc] Update the usage of configuring state backend in docs.
---
 .../docs/deployment/filesystems/azure.md           |  7 ++-
 docs/content.zh/docs/deployment/filesystems/oss.md |  7 ++-
 docs/content.zh/docs/deployment/filesystems/s3.md  |  7 ++-
 .../datastream/fault-tolerance/checkpointing.md    |  7 ++-
 .../datastream/fault-tolerance/state_backends.md   | 10 +++--
 docs/content.zh/docs/ops/state/checkpoints.md      |  6 ++-
 docs/content.zh/docs/ops/state/state_backends.md   | 50 +++++++++++++---------
 .../datastream/fault-tolerance/state_backends.md   | 10 +++--
 docs/content/docs/ops/state/state_backends.md      | 50 +++++++++++++---------
 9 files changed, 98 insertions(+), 56 deletions(-)

diff --git a/docs/content.zh/docs/deployment/filesystems/azure.md b/docs/content.zh/docs/deployment/filesystems/azure.md
index 42a0d09ee96..1482a302dac 100644
--- a/docs/content.zh/docs/deployment/filesystems/azure.md
+++ b/docs/content.zh/docs/deployment/filesystems/azure.md
@@ -63,8 +63,11 @@ env.readTextFile("wasb://<your-container>@$<your-azure-account>.blob.core.window
 // Write to Azure Blob Storage
 stream.writeAsText("wasb://<your-container>@$<your-azure-account>.blob.core.windows.net/<object-path>");

-// Use Azure Blob Storage as FsStatebackend
-env.setStateBackend(new FsStateBackend("wasb://<your-container>@$<your-azure-account>.blob.core.windows.net/<object-path>"));
+// Use Azure Blob Storage as checkpoint storage
+Configuration config = new Configuration();
+config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
+config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "wasb://<your-container>@$<your-azure-account>.blob.core.windows.net/<object-path>");
+env.configure(config);
 ```

 ## Shaded Hadoop Azure Blob Storage file system
diff --git a/docs/content.zh/docs/deployment/filesystems/oss.md b/docs/content.zh/docs/deployment/filesystems/oss.md
index e1ca862276c..5a210ad9954 100644
--- a/docs/content.zh/docs/deployment/filesystems/oss.md
+++ b/docs/content.zh/docs/deployment/filesystems/oss.md
@@ -48,8 +48,11 @@ env.readTextFile("oss://<your-bucket>/<object-name>");
 // Write to an OSS bucket
 stream.writeAsText("oss://<your-bucket>/<object-name>");

-// Use OSS as FsStatebackend
-env.setStateBackend(new FsStateBackend("oss://<your-bucket>/<object-name>"));
+// Use OSS as checkpoint storage
+Configuration config = new Configuration();
+config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
+config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "oss://<your-bucket>/<object-name>");
+env.configure(config);
 ```

 ### Shaded Hadoop OSS file system
diff --git a/docs/content.zh/docs/deployment/filesystems/s3.md b/docs/content.zh/docs/deployment/filesystems/s3.md
index 5bb316b038b..5f9a895c786 100644
--- a/docs/content.zh/docs/deployment/filesystems/s3.md
+++ b/docs/content.zh/docs/deployment/filesystems/s3.md
@@ -46,8 +46,11 @@ env.readTextFile("s3://<bucket>/<endpoint>");
 // Write to an S3 bucket
 stream.writeAsText("s3://<bucket>/<endpoint>");

-// Use S3 as FsStatebackend
-env.setStateBackend(new FsStateBackend("s3://<your-bucket>/<endpoint>"));
+// Use S3 as checkpoint storage
+Configuration config = new Configuration();
+config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
+config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "s3://<your-bucket>/<endpoint>");
+env.configure(config);
 ```

 Note that these examples are *not exhaustive*; S3 can be used in other scenarios as well, including [JobManager high availability configuration]({{< ref "docs/deployment/ha/overview" >}}) or the [RocksDBStateBackend]({{< ref "docs/ops/state/state_backends" >}}#the-rocksdbstatebackend), and everywhere Flink expects a file system URI.
diff --git a/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md b/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md
index 17ade58ae72..5f10720e592 100644
--- a/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md
+++ b/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md
@@ -196,7 +196,12 @@ Flink's [checkpointing mechanism]({{< ref "docs/learn-flink/fault_tolerance" >}})

 Where checkpoints are stored depends on the configured **State Backend** (for example, JobManager memory, file system, or database).
 By default, state is kept in the TaskManagers' memory and checkpoints are stored in the JobManager's memory. To properly persist large state,
-Flink supports various ways of storing checkpoint state in other state backends. Configure the chosen state backend via `StreamExecutionEnvironment.setStateBackend(…)`.
+Flink supports various ways of storing checkpoint state in other state backends. It can be configured with the following code block:
+```java
+Configuration config = new Configuration();
+config.set(StateBackendOptions.STATE_BACKEND, "hashmap");
+env.configure(config);
+```

 Read [state backends]({{< ref "docs/ops/state/state_backends" >}}) for more details on the state backends and options available at job scope and cluster scope.

diff --git a/docs/content.zh/docs/dev/datastream/fault-tolerance/state_backends.md b/docs/content.zh/docs/dev/datastream/fault-tolerance/state_backends.md
index 68821981731..9234fe78a89 100644
--- a/docs/content.zh/docs/dev/datastream/fault-tolerance/state_backends.md
+++ b/docs/content.zh/docs/dev/datastream/fault-tolerance/state_backends.md
@@ -39,8 +39,9 @@ Flink provides several state backends, which specify how state is stored and
 {{< tabs "03941da4-5c40-4bb8-97ce-dd14c08bb9a9" >}}
 {{< tab "Java" >}}
 ```java
-StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env.setStateBackend(...);
+Configuration config = new Configuration();
+config.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
+env.configure(config);
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
@@ -51,8 +52,9 @@ env.setStateBackend(...)
 {{< /tab >}}
 {{< tab "Python" >}}
 ```python
-env = StreamExecutionEnvironment.get_execution_environment()
-env.set_state_backend(...)
+config = Configuration()
+config.set_string('state.backend.type', 'rocksdb')
+env = StreamExecutionEnvironment.get_execution_environment(config)
 ```
 {{< /tab >}}
 {{< /tabs >}}
diff --git a/docs/content.zh/docs/ops/state/checkpoints.md b/docs/content.zh/docs/ops/state/checkpoints.md
index 616a272953f..13a719cb40f 100644
--- a/docs/content.zh/docs/ops/state/checkpoints.md
+++ b/docs/content.zh/docs/ops/state/checkpoints.md
@@ -131,7 +131,11 @@ state.checkpoints.dir: hdfs:///checkpoints/
 #### Configuring the state backend for a single job

 ```java
-env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data/"));
+Configuration config = new Configuration();
+config.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
+config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
+config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "hdfs:///checkpoints-data/");
+env.configure(config);
 ```

 ### Restoring state from a retained checkpoint
diff --git a/docs/content.zh/docs/ops/state/state_backends.md b/docs/content.zh/docs/ops/state/state_backends.md
index cfbae39afbd..bca2bc7af5b 100644
--- a/docs/content.zh/docs/ops/state/state_backends.md
+++ b/docs/content.zh/docs/ops/state/state_backends.md
@@ -126,8 +126,9 @@ EmbeddedRocksDBStateBackend is currently the only incremental CheckPoint-capable State Bac
 {{< tabs "c8226811-7dea-4c75-8f56-44ee2f40a682" >}}
 {{< tab "Java" >}}
 ```java
-StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env.setStateBackend(new HashMapStateBackend());
+Configuration config = new Configuration();
+config.set(StateBackendOptions.STATE_BACKEND, "hashmap");
+env.configure(config);
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
@@ -138,8 +139,9 @@ env.setStateBackend(new HashMapStateBackend())
 {{< /tab >}}
 {{< tab "Python" >}}
 ```python
-env = StreamExecutionEnvironment.get_execution_environment()
-env.set_state_backend(HashMapStateBackend())
+config = Configuration()
+config.set_string('state.backend.type', 'hashmap')
+env = StreamExecutionEnvironment.get_execution_environment(config)
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -497,9 +499,10 @@ state.checkpoint-storage: jobmanager
 {{< tabs "memorystatebackendmigration" >}}
 {{< tab "Java" >}}
 ```java
-StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env.setStateBackend(new HashMapStateBackend());
-env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
+Configuration config = new Configuration();
+config.set(StateBackendOptions.STATE_BACKEND, "hashmap");
+config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "jobmanager");
+env.configure(config);
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
@@ -511,8 +514,9 @@ env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage)
 {{< /tab >}}
 {{< tab "Python" >}}
 ```python
-env = StreamExecutionEnvironment.get_execution_environment()
-env.set_state_backend(HashMapStateBackend())
+config = Configuration()
+config.set_string('state.backend.type', 'hashmap')
+env = StreamExecutionEnvironment.get_execution_environment(config)
 env.get_checkpoint_config().set_checkpoint_storage(JobManagerCheckpointStorage())
 ```
 {{< /tab >}}
@@ -538,9 +542,11 @@ state.checkpoint-storage: filesystem
 {{< tabs "fsstatebackendmigration" >}}
 {{< tab "Java" >}}
 ```java
-StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env.setStateBackend(new HashMapStateBackend());
-env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");
+Configuration config = new Configuration();
+config.set(StateBackendOptions.STATE_BACKEND, "hashmap");
+config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
+config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file:///checkpoint-dir");
+env.configure(config);


 // Advanced FsStateBackend configurations, such as write buffer size
@@ -562,8 +568,9 @@ env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("
 {{< /tab >}}
 {{< tab "Python" >}}
 ```python
-env = StreamExecutionEnvironment.get_execution_environment()
-env.set_state_backend(HashMapStateBackend())
+config = Configuration()
+config.set_string('state.backend.type', 'hashmap')
+env = StreamExecutionEnvironment.get_execution_environment(config)
 env.get_checkpoint_config().set_checkpoint_storage_dir("file:///checkpoint-dir")


@@ -594,9 +601,11 @@ state.checkpoint-storage: filesystem
 {{< tabs "rocksdbstatebackendmigration" >}}
 {{< tab "Java" >}}
 ```java
-StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env.setStateBackend(new EmbeddedRocksDBStateBackend());
-env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");
+Configuration config = new Configuration();
+config.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
+config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
+config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file:///checkpoint-dir");
+env.configure(config);


 // If you manually passed FsStateBackend into the RocksDBStateBackend constructor
@@ -606,7 +615,7 @@ env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
-```java
+```scala
 val env = StreamExecutionEnvironment.getExecutionEnvironment
 env.setStateBackend(new EmbeddedRocksDBStateBackend)
 env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir")
@@ -620,8 +629,9 @@ env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("
 {{< /tab >}}
 {{< tab "Python" >}}
 ```python
-env = StreamExecutionEnvironment.get_execution_environment()
-env.set_state_backend(EmbeddedRocksDBStateBackend())
+config = Configuration()
+config.set_string('state.backend.type', 'rocksdb')
+env = StreamExecutionEnvironment.get_execution_environment(config)
 env.get_checkpoint_config().set_checkpoint_storage_dir("file:///checkpoint-dir")


diff --git a/docs/content/docs/dev/datastream/fault-tolerance/state_backends.md b/docs/content/docs/dev/datastream/fault-tolerance/state_backends.md
index 61eccadb42e..d4dc3bda82c 100644
--- a/docs/content/docs/dev/datastream/fault-tolerance/state_backends.md
+++ b/docs/content/docs/dev/datastream/fault-tolerance/state_backends.md
@@ -37,8 +37,9 @@ For more information about the available state backends, their advantages, limit
 {{< tabs "65b41d30-c7c8-4b6b-b31b-7ff99b4d341d" >}}
 {{< tab "Java" >}}
 ```java
-StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env.setStateBackend(...);
+Configuration config = new Configuration();
+config.set(StateBackendOptions.STATE_BACKEND, "hashmap");
+env.configure(config);
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
@@ -49,8 +50,9 @@ env.setStateBackend(...)
 {{< /tab >}}
 {{< tab "Python" >}}
 ```python
-env = StreamExecutionEnvironment.get_execution_environment()
-env.set_state_backend(...)
+config = Configuration()
+config.set_string('state.backend.type', 'hashmap')
+env = StreamExecutionEnvironment.get_execution_environment(config)
 ```
 {{< /tab >}}
 {{< /tabs >}}
diff --git a/docs/content/docs/ops/state/state_backends.md b/docs/content/docs/ops/state/state_backends.md
index 28c7b02fbe6..5ccd28d8e55 100644
--- a/docs/content/docs/ops/state/state_backends.md
+++ b/docs/content/docs/ops/state/state_backends.md
@@ -119,8 +119,9 @@ The per-job state backend is set on the `StreamExecutionEnvironment` of the job,
 {{< tabs "6e6f1fd6-fcc6-4af4-929f-97dc7d639df4" >}}
 {{< tab "Java" >}}
 ```java
-StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env.setStateBackend(new HashMapStateBackend());
+Configuration config = new Configuration();
+config.set(StateBackendOptions.STATE_BACKEND, "hashmap");
+env.configure(config);
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
@@ -131,8 +132,9 @@ env.setStateBackend(new HashMapStateBackend())
 {{< /tab >}}
 {{< tab "Python" >}}
 ```python
-env = StreamExecutionEnvironment.get_execution_environment()
-env.set_state_backend(HashMapStateBackend())
+config = Configuration()
+config.set_string('state.backend.type', 'hashmap')
+env = StreamExecutionEnvironment.get_execution_environment(config)
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -487,9 +489,10 @@ state.checkpoint-storage: jobmanager
 {{< tabs "memorystatebackendmigration" >}}
 {{< tab "Java" >}}
 ```java
-StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env.setStateBackend(new HashMapStateBackend());
-env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
+Configuration config = new Configuration();
+config.set(StateBackendOptions.STATE_BACKEND, "hashmap");
+config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "jobmanager");
+env.configure(config);
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
@@ -501,8 +504,9 @@ env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage)
 {{< /tab >}}
 {{< tab "Python" >}}
 ```python
-env = StreamExecutionEnvironment.get_execution_environment()
-env.set_state_backend(HashMapStateBackend())
+config = Configuration()
+config.set_string('state.backend.type', 'hashmap')
+env = StreamExecutionEnvironment.get_execution_environment(config)
 env.get_checkpoint_config().set_checkpoint_storage(JobManagerCheckpointStorage())
 ```
 {{< /tab >}}
@@ -528,9 +532,11 @@ state.checkpoint-storage: filesystem
 {{< tabs "fsstatebackendbackendmigration" >}}
 {{< tab "Java" >}}
 ```java
-StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env.setStateBackend(new HashMapStateBackend());
-env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");
+Configuration config = new Configuration();
+config.set(StateBackendOptions.STATE_BACKEND, "hashmap");
+config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
+config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file:///checkpoint-dir");
+env.configure(config);


 // Advanced FsStateBackend configurations, such as write buffer size
@@ -552,8 +558,9 @@ env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("
 {{< /tab >}}
 {{< tab "Python" >}}
 ```python
-env = StreamExecutionEnvironment.get_execution_environment()
-env.set_state_backend(HashMapStateBackend())
+config = Configuration()
+config.set_string('state.backend.type', 'hashmap')
+env = StreamExecutionEnvironment.get_execution_environment(config)
 env.get_checkpoint_config().set_checkpoint_storage_dir("file:///checkpoint-dir")


@@ -584,9 +591,11 @@ state.checkpoint-storage: filesystem
 {{< tabs "rocksdbstatebackendmigration" >}}
 {{< tab "Java" >}}
 ```java
-StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env.setStateBackend(new EmbeddedRocksDBStateBackend());
-env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");
+Configuration config = new Configuration();
+config.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
+config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
+config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file:///checkpoint-dir");
+env.configure(config);


 // If you manually passed FsStateBackend into the RocksDBStateBackend constructor
@@ -596,7 +605,7 @@ env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
-```java
+```scala
 val env = StreamExecutionEnvironment.getExecutionEnvironment
 env.setStateBackend(new EmbeddedRocksDBStateBackend)
 env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir")
@@ -610,8 +619,9 @@ env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("
 {{< /tab >}}
 {{< tab "Python" >}}
 ```python
-env = StreamExecutionEnvironment.get_execution_environment()
-env.set_state_backend(EmbeddedRocksDBStateBackend())
+config = Configuration()
+config.set_string('state.backend.type', 'rocksdb')
+env = StreamExecutionEnvironment.get_execution_environment(config)
 env.get_checkpoint_config().set_checkpoint_storage_dir("file:///checkpoint-dir")
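
Each hunk above replaces a legacy constructor with two or three configuration entries, following the migration tabs (`memorystatebackendmigration`, `fsstatebackendmigration`, `rocksdbstatebackendmigration`). The mapping can be summarized in a minimal sketch. `LegacyStateBackendMapping` and `entriesFor` are hypothetical names, not Flink APIs; the string keys are the ones the docs use (the Python snippets use `state.backend.type`, and the context lines show `state.checkpoint-storage` and `state.checkpoints.dir`), assumed here to be the keys behind `StateBackendOptions.STATE_BACKEND`, `CheckpointingOptions.CHECKPOINT_STORAGE` and `CheckpointingOptions.CHECKPOINTS_DIRECTORY` in the Flink version this commit targets.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Flink: translates the legacy state backend
// constructors removed in this commit into the equivalent configuration entries.
public class LegacyStateBackendMapping {

    // Assumed keys of StateBackendOptions.STATE_BACKEND,
    // CheckpointingOptions.CHECKPOINT_STORAGE and CheckpointingOptions.CHECKPOINTS_DIRECTORY.
    static final String STATE_BACKEND = "state.backend.type";
    static final String CHECKPOINT_STORAGE = "state.checkpoint-storage";
    static final String CHECKPOINTS_DIRECTORY = "state.checkpoints.dir";

    /**
     * @param legacyBackend simple class name of the old state backend
     * @param checkpointDir checkpoint path that was passed to the old constructor (may be null)
     */
    static Map<String, String> entriesFor(String legacyBackend, String checkpointDir) {
        Map<String, String> entries = new LinkedHashMap<>();
        switch (legacyBackend) {
            case "MemoryStateBackend":
                // Migration tab: hashmap state backend + JobManager checkpoint storage.
                entries.put(STATE_BACKEND, "hashmap");
                entries.put(CHECKPOINT_STORAGE, "jobmanager");
                return entries;
            case "FsStateBackend":
                entries.put(STATE_BACKEND, "hashmap");
                break;
            case "RocksDBStateBackend":
                entries.put(STATE_BACKEND, "rocksdb");
                break;
            default:
                throw new IllegalArgumentException("Unsupported legacy backend: " + legacyBackend);
        }
        // In the hunks above, FsStateBackend and RocksDBStateBackend always received a
        // checkpoint path; this sketch therefore requires one (a rule of the sketch only).
        if (checkpointDir == null || checkpointDir.isEmpty()) {
            throw new IllegalArgumentException(legacyBackend + " needs a checkpoint directory");
        }
        entries.put(CHECKPOINT_STORAGE, "filesystem");
        entries.put(CHECKPOINTS_DIRECTORY, checkpointDir);
        return entries;
    }

    public static void main(String[] args) {
        // Mirrors the checkpoints.md hunk: new RocksDBStateBackend("hdfs:///checkpoints-data/")
        System.out.println(entriesFor("RocksDBStateBackend", "hdfs:///checkpoints-data/"));
        // Mirrors the s3.md hunk: new FsStateBackend("s3://<your-bucket>/<endpoint>")
        System.out.println(entriesFor("FsStateBackend", "s3://<your-bucket>/<endpoint>"));
    }
}
```

In a real job, each returned key/value pair corresponds to one `config.set(...)` call in the `+` lines above, followed by `env.configure(config)`.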

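The hunk context lines (for example `state.checkpoint-storage: filesystem` and `state.checkpoints.dir: hdfs:///checkpoints/`) show the cluster-wide form of the same options. A minimal sketch that prints job-level entries in that `key: value` form, assuming the keys are identical at job and cluster scope; `ConfigYamlRenderer` and `toYaml` are hypothetical names, not Flink APIs:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not part of Flink: renders configuration entries as the
// "key: value" lines used in the cluster-wide configuration file.
public class ConfigYamlRenderer {

    static String toYaml(Map<String, String> entries) {
        StringJoiner lines = new StringJoiner("\n");
        // Iteration order follows the map, so a LinkedHashMap keeps insertion order.
        for (Map.Entry<String, String> e : entries.entrySet()) {
            lines.add(e.getKey() + ": " + e.getValue());
        }
        return lines.toString();
    }

    public static void main(String[] args) {
        // Same settings as the "fsstatebackendmigration" Java tab above.
        Map<String, String> entries = new LinkedHashMap<>();
        entries.put("state.backend.type", "hashmap");
        entries.put("state.checkpoint-storage", "filesystem");
        entries.put("state.checkpoints.dir", "file:///checkpoint-dir");
        System.out.println(toYaml(entries));
    }
}
```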