zentol commented on a change in pull request #15355:
URL: https://github.com/apache/flink/pull/15355#discussion_r600245551



##########
File path: docs/content.zh/docs/deployment/elastic_scaling.md
##########
@@ -100,13 +110,40 @@ Since Reactive Mode is a new, experimental feature, not 
all features supported b
 - **Deployment is only supported as a standalone application deployment**. 
Active resource providers (such as native Kubernetes, YARN or Mesos) are 
explicitly not supported. Standalone session clusters are not supported either. 
The application deployment is limited to single job applications. 
 
   The only supported deployment options are [Standalone in Application 
Mode]({{< ref "docs/deployment/resource-providers/standalone/overview" 
>}}#application-mode) ([described](#getting-started) on this page), [Docker in 
Application Mode]({{< ref 
"docs/deployment/resource-providers/standalone/docker" 
>}}#application-mode-on-docker) and [Standalone Kubernetes Application 
Cluster]({{< ref "docs/deployment/resource-providers/standalone/kubernetes" 
>}}#deploy-application-cluster).
-- **Streaming jobs only**: The first version of Reactive Mode runs with 
streaming jobs only. When submitting a batch job, then the default scheduler 
will be used.
-- **No support for [local recovery]({{< ref 
"docs/ops/state/large_state_tuning">}}#task-local-recovery)**: Local recovery 
is a feature that schedules tasks to machines so that the state on that machine 
gets re-used if possible. The lack of this feature means that Reactive Mode 
will always need to download the entire state from the checkpoint storage.
-- **No support for local failover**: Local failover means that the scheduler 
is able to restart parts ("regions" in Flink's internals) of a failed job, 
instead of the entire job. This limitation impacts only recovery time of 
embarrassingly parallel jobs: Flink's default scheduler can restart failed 
parts, while Reactive Mode will restart the entire job.
-- **Limited integration with Flink's Web UI**: Reactive Mode allows that a 
job's parallelism can change over its lifetime. The web UI only shows the 
current parallelism the job.
-- **Limited Job metrics**: With the exception of `numRestarts` all 
[availability]({{< ref "docs/ops/metrics" >}}#availability) and 
[checkpointing]({{< ref "docs/ops/metrics" >}}#checkpointing) metrics with the 
`Job` scope are not working correctly.
 
+The [limitations of Adaptive Scheduler](#limitations-1) also apply to Reactive 
Mode.
+
+
+## Adaptive Scheduler
+
+{{< hint danger >}}
+Using Adaptive Scheduler directly (not through Reactive Mode) is only advised 
for advanced users.
+{{< /hint >}}
+
+Adaptive Scheduler is a scheduler that can adjust the parallelism of a job 
based on the available slots. On start up, it requests the number of slots 
needed based on the parallelisms configured by the user in the streaming job. 
If the number of slots offered is lower than the requested slots, Adaptive 
Scheduler will reduce the parallelism so that it can start executing the job 
(or fail if insufficient slots are available). In Reactive Mode (see above) the 
parallelism requested is conceptually set to infinity, letting the job always 
use as many resources as possible. You can also use Adaptive Scheduler without 
Reactive Mode, but there are some practical limitations:

Review comment:
       ```suggestion
   The Adaptive Scheduler can adjust the parallelism of a job based on 
available slots. It will automatically reduce the parallelism if not enough 
slots are available to run the job with the originally configured parallelism; 
be it due to not enough resources being available at the time of submission, or 
TaskManager outages during the job execution. If new slots become available the 
job will be scaled up again, up to the configured parallelism.
   In Reactive Mode (see above) the configured parallelism is ignored and 
treated as if it was set to infinity, letting the job always use as many 
resources as possible.
   You can also use Adaptive Scheduler without Reactive Mode, but there are 
some practical limitations:
   ```
   I think this goes too far into implementation details of the declarative 
slot protocol.

##########
File path: docs/content.zh/docs/deployment/elastic_scaling.md
##########
@@ -100,13 +110,40 @@ Since Reactive Mode is a new, experimental feature, not 
all features supported b
 - **Deployment is only supported as a standalone application deployment**. 
Active resource providers (such as native Kubernetes, YARN or Mesos) are 
explicitly not supported. Standalone session clusters are not supported either. 
The application deployment is limited to single job applications. 
 
   The only supported deployment options are [Standalone in Application 
Mode]({{< ref "docs/deployment/resource-providers/standalone/overview" 
>}}#application-mode) ([described](#getting-started) on this page), [Docker in 
Application Mode]({{< ref 
"docs/deployment/resource-providers/standalone/docker" 
>}}#application-mode-on-docker) and [Standalone Kubernetes Application 
Cluster]({{< ref "docs/deployment/resource-providers/standalone/kubernetes" 
>}}#deploy-application-cluster).
-- **Streaming jobs only**: The first version of Reactive Mode runs with 
streaming jobs only. When submitting a batch job, then the default scheduler 
will be used.
-- **No support for [local recovery]({{< ref 
"docs/ops/state/large_state_tuning">}}#task-local-recovery)**: Local recovery 
is a feature that schedules tasks to machines so that the state on that machine 
gets re-used if possible. The lack of this feature means that Reactive Mode 
will always need to download the entire state from the checkpoint storage.
-- **No support for local failover**: Local failover means that the scheduler 
is able to restart parts ("regions" in Flink's internals) of a failed job, 
instead of the entire job. This limitation impacts only recovery time of 
embarrassingly parallel jobs: Flink's default scheduler can restart failed 
parts, while Reactive Mode will restart the entire job.
-- **Limited integration with Flink's Web UI**: Reactive Mode allows that a 
job's parallelism can change over its lifetime. The web UI only shows the 
current parallelism the job.
-- **Limited Job metrics**: With the exception of `numRestarts` all 
[availability]({{< ref "docs/ops/metrics" >}}#availability) and 
[checkpointing]({{< ref "docs/ops/metrics" >}}#checkpointing) metrics with the 
`Job` scope are not working correctly.
 
+The [limitations of Adaptive Scheduler](#limitations-1) also apply to Reactive 
Mode.
+
+
+## Adaptive Scheduler
+
+{{< hint danger >}}
+Using Adaptive Scheduler directly (not through Reactive Mode) is only advised 
for advanced users.
+{{< /hint >}}
+
+Adaptive Scheduler is a scheduler that can adjust the parallelism of a job 
based on the available slots. On start up, it requests the number of slots 
needed based on the parallelisms configured by the user in the streaming job. 
If the number of slots offered is lower than the requested slots, Adaptive 
Scheduler will reduce the parallelism so that it can start executing the job 
(or fail if insufficient slots are available). In Reactive Mode (see above) the 
parallelism requested is conceptually set to infinity, letting the job always 
use as many resources as possible. You can also use Adaptive Scheduler without 
Reactive Mode, but there are some practical limitations:
+- If you are using Adaptive Scheduler on a session cluster, there are no 
guarantees regarding the distribution of slots between multiple running jobs in 
the same session.
+- An active resource manager (native Kubernetes, YARN, Mesos) will request 
TaskManagers until the parallelism requested by the job is fulfilled, 
potentially allocating a lot of resources.

Review comment:
       How is this a limitation of the adaptive scheduler? The default 
scheduler has the same behavior.

##########
File path: docs/content.zh/docs/deployment/elastic_scaling.md
##########
@@ -100,13 +110,40 @@ Since Reactive Mode is a new, experimental feature, not 
all features supported b
 - **Deployment is only supported as a standalone application deployment**. 
Active resource providers (such as native Kubernetes, YARN or Mesos) are 
explicitly not supported. Standalone session clusters are not supported either. 
The application deployment is limited to single job applications. 
 
   The only supported deployment options are [Standalone in Application 
Mode]({{< ref "docs/deployment/resource-providers/standalone/overview" 
>}}#application-mode) ([described](#getting-started) on this page), [Docker in 
Application Mode]({{< ref 
"docs/deployment/resource-providers/standalone/docker" 
>}}#application-mode-on-docker) and [Standalone Kubernetes Application 
Cluster]({{< ref "docs/deployment/resource-providers/standalone/kubernetes" 
>}}#deploy-application-cluster).
-- **Streaming jobs only**: The first version of Reactive Mode runs with 
streaming jobs only. When submitting a batch job, then the default scheduler 
will be used.
-- **No support for [local recovery]({{< ref 
"docs/ops/state/large_state_tuning">}}#task-local-recovery)**: Local recovery 
is a feature that schedules tasks to machines so that the state on that machine 
gets re-used if possible. The lack of this feature means that Reactive Mode 
will always need to download the entire state from the checkpoint storage.
-- **No support for local failover**: Local failover means that the scheduler 
is able to restart parts ("regions" in Flink's internals) of a failed job, 
instead of the entire job. This limitation impacts only recovery time of 
embarrassingly parallel jobs: Flink's default scheduler can restart failed 
parts, while Reactive Mode will restart the entire job.
-- **Limited integration with Flink's Web UI**: Reactive Mode allows that a 
job's parallelism can change over its lifetime. The web UI only shows the 
current parallelism the job.
-- **Limited Job metrics**: With the exception of `numRestarts` all 
[availability]({{< ref "docs/ops/metrics" >}}#availability) and 
[checkpointing]({{< ref "docs/ops/metrics" >}}#checkpointing) metrics with the 
`Job` scope are not working correctly.
 
+The [limitations of Adaptive Scheduler](#limitations-1) also apply to Reactive 
Mode.
+
+
+## Adaptive Scheduler
+
+{{< hint danger >}}
+Using Adaptive Scheduler directly (not through Reactive Mode) is only advised 
for advanced users.
+{{< /hint >}}
+
+Adaptive Scheduler is a scheduler that can adjust the parallelism of a job 
based on the available slots. On start up, it requests the number of slots 
needed based on the parallelisms configured by the user in the streaming job. 
If the number of slots offered is lower than the requested slots, Adaptive 
Scheduler will reduce the parallelism so that it can start executing the job 
(or fail if insufficient slots are available). In Reactive Mode (see above) the 
parallelism requested is conceptually set to infinity, letting the job always 
use as many resources as possible. You can also use Adaptive Scheduler without 
Reactive Mode, but there are some practical limitations:
+- If you are using Adaptive Scheduler on a session cluster, there are no 
guarantees regarding the distribution of slots between multiple running jobs in 
the same session.
+- An active resource manager (native Kubernetes, YARN, Mesos) will request 
TaskManagers until the parallelism requested by the job is fulfilled, 
potentially allocating a lot of resources.
 
+One benefit of Adpative Scheduler over the default scheduler is that it can 
handle TaskManager losses gracefully, since it would just scale down in these 
cases.
+
+### Usage
+
+The following configuration parameters need to be set:
+
+- `jobmanager.scheduler: adaptive`: Change from the default scheduler to 
adaptive scheduler
+- `cluster.declarative-resource-management.enabled` Declarative resource 
management must be enabled (enabled by default).
+
+Depending on your usage scenario, we also recommend adjusting the parallelism 
of the job you are submitting to the adaptive scheduler. The parallelism 
configured determines the number of slots Adaptive Scheduler will request.

Review comment:
       In what circumstances would a 1.12 user who wants to use the adaptive 
scheduler adjust the parallelism, and why?
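   
   For reference, a minimal sketch of the two settings listed under "Usage" in 
the hunk above, as they might appear in `flink-conf.yaml` (the file name is an 
assumption; the keys are taken from the hunk):
   
   ```yaml
   # Switch from the default scheduler to Adaptive Scheduler.
   jobmanager.scheduler: adaptive
   # Declarative resource management must be enabled. Per the hunk this is
   # already the default, so the line is shown only for completeness.
   cluster.declarative-resource-management.enabled: true
   ```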

##########
File path: docs/content.zh/docs/deployment/elastic_scaling.md
##########
@@ -88,10 +88,20 @@ If you manually set a parallelism in your job for 
individual operators or the en
 
 Note that such a high maxParallelism might affect performance of the job, 
since more internal structures are needed to maintain [some internal 
structures](https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html)
 of Flink.
 
+When enabling Reactive Mode, the 
`jobmanager.adaptive-scheduler.resource-wait-timeout` configuration key will 
default to `-1`. This means that the JobManager will run forever waiting for 
sufficient resources.
+If you want the JobManager to stop after a certain time without enough 
TaskManagers to run the job, configure 
`jobmanager.adaptive-scheduler.resource-wait-timeout`.
+
+With Reactive Mode enabled, the 
`jobmanager.adaptive-scheduler.resource-stabilization-timeout` configuration 
key will default to `0`: Flink will start runnning the job, as soon as there 
are sufficient resources available.
+In scenarios where TaskManagers are not connecting at the same time, but 
slowly one after another, this behavior leads to a job restart whenever a 
TaskManager connects. Increase this configuration value if you want to wait for 
the resources to stabilize before scheduling the job.
+
 #### Recommendations
 
 - **Configure periodic checkpointing for stateful jobs**: Reactive mode 
restores from the latest completed checkpoint on a rescale event. If no 
periodic checkpointing is enabled, your program will loose its state. 
Checkpointing also configures a **restart strategy**. Reactive mode will 
respect the configured restarting strategy: If no restarting strategy is 
configured, reactive mode will fail your job, instead of scaling it.
 
+- Downscaling in Reactive Mode might cause longer stalls in your processing 
because Flink waits for the heartbeat between JobManager and the stopped 
TaskManager(s) to time-out. You will see that your Flink job is stuck in the 
failing state for roughly 50 seconds before redeploying your job with a lower 
parallelism.
+
+  The default timeout is configured to 50 seconds. Adjust the 
[`heartbeat.timeout`]({{< ref "docs/deployment/config">}}#heartbeat-timeout) 
configuration to a lower value, if your infrastructure permits this. Setting a 
low heartbeat timeout can lead to failures if a TaskManager fails to respond to 
a heartbeat, for example due to a network congestion or a long garbage 
collection pause. Note that the [`heartbeat.interval`]({{< ref 
"docs/deployment/config">}}#heartbeat-interval) always needs to be lower than 
the timeout.

Review comment:
       ```suggestion
   The default timeout is configured to 50 seconds. Adjust the 
[`heartbeat.timeout`]({{< ref "docs/deployment/config">}}#heartbeat-timeout) 
configuration to a lower value, if your infrastructure permits this. Setting a 
low heartbeat timeout can lead to failures if a TaskManager fails to respond to 
a heartbeat, for example due to a network congestion or a long garbage 
collection pause. Note that the [`heartbeat.interval`]({{< ref 
"docs/deployment/config">}}#heartbeat-interval) always needs to be lower than 
the timeout.
   ```
   Or is the extra indentation in the original intentional?
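   
   As an illustration of the paragraph quoted above, a lowered heartbeat 
timeout might be configured as sketched below. The concrete values are 
assumptions chosen only for the example, and the millisecond unit is assumed 
from the 50-second default being expressed as `50000`:
   
   ```yaml
   # Assumed example value: detect a stopped TaskManager after ~20 seconds
   # instead of the default 50 seconds. Too low a value risks spurious
   # failures during network congestion or long GC pauses.
   heartbeat.timeout: 20000
   # Must always be lower than heartbeat.timeout (assumed example value).
   heartbeat.interval: 5000
   ```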

##########
File path: docs/content.zh/docs/deployment/elastic_scaling.md
##########
@@ -88,10 +88,20 @@ If you manually set a parallelism in your job for 
individual operators or the en
 
 Note that such a high maxParallelism might affect performance of the job, 
since more internal structures are needed to maintain [some internal 
structures](https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html)
 of Flink.
 
+When enabling Reactive Mode, the 
`jobmanager.adaptive-scheduler.resource-wait-timeout` configuration key will 
default to `-1`. This means that the JobManager will run forever waiting for 
sufficient resources.
+If you want the JobManager to stop after a certain time without enough 
TaskManagers to run the job, configure 
`jobmanager.adaptive-scheduler.resource-wait-timeout`.
+
+With Reactive Mode enabled, the 
`jobmanager.adaptive-scheduler.resource-stabilization-timeout` configuration 
key will default to `0`: Flink will start runnning the job, as soon as there 
are sufficient resources available.
+In scenarios where TaskManagers are not connecting at the same time, but 
slowly one after another, this behavior leads to a job restart whenever a 
TaskManager connects. Increase this configuration value if you want to wait for 
the resources to stabilize before scheduling the job.
+
 #### Recommendations
 
 - **Configure periodic checkpointing for stateful jobs**: Reactive mode 
restores from the latest completed checkpoint on a rescale event. If no 
periodic checkpointing is enabled, your program will loose its state. 
Checkpointing also configures a **restart strategy**. Reactive mode will 
respect the configured restarting strategy: If no restarting strategy is 
configured, reactive mode will fail your job, instead of scaling it.
 
+- Downscaling in Reactive Mode might cause longer stalls in your processing 
because Flink waits for the heartbeat between JobManager and the stopped 
TaskManager(s) to time-out. You will see that your Flink job is stuck in the 
failing state for roughly 50 seconds before redeploying your job with a lower 
parallelism.

Review comment:
       ```suggestion
   - Downscaling in Reactive Mode might cause longer stalls in your processing 
because Flink waits for the heartbeat between JobManager and the stopped 
TaskManager(s) to time out. You will see that your Flink job is stuck in the 
failing state for roughly 50 seconds before redeploying your job with a lower 
parallelism.
   ```
   
   Is `stuck in the failing state` referring to the actual job status in Flink? 
I ask because the job will never officially reach a FAILING state, since from 
there the only path is FINISHED. It will be exposed as RUNNING until the 
restart is triggered.

##########
File path: docs/content.zh/docs/deployment/elastic_scaling.md
##########
@@ -88,10 +88,20 @@ If you manually set a parallelism in your job for 
individual operators or the en
 
 Note that such a high maxParallelism might affect performance of the job, 
since more internal structures are needed to maintain [some internal 
structures](https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html)
 of Flink.
 
+When enabling Reactive Mode, the 
`jobmanager.adaptive-scheduler.resource-wait-timeout` configuration key will 
default to `-1`. This means that the JobManager will run forever waiting for 
sufficient resources.
+If you want the JobManager to stop after a certain time without enough 
TaskManagers to run the job, configure 
`jobmanager.adaptive-scheduler.resource-wait-timeout`.
+
+With Reactive Mode enabled, the 
`jobmanager.adaptive-scheduler.resource-stabilization-timeout` configuration 
key will default to `0`: Flink will start runnning the job, as soon as there 
are sufficient resources available.
+In scenarios where TaskManagers are not connecting at the same time, but 
slowly one after another, this behavior leads to a job restart whenever a 
TaskManager connects. Increase this configuration value if you want to wait for 
the resources to stabilize before scheduling the job.

Review comment:
       Unrelated to this ticket, but these both sound like undesirable 
behaviors.
   
   If we're fine with downscaling causing 30+ seconds of downtime, then why can 
we not wait a few seconds for stabilization?
   Why is reactive mode now this special case where a cluster may sit idling 
indefinitely?
   
   It seems a bit like we're optimizing the defaults for demos, and not actual 
production usage.
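   
   For context, overriding the two Reactive Mode defaults described in the 
hunk above might look like the following sketch. The keys come from the hunk; 
the values are illustrative assumptions, not recommendations made in this PR:
   
   ```yaml
   # Reactive Mode default per the hunk: -1 (wait forever for resources).
   # Assumed example: give up after 5 minutes without enough TaskManagers.
   jobmanager.adaptive-scheduler.resource-wait-timeout: 5 min
   # Reactive Mode default per the hunk: 0 (start as soon as resources suffice).
   # Assumed example: wait 10 seconds for resources to stabilize, which avoids
   # a restart for every TaskManager that connects during startup.
   jobmanager.adaptive-scheduler.resource-stabilization-timeout: 10 s
   ```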




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]