[
https://issues.apache.org/jira/browse/FLINK-22134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17319374#comment-17319374
]
Xintong Song edited comment on FLINK-22134 at 4/12/21, 11:22 AM:
-----------------------------------------------------------------
cc [~trohrmann], [~rmetzger], [~chesnay]
I've done my testing on this feature, and here are my findings.
Among the issues found, the *bold* ones are IMO critical (not blockers, since
this is a new feature and the issues do not affect existing use cases) and
would be nice to fix for 1.13.0.
h3. Behavior
# *There are warnings about metric name collision. I've checked and the
mentioned metrics are indeed no longer updated after rescaling.*
{code:java}
2021-04-12 17:54:07,402 WARN org.apache.flink.metrics.MetricGroup
[] - Name collision: Group already contains a Metric with the name
'totalNumberOfCheckpoints'. Metric will not be reported.[localhost, jobmanager,
CarTopSpeedWindowingExample]
{code}
# *When new resources appear, the job is canceled immediately, but is not
rescheduled until either the resource wait timeout or the resource
stabilization timeout is reached. That means processing is interrupted for
unnecessarily long. It might make sense to cancel the job only after the
resources have stabilized.*
h3. Documentation
# It's a bit unclear to me how the resource wait timeout is measured. Is it
the time since the first TM registration (after the last rescale), or the time
since the most recent TM registration? I can see it's the former from observing
the behavior, but not from the documentation.
# About parallelism not being configurable.
{quote}The parallelism of individual operators in a job will be determined by
the scheduler.
{quote}
The above description applies to both operators (e.g.,
{{SingleOutputStreamOperator#setParallelism}}) and execution environments
(e.g., {{StreamExecutionEnvironment#setParallelism}}). While the former is
explicit, the latter is not that clear to me, especially when read together
with the following description, which makes me think there's a way to set a
minimum parallelism not for individual operators but for the overall job.
{quote}If you want the JobManager to stop after a certain time without enough
TaskManagers to run the job, configure
jobmanager.adaptive-scheduler.resource-wait-timeout.
{quote}
h3. UI
# *There are places where the parallelism of tasks is not updated when scaling
up/down.*
!截屏2021-04-12 上午10.30.25.png|width=800!
# The attempt number of a task is confusing. I'm not entirely sure whether it
makes sense to increase the attempt here, since the underlying subtask has
changed due to rescaling.
!截屏2021-04-12 上午10.32.50.png|width=800!
# Is there anywhere that the user can see the history of rescales?
h3. Log readability
# There are warnings about not being able to fulfill resource requirements,
which could be confusing for the users.
{code:java}
2021-04-12 17:53:47,382 WARN
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager []
- Could not fulfill resource requirements of job
20b20457f256f68fa94327012f41589f.
{code}
h3. Other things I tested that looked good to me
# Setting max parallelism for an individual operator, or for the entire
execution environment. The max parallelism is always respected.
# Setting multiple slot sharing groups. Does not cancel the job to scale up
unless there are sufficient slots for increasing the parallelism of the entire
pipeline.
# Removing an idle TM does not affect the job.
# Does not rescale multiple times during the stabilization timeout.
# When resource wait and stabilization timeouts are both configured, the job
is scheduled when either one is reached.
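To make the timing observations above concrete, here is a minimal sketch of how I understand the two timeouts to interact. It is a simplified model and not Flink code: the class {{RescaleTimingSketch}} and the method {{scheduleTime}} are hypothetical names. It assumes the resource wait timeout is measured from the first TM registration (as observed above) and that the stabilization timeout restarts on every new TM registration (my assumption, not confirmed by the documentation).

```java
import java.time.Duration;
import java.util.List;

// Simplified model of the observed behavior; NOT the actual Flink implementation.
public class RescaleTimingSketch {

    /**
     * Returns the offset at which the job would be (re)scheduled.
     *
     * @param registrations sorted, non-empty offsets of TM registrations
     * @param waitTimeout measured from the FIRST registration (observed)
     * @param stabilizationTimeout restarts on EACH registration (assumption)
     */
    static Duration scheduleTime(
            List<Duration> registrations,
            Duration waitTimeout,
            Duration stabilizationTimeout) {
        Duration waitDeadline = registrations.get(0).plus(waitTimeout);
        for (int i = 0; i < registrations.size(); i++) {
            Duration stable = registrations.get(i).plus(stabilizationTimeout);
            boolean isLast = i == registrations.size() - 1;
            // Resources count as stable only if no further TM registers
            // before the stabilization period has elapsed.
            if (isLast || stable.compareTo(registrations.get(i + 1)) <= 0) {
                // The job is scheduled when either timeout is reached first.
                return stable.compareTo(waitDeadline) < 0 ? stable : waitDeadline;
            }
        }
        return waitDeadline;
    }

    public static void main(String[] args) {
        List<Duration> tms = List.of(
                Duration.ofSeconds(0), Duration.ofSeconds(5), Duration.ofSeconds(8));
        Duration at = scheduleTime(tms, Duration.ofSeconds(60), Duration.ofSeconds(10));
        System.out.println("scheduled after " + at.getSeconds() + "s");
    }
}
```

Under this model, since the job is already canceled at the first registration, the processing gap lasts from that registration until the returned offset, which is the interruption described in the Behavior section.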
> Test the reactive mode
> ----------------------
>
> Key: FLINK-22134
> URL: https://issues.apache.org/jira/browse/FLINK-22134
> Project: Flink
> Issue Type: Test
> Components: Runtime / Coordination
> Affects Versions: 1.13.0
> Reporter: Till Rohrmann
> Assignee: Xintong Song
> Priority: Blocker
> Labels: release-testing
> Fix For: 1.13.0
>
> Attachments: 截屏2021-04-12 上午10.30.25.png, 截屏2021-04-12 上午10.32.50.png
>
>
> The newly introduced reactive mode (FLINK-10407) allows Flink to make use of
> newly arriving resources while the job is running. The feature documentation
> with the current set of limitations can be found here [1].
> In order to test this new feature, I recommend following the documentation
> and trying it out with respect to the stated limitations. Everything that is
> not explicitly contained in the set of limitations should work.
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/elastic_scaling/