zhuzhurk commented on code in PR #24855:
URL: https://github.com/apache/flink/pull/24855#discussion_r1618437083


##########
docs/content/docs/ops/batch/recovery_from_job_master_failure.md:
##########
@@ -0,0 +1,110 @@
+---
+title: "Recovery job progress from job master failures"
+weight: 4
+type: docs
+aliases:
+
+- /docs/ops/batch/recovery_from_job_master_failure.html
+- /docs/ops/batch/recovery_from_job_master_failure
+
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Batch jobs progress recovery from job master failures
+
+## Background
+
+Previously, if the JobMaster fails and is terminated, one of the following two 
situations will occur:
+
+- If high availability (HA) is disabled, the job will fail.
+- If HA is enabled, a JobMaster failover will happen and the job will be 
restarted. Streaming jobs can resume from the 
+  latest successful checkpoints. Batch jobs, however, do not have checkpoints 
and have to start over from the beginning, 
+  losing all previously made progress. This represents a significant 
regression for long-running batch jobs.
+
+To address this issue, a batch job recovery mechanism is introduced to enable 
batch jobs to recover as much progress as 
+possible after a JobMaster failover, avoiding the need to rerun tasks that 
have already been finished.
+
+To implement this feature, a JobEventStore component is introduced to record 
state change events of the JobMaster 
+(such as ExecutionGraph, OperatorCoordinator, etc.) to an external filesystem. 
During the crash and subsequent restart 
+of the JobMaster, TaskManagers will retain the intermediate result data 
produced by the job and attempt to reconnect 
+continuously. Once the JobMaster restarts, it will re-establish connections 
with TaskManagers and recover the job state 
+based on the retained intermediate results and the events previously recorded 
in the JobEventStore, thereby resuming 
+the job's execution progress.
+
+## Usage
+
+This section explains how to enable recovery of batch jobs from JobMaster 
failures, how to tune it, and how to develop 
+sources to work with batch jobs progress recovery.
+
+### How to enable batch jobs progress recovery from job master failures
+
+- Enable cluster high availability:
+
+  To enable the recovery of batch jobs from JobMaster failures, it is 
essential to first ensure that cluster
+  high availability (HA) is enabled. Flink supports HA services backed by 
ZooKeeper or Kubernetes.
+  More details of the configuration can be found in the [High 
Availability]({{< ref "docs/deployment/ha/overview#high-availability" >}}) page.
+- Configure [execution.batch.job-recovery.enabled]({{< ref 
"docs/deployment/config" >}}#execution-batch-job-recovery-enabled): true
+
+Note that currently only [Adaptive Batch Scheduler]({{< ref 
"docs/deployment/elastic_scaling" >}}#adaptive-batch-scheduler) 
+supports this feature. And Flink batch jobs will use this scheduler by default 
unless another scheduler is explicitly configured.
+
+### Optimization
+
+To enable batch jobs to recover as much progress as possible after a JobMaster 
failover, and avoid rerunning tasks 
+that have already been finished, you can configure the following options for 
optimization:
+
+- [execution.batch.job-recovery.snapshot.min-pause]({{< ref 
"docs/deployment/config" >}}#execution-batch-job-recovery-snapshot-min-pause):
+  This setting determines the minimum pause time allowed between snapshots for 
the OperatorCoordinator and ShuffleMaster.
+  This parameter could be adjusted based on the expected I/O load of your 
cluster and the tolerable amount of state regression. 
+  Reduce this interval if smaller state regressions are preferred and a higher 
I/O load is acceptable.
+- [execution.batch.job-recovery.previous-worker.recovery.timeout]({{< ref 
"docs/deployment/config" 
>}}#execution-batch-job-recovery-previous-worker-recovery-timeout):
+  This setting determines the timeout duration allowed for Shuffle workers to 
reconnect. During the recovery process, Flink 
+  requests the retained intermediate result data information from the Shuffle 
Master. If the timeout is reached, 
+  Flink will use all the acquired intermediate result data to recover the 
state.
+- [job-event.store.write-buffer.flush-interval]({{< ref 
"docs/deployment/config" >}}#job-event-store-write-buffer-flush-interval):
+  This setting determines the flush interval for the JobEventStore's write 
buffers.
+- [job-event.store.write-buffer.size]({{< ref "docs/deployment/config" 
>}}#job-event-store-write-buffer-size): This 
+  setting determines the write buffer size in the JobEventStore. When the 
buffer is full, its contents are flushed to the external
+  filesystem.
+
+### Enable batch jobs progress recovery for sources
+
+Currently, only the new source (FLIP-27) supports progress recovery for batch 
jobs. To enable this feature, the SplitEnumerator of a new source (FLIP-27) 
needs to implement the SupportsBatchSnapshot interface:
+
+````
+public interface SupportsBatchSnapshot {}
+````
+
+This interface indicates that the SplitEnumerator supports taking snapshots in 
batch processing scenarios. 
+If a source's SplitEnumerator does not implement this interface and the job 
master goes through a failover, one of the following two situations will occur:
+1. If not all tasks of this source are finished, we will reset and re-run all 
these tasks.
+2. If all tasks of this source are finished, no additional action is required, 
and the job can continue to run.
+However, if any of these tasks need to be restarted at some point in the 
future (for example, due to a 
+PartitionNotFound exception), then all subtasks of this source will need to be 
reset and rerun.
+
+## Limitations
+
+- Only working with the new source (FLIP-27): Since the legacy source has been 
deprecated, this feature is only
+  working with the new source.

Review Comment:
   this feature is only working with the new source.
   -> this feature only supports the new source.



##########
docs/content.zh/docs/ops/batch/recovery_from_job_master_failure.md:
##########
@@ -75,6 +75,19 @@ can recover as much pre-failure progress as possible after a JobMaster failover, avoiding re-
 - [job-event.store.write-buffer.size]({{< ref "docs/deployment/config" 
>}}#job-event-store-write-buffer-size):
   The size of the JobEventStore write buffer. Once the buffer is full, its contents are flushed to the external filesystem.
 
+### Enable Sources to support progress recovery after job master failover
+
+Currently, only the new source (FLIP-27) supports progress recovery for batch jobs. To enable this, the SplitEnumerator of the new source 
needs to implement the SupportsBatchSnapshot interface:

Review Comment:
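   To enable this, the SplitEnumerator of the new source needs to implement the SupportsBatchSnapshot interface:
   
   -> The SplitEnumerator of the new source needs to support snapshotState in batch processing scenarios and implement the 
[SupportsBatchSnapshot]({{< ref "xxx" >}}#yyy) interface, so that it can be restored to its pre-failure state after a restart.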
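
A minimal sketch of what this wording asks of an enumerator: it can snapshot its state in batch mode, and it carries the SupportsBatchSnapshot marker. To stay standalone, the sketch redeclares the marker interface locally and uses `SimpleSplitEnumerator`, a hypothetical and heavily simplified stand-in for Flink's SplitEnumerator. `FileListEnumerator`, `nextSplit` and `restore` are likewise illustrative names, not Flink API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Marker interface as quoted in the docs under review. It is redeclared here
// only so that the sketch compiles without a Flink dependency.
interface SupportsBatchSnapshot {}

// ASSUMPTION: a simplified stand-in for Flink's SplitEnumerator, reduced to
// the three operations this sketch needs. The real interface is larger.
interface SimpleSplitEnumerator<SplitT, CheckpointT> {
    SplitT nextSplit();                       // next unassigned split, or null if none is left
    void addSplitsBack(List<SplitT> splits);  // return splits of a failed reader
    CheckpointT snapshotState();              // capture the enumerator state
}

// Hypothetical enumerator whose splits are plain strings (for example file paths).
// Its whole state is the queue of splits that have not been handed out yet.
final class FileListEnumerator
        implements SimpleSplitEnumerator<String, List<String>>, SupportsBatchSnapshot {

    private final Deque<String> unassigned;

    FileListEnumerator(List<String> splits) {
        this.unassigned = new ArrayDeque<>(splits);
    }

    @Override
    public String nextSplit() {
        return unassigned.poll();
    }

    @Override
    public void addSplitsBack(List<String> splits) {
        unassigned.addAll(splits);
    }

    @Override
    public List<String> snapshotState() {
        // Copy the queue so that later assignments do not mutate the snapshot.
        return new ArrayList<>(unassigned);
    }

    // Rebuild an enumerator from a snapshot taken before the failure.
    static FileListEnumerator restore(List<String> snapshot) {
        return new FileListEnumerator(snapshot);
    }
}
```

With splits "a", "b", "c", a snapshot taken after "a" has been handed out holds "b" and "c", and an enumerator restored from it continues with "b". Splits that were assigned but not yet finished are out of scope for this sketch.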



##########
docs/content.zh/docs/ops/batch/recovery_from_job_master_failure.md:
##########
@@ -75,6 +75,19 @@ can recover as much pre-failure progress as possible after a JobMaster failover, avoiding re-
 - [job-event.store.write-buffer.size]({{< ref "docs/deployment/config" 
>}}#job-event-store-write-buffer-size):
   The size of the JobEventStore write buffer. Once the buffer is full, its contents are flushed to the external filesystem.
 
+### Enable Sources to support progress recovery after job master failover
+
+Currently, only the new source (FLIP-27) supports progress recovery for batch jobs. To enable this, the SplitEnumerator of the new source 
needs to implement the SupportsBatchSnapshot interface:
+
+````
+public interface SupportsBatchSnapshot {}
+````
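+This interface indicates that the SplitEnumerator supports taking state snapshots in batch processing scenarios. If a source's SplitEnumerator 
does not implement this interface, then after a job master failover,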

Review Comment:
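   This interface indicates that the SplitEnumerator supports taking state snapshots in batch processing scenarios. If a source's SplitEnumerator 
does not implement this interface, then after a job master failover
   
   -> Otherwise, to ensure data correctness, after a job master failover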

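For reference, the fallback rule that this sentence introduces can be restated as a small decision function. This is only a sketch of the behavior described in the English docs in this PR; every name in it (`SourceRecoveryAction`, `SourceRecoveryRule`, `afterFailover`) is hypothetical and does not correspond to Flink code.

```java
// Hypothetical names throughout: this restates the documented rule only.
enum SourceRecoveryAction {
    RESTORE_ENUMERATOR_STATE,  // enumerator implements SupportsBatchSnapshot
    RESET_ALL_SOURCE_TASKS,    // no snapshot support and some source tasks unfinished
    CONTINUE_WITHOUT_RESET     // no snapshot support but all source tasks finished
}

final class SourceRecoveryRule {
    private SourceRecoveryRule() {}

    // Decide what happens to one source right after a job master failover.
    static SourceRecoveryAction afterFailover(
            boolean supportsBatchSnapshot, boolean allSourceTasksFinished) {
        if (supportsBatchSnapshot) {
            return SourceRecoveryAction.RESTORE_ENUMERATOR_STATE;
        }
        // Without a snapshot the enumerator state is lost, so a partially
        // finished source is reset and rerun as a whole.
        return allSourceTasksFinished
                ? SourceRecoveryAction.CONTINUE_WITHOUT_RESET
                : SourceRecoveryAction.RESET_ALL_SOURCE_TASKS;
    }
}
```

Per the docs, the CONTINUE_WITHOUT_RESET case is not final: if any task of that source has to be restarted later (for example, due to a PartitionNotFound exception), all subtasks of the source are reset and rerun.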

