zentol commented on a change in pull request #15355:
URL: https://github.com/apache/flink/pull/15355#discussion_r600245551
##########
File path: docs/content.zh/docs/deployment/elastic_scaling.md
##########
@@ -100,13 +110,40 @@ Since Reactive Mode is a new, experimental feature, not all features supported b
- **Deployment is only supported as a standalone application deployment**. Active resource providers (such as native Kubernetes, YARN or Mesos) are explicitly not supported. Standalone session clusters are not supported either. The application deployment is limited to single job applications.
The only supported deployment options are [Standalone in Application Mode]({{< ref "docs/deployment/resource-providers/standalone/overview" >}}#application-mode) ([described](#getting-started) on this page), [Docker in Application Mode]({{< ref "docs/deployment/resource-providers/standalone/docker" >}}#application-mode-on-docker) and [Standalone Kubernetes Application Cluster]({{< ref "docs/deployment/resource-providers/standalone/kubernetes" >}}#deploy-application-cluster).
-- **Streaming jobs only**: The first version of Reactive Mode runs with streaming jobs only. When submitting a batch job, then the default scheduler will be used.
-- **No support for [local recovery]({{< ref "docs/ops/state/large_state_tuning">}}#task-local-recovery)**: Local recovery is a feature that schedules tasks to machines so that the state on that machine gets re-used if possible. The lack of this feature means that Reactive Mode will always need to download the entire state from the checkpoint storage.
-- **No support for local failover**: Local failover means that the scheduler is able to restart parts ("regions" in Flink's internals) of a failed job, instead of the entire job. This limitation impacts only recovery time of embarrassingly parallel jobs: Flink's default scheduler can restart failed parts, while Reactive Mode will restart the entire job.
-- **Limited integration with Flink's Web UI**: Reactive Mode allows that a job's parallelism can change over its lifetime. The web UI only shows the current parallelism the job.
-- **Limited Job metrics**: With the exception of `numRestarts` all [availability]({{< ref "docs/ops/metrics" >}}#availability) and [checkpointing]({{< ref "docs/ops/metrics" >}}#checkpointing) metrics with the `Job` scope are not working correctly.
+The [limitations of Adaptive Scheduler](#limitations-1) also apply to Reactive Mode.
+
+
+## Adaptive Scheduler
+
+{{< hint danger >}}
+Using Adaptive Scheduler directly (not through Reactive Mode) is only advised for advanced users.
+{{< /hint >}}
+
+Adaptive Scheduler is a scheduler that can adjust the parallelism of a job based on the available slots. On start up, it requests the number of slots needed based on the parallelisms configured by the user in the streaming job. If the number of slots offered is lower than the requested slots, Adaptive Scheduler will reduce the parallelism so that it can start executing the job (or fail if insufficient slots are available). In Reactive Mode (see above) the parallelism requested is conceptually set to infinity, letting the job always use as many resources as possible. You can also use Adaptive Scheduler without Reactive Mode, but there are some practical limitations:
Review comment:
```suggestion
The Adaptive Scheduler can adjust the parallelism of a job based on available slots. It will automatically reduce the parallelism if not enough slots are available to run the job with the originally configured parallelism; be it due to not enough resources being available at the time of submission, or TaskManager outages during the job execution. If new slots become available the job will be scaled up again, up to the configured parallelism.
In Reactive Mode (see above) the configured parallelism is ignored and treated as if it was set to infinity, letting the job always use as many resources as possible.
You can also use Adaptive Scheduler without Reactive Mode, but there are some practical limitations:
```
I think this goes too far into implementation details of the declarative slot protocol.
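As a side note for readers of this thread, the slots-versus-parallelism behaviour described in the suggestion can be illustrated with a small sketch. The numbers below are invented, and `taskmanager.numberOfTaskSlots` / `parallelism.default` are pre-existing Flink options used here only for illustration, not anything introduced by this PR:
```yaml
# Hypothetical standalone cluster: 2 TaskManagers x 4 slots = 8 slots in total.
taskmanager.numberOfTaskSlots: 4

# The job is configured with parallelism 12.
parallelism.default: 12

# Adaptive Scheduler: the job starts right away with parallelism 8 (the slots
# that are actually available) and is scaled back up towards 12 once more
# slots appear, e.g. when a lost TaskManager rejoins.
# Reactive Mode: the configured parallelism is ignored and the job always uses
# every slot that is available.
```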
##########
File path: docs/content.zh/docs/deployment/elastic_scaling.md
##########
@@ -100,13 +110,40 @@ Since Reactive Mode is a new, experimental feature, not all features supported b
- **Deployment is only supported as a standalone application deployment**. Active resource providers (such as native Kubernetes, YARN or Mesos) are explicitly not supported. Standalone session clusters are not supported either. The application deployment is limited to single job applications.
The only supported deployment options are [Standalone in Application Mode]({{< ref "docs/deployment/resource-providers/standalone/overview" >}}#application-mode) ([described](#getting-started) on this page), [Docker in Application Mode]({{< ref "docs/deployment/resource-providers/standalone/docker" >}}#application-mode-on-docker) and [Standalone Kubernetes Application Cluster]({{< ref "docs/deployment/resource-providers/standalone/kubernetes" >}}#deploy-application-cluster).
-- **Streaming jobs only**: The first version of Reactive Mode runs with streaming jobs only. When submitting a batch job, then the default scheduler will be used.
-- **No support for [local recovery]({{< ref "docs/ops/state/large_state_tuning">}}#task-local-recovery)**: Local recovery is a feature that schedules tasks to machines so that the state on that machine gets re-used if possible. The lack of this feature means that Reactive Mode will always need to download the entire state from the checkpoint storage.
-- **No support for local failover**: Local failover means that the scheduler is able to restart parts ("regions" in Flink's internals) of a failed job, instead of the entire job. This limitation impacts only recovery time of embarrassingly parallel jobs: Flink's default scheduler can restart failed parts, while Reactive Mode will restart the entire job.
-- **Limited integration with Flink's Web UI**: Reactive Mode allows that a job's parallelism can change over its lifetime. The web UI only shows the current parallelism the job.
-- **Limited Job metrics**: With the exception of `numRestarts` all [availability]({{< ref "docs/ops/metrics" >}}#availability) and [checkpointing]({{< ref "docs/ops/metrics" >}}#checkpointing) metrics with the `Job` scope are not working correctly.
+The [limitations of Adaptive Scheduler](#limitations-1) also apply to Reactive Mode.
+
+
+## Adaptive Scheduler
+
+{{< hint danger >}}
+Using Adaptive Scheduler directly (not through Reactive Mode) is only advised for advanced users.
+{{< /hint >}}
+
+Adaptive Scheduler is a scheduler that can adjust the parallelism of a job based on the available slots. On start up, it requests the number of slots needed based on the parallelisms configured by the user in the streaming job. If the number of slots offered is lower than the requested slots, Adaptive Scheduler will reduce the parallelism so that it can start executing the job (or fail if insufficient slots are available). In Reactive Mode (see above) the parallelism requested is conceptually set to infinity, letting the job always use as many resources as possible. You can also use Adaptive Scheduler without Reactive Mode, but there are some practical limitations:
+- If you are using Adaptive Scheduler on a session cluster, there are no guarantees regarding the distribution of slots between multiple running jobs in the same session.
+- An active resource manager (native Kubernetes, YARN, Mesos) will request TaskManagers until the parallelism requested by the job is fulfilled, potentially allocating a lot of resources.
Review comment:
How is this a limitation of the adaptive scheduler? The default scheduler has the same behavior.
##########
File path: docs/content.zh/docs/deployment/elastic_scaling.md
##########
@@ -100,13 +110,40 @@ Since Reactive Mode is a new, experimental feature, not all features supported b
- **Deployment is only supported as a standalone application deployment**. Active resource providers (such as native Kubernetes, YARN or Mesos) are explicitly not supported. Standalone session clusters are not supported either. The application deployment is limited to single job applications.
The only supported deployment options are [Standalone in Application Mode]({{< ref "docs/deployment/resource-providers/standalone/overview" >}}#application-mode) ([described](#getting-started) on this page), [Docker in Application Mode]({{< ref "docs/deployment/resource-providers/standalone/docker" >}}#application-mode-on-docker) and [Standalone Kubernetes Application Cluster]({{< ref "docs/deployment/resource-providers/standalone/kubernetes" >}}#deploy-application-cluster).
-- **Streaming jobs only**: The first version of Reactive Mode runs with streaming jobs only. When submitting a batch job, then the default scheduler will be used.
-- **No support for [local recovery]({{< ref "docs/ops/state/large_state_tuning">}}#task-local-recovery)**: Local recovery is a feature that schedules tasks to machines so that the state on that machine gets re-used if possible. The lack of this feature means that Reactive Mode will always need to download the entire state from the checkpoint storage.
-- **No support for local failover**: Local failover means that the scheduler is able to restart parts ("regions" in Flink's internals) of a failed job, instead of the entire job. This limitation impacts only recovery time of embarrassingly parallel jobs: Flink's default scheduler can restart failed parts, while Reactive Mode will restart the entire job.
-- **Limited integration with Flink's Web UI**: Reactive Mode allows that a job's parallelism can change over its lifetime. The web UI only shows the current parallelism the job.
-- **Limited Job metrics**: With the exception of `numRestarts` all [availability]({{< ref "docs/ops/metrics" >}}#availability) and [checkpointing]({{< ref "docs/ops/metrics" >}}#checkpointing) metrics with the `Job` scope are not working correctly.
+The [limitations of Adaptive Scheduler](#limitations-1) also apply to Reactive Mode.
+
+
+## Adaptive Scheduler
+
+{{< hint danger >}}
+Using Adaptive Scheduler directly (not through Reactive Mode) is only advised for advanced users.
+{{< /hint >}}
+
+Adaptive Scheduler is a scheduler that can adjust the parallelism of a job based on the available slots. On start up, it requests the number of slots needed based on the parallelisms configured by the user in the streaming job. If the number of slots offered is lower than the requested slots, Adaptive Scheduler will reduce the parallelism so that it can start executing the job (or fail if insufficient slots are available). In Reactive Mode (see above) the parallelism requested is conceptually set to infinity, letting the job always use as many resources as possible. You can also use Adaptive Scheduler without Reactive Mode, but there are some practical limitations:
+- If you are using Adaptive Scheduler on a session cluster, there are no guarantees regarding the distribution of slots between multiple running jobs in the same session.
+- An active resource manager (native Kubernetes, YARN, Mesos) will request TaskManagers until the parallelism requested by the job is fulfilled, potentially allocating a lot of resources.
+One benefit of Adpative Scheduler over the default scheduler is that it can handle TaskManager losses gracefully, since it would just scale down in these cases.
+
+### Usage
+
+The following configuration parameters need to be set:
+
+- `jobmanager.scheduler: adaptive`: Change from the default scheduler to adaptive scheduler
+- `cluster.declarative-resource-management.enabled` Declarative resource management must be enabled (enabled by default).
+
+Depending on your usage scenario, we also recommend adjusting the parallelism of the job you are submitting to the adaptive scheduler. The parallelism configured determines the number of slots Adaptive Scheduler will request.
Review comment:
In what circumstances would a 1.12 user that wants to use the adaptive scheduler adjust the parallelism, and why?
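For reference, the settings listed in the `### Usage` hunk above, written out as a `flink-conf.yaml` sketch; `parallelism.default` is added only to illustrate the last paragraph (the configured parallelism determines how many slots are requested) and is not part of the PR:
```yaml
# Switch from the default scheduler to the adaptive scheduler.
jobmanager.scheduler: adaptive

# Declarative resource management must be enabled (it is by default).
cluster.declarative-resource-management.enabled: true

# The configured parallelism determines the number of slots the adaptive
# scheduler will request for the job.
parallelism.default: 8
```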
##########
File path: docs/content.zh/docs/deployment/elastic_scaling.md
##########
@@ -88,10 +88,20 @@ If you manually set a parallelism in your job for individual operators or the en
Note that such a high maxParallelism might affect performance of the job, since more internal structures are needed to maintain [some internal structures](https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html) of Flink.
+When enabling Reactive Mode, the `jobmanager.adaptive-scheduler.resource-wait-timeout` configuration key will default to `-1`. This means that the JobManager will run forever waiting for sufficient resources.
+If you want the JobManager to stop after a certain time without enough TaskManagers to run the job, configure `jobmanager.adaptive-scheduler.resource-wait-timeout`.
+
+With Reactive Mode enabled, the `jobmanager.adaptive-scheduler.resource-stabilization-timeout` configuration key will default to `0`: Flink will start runnning the job, as soon as there are sufficient resources available.
+In scenarios where TaskManagers are not connecting at the same time, but slowly one after another, this behavior leads to a job restart whenever a TaskManager connects. Increase this configuration value if you want to wait for the resources to stabilize before scheduling the job.
+
#### Recommendations
- **Configure periodic checkpointing for stateful jobs**: Reactive mode restores from the latest completed checkpoint on a rescale event. If no periodic checkpointing is enabled, your program will loose its state. Checkpointing also configures a **restart strategy**. Reactive mode will respect the configured restarting strategy: If no restarting strategy is configured, reactive mode will fail your job, instead of scaling it.
+- Downscaling in Reactive Mode might cause longer stalls in your processing because Flink waits for the heartbeat between JobManager and the stopped TaskManager(s) to time-out. You will see that your Flink job is stuck in the failing state for roughly 50 seconds before redeploying your job with a lower parallelism.
+
+ The default timeout is configured to 50 seconds. Adjust the [`heartbeat.timeout`]({{< ref "docs/deployment/config">}}#heartbeat-timeout) configuration to a lower value, if your infrastructure permits this. Setting a low heartbeat timeout can lead to failures if a TaskManager fails to respond to a heartbeat, for example due to a network congestion or a long garbage collection pause. Note that the [`heartbeat.interval`]({{< ref "docs/deployment/config">}}#heartbeat-interval) always needs to be lower than the timeout.
Review comment:
```suggestion
The default timeout is configured to 50 seconds. Adjust the [`heartbeat.timeout`]({{< ref "docs/deployment/config">}}#heartbeat-timeout) configuration to a lower value, if your infrastructure permits this. Setting a low heartbeat timeout can lead to failures if a TaskManager fails to respond to a heartbeat, for example due to a network congestion or a long garbage collection pause. Note that the [`heartbeat.interval`]({{< ref "docs/deployment/config">}}#heartbeat-interval) always needs to be lower than the timeout.
```
Or is this intentional?
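To make the heartbeat recommendation concrete, a sketch of the relevant `flink-conf.yaml` entries; the values are illustrative only (both options take milliseconds), and as the text warns, lowering them too far can cause spurious failures:
```yaml
# Illustrative values, not recommendations; overly aggressive settings can fail
# TaskManagers during network congestion or long GC pauses.
heartbeat.timeout: 15000    # default: 50000
heartbeat.interval: 5000    # must always be lower than heartbeat.timeout
```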
##########
File path: docs/content.zh/docs/deployment/elastic_scaling.md
##########
@@ -88,10 +88,20 @@ If you manually set a parallelism in your job for individual operators or the en
Note that such a high maxParallelism might affect performance of the job, since more internal structures are needed to maintain [some internal structures](https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html) of Flink.
+When enabling Reactive Mode, the `jobmanager.adaptive-scheduler.resource-wait-timeout` configuration key will default to `-1`. This means that the JobManager will run forever waiting for sufficient resources.
+If you want the JobManager to stop after a certain time without enough TaskManagers to run the job, configure `jobmanager.adaptive-scheduler.resource-wait-timeout`.
+
+With Reactive Mode enabled, the `jobmanager.adaptive-scheduler.resource-stabilization-timeout` configuration key will default to `0`: Flink will start runnning the job, as soon as there are sufficient resources available.
+In scenarios where TaskManagers are not connecting at the same time, but slowly one after another, this behavior leads to a job restart whenever a TaskManager connects. Increase this configuration value if you want to wait for the resources to stabilize before scheduling the job.
+
#### Recommendations
- **Configure periodic checkpointing for stateful jobs**: Reactive mode restores from the latest completed checkpoint on a rescale event. If no periodic checkpointing is enabled, your program will loose its state. Checkpointing also configures a **restart strategy**. Reactive mode will respect the configured restarting strategy: If no restarting strategy is configured, reactive mode will fail your job, instead of scaling it.
+- Downscaling in Reactive Mode might cause longer stalls in your processing because Flink waits for the heartbeat between JobManager and the stopped TaskManager(s) to time-out. You will see that your Flink job is stuck in the failing state for roughly 50 seconds before redeploying your job with a lower parallelism.
Review comment:
```suggestion
- Downscaling in Reactive Mode might cause longer stalls in your processing because Flink waits for the heartbeat between JobManager and the stopped TaskManager(s) to time out. You will see that your Flink job is stuck in the failing state for roughly 50 seconds before redeploying your job with a lower parallelism.
```
Is `stuck in the failing state` referring to the actual job status in Flink?
(asking because the job will never officially reach a FAILING state, because from there the only path is FINISHED. It will be exposed as RUNNING until the restart is triggered).
##########
File path: docs/content.zh/docs/deployment/elastic_scaling.md
##########
@@ -88,10 +88,20 @@ If you manually set a parallelism in your job for individual operators or the en
Note that such a high maxParallelism might affect performance of the job, since more internal structures are needed to maintain [some internal structures](https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html) of Flink.
+When enabling Reactive Mode, the `jobmanager.adaptive-scheduler.resource-wait-timeout` configuration key will default to `-1`. This means that the JobManager will run forever waiting for sufficient resources.
+If you want the JobManager to stop after a certain time without enough TaskManagers to run the job, configure `jobmanager.adaptive-scheduler.resource-wait-timeout`.
+
+With Reactive Mode enabled, the `jobmanager.adaptive-scheduler.resource-stabilization-timeout` configuration key will default to `0`: Flink will start runnning the job, as soon as there are sufficient resources available.
+In scenarios where TaskManagers are not connecting at the same time, but slowly one after another, this behavior leads to a job restart whenever a TaskManager connects. Increase this configuration value if you want to wait for the resources to stabilize before scheduling the job.
Review comment:
Unrelated to this ticket, but these both sound like undesirable behaviors.
If we're fine with downscaling causing 30+ seconds of downtime, then why can we not wait a few seconds for the stabilization?
Why is reactive mode now this special case where a cluster may sit idling indefinitely?
It seems a bit like we're optimizing the defaults for demos, and not actual production usage.
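Purely as a sketch, overriding the two defaults discussed in this thread could look like the following in `flink-conf.yaml`; the durations are arbitrary examples, not recommended values:
```yaml
# Reactive Mode defaults: resource-wait-timeout = -1 (wait for resources
# forever), resource-stabilization-timeout = 0 (start as soon as enough
# slots are available).
jobmanager.adaptive-scheduler.resource-wait-timeout: 5 min
jobmanager.adaptive-scheduler.resource-stabilization-timeout: 30 s
```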
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]