[ https://issues.apache.org/jira/browse/FLINK-30938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17691244#comment-17691244 ]
Yufan Sheng edited comment on FLINK-30938 at 2/20/23 5:31 PM:
--------------------------------------------------------------
Thanks for assigning this test task to me. I have finished the verification by
following the instructions from [~Weijie Guo]. Hybrid shuffle works as expected
with both the {{AdaptiveBatchScheduler}} and {{SpeculativeExecution}}. Here are
the detailed test process and report from my side.
h2. Building Flink on the {{release-1.17}} branch
I built Flink locally on the {{release-1.17}} branch. The latest commit in my
local repository is {{6b4745}}. To speed up the build, I skipped all code
quality checks and tests by using the command
{{mvn clean install -DskipTests -Pfast}}.
!flink-1.17-branch-log.png|width=500!
The dashboard of the running Flink cluster also confirms that this is a build
from commit {{6b4745}}.
!flink-dashborad-version.png|width=550!
To keep the test simple, we started a local standalone Flink cluster. The
TaskManager is configured with 4 slots.
!taskmanager-slots.png|width=700!
h2. Test hybrid shuffle with {{AdaptiveBatchScheduler}}
The default value of {{execution.batch-shuffle-mode}} is
{{ALL_EXCHANGES_BLOCKING}}, i.e. a blocking shuffle. To enable hybrid shuffle,
we changed this value to {{ALL_EXCHANGES_HYBRID_SELECTIVE}}. In this mode the
data can be consumed as soon as the downstream task is running, and instead of
persisting all data to disk, only data that cannot be consumed in time is
spilled.
The AdaptiveBatchScheduler is enabled by default for batch jobs, and its
default {{jobmanager.partition.hybrid.partition-data-consume-constraint}} is
{{UNFINISHED_PRODUCERS}}. This means that the produced data can be consumed
immediately, without waiting for the producer to finish.
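To compare the three possible values of the consume constraint, here is a minimal Java sketch of when a consumer may be scheduled under each of them. It is a hypothetical model, not Flink's scheduler code: the class, enum and method names are made up, and we assume that under {{UNFINISHED_PRODUCERS}} a single started producer is enough to schedule the consumer.

{code:java}
import java.util.List;

/**
 * Hypothetical model (not Flink code) of when a hybrid-shuffle consumer may be
 * scheduled under each value of
 * jobmanager.partition.hybrid.partition-data-consume-constraint.
 */
class ConsumeConstraintModel {

    enum ProducerState { CREATED, RUNNING, FINISHED }

    enum ConsumeConstraint { ALL_PRODUCERS_FINISHED, ONLY_FINISHED_PRODUCERS, UNFINISHED_PRODUCERS }

    static boolean canScheduleConsumer(ConsumeConstraint constraint, List<ProducerState> producers) {
        switch (constraint) {
            case ALL_PRODUCERS_FINISHED:
                // Behaves like a blocking shuffle: wait for every producer to finish.
                return producers.stream().allMatch(s -> s == ProducerState.FINISHED);
            case ONLY_FINISHED_PRODUCERS:
                // Start once at least one producer has finished; only finished data is consumed.
                return producers.stream().anyMatch(s -> s == ProducerState.FINISHED);
            case UNFINISHED_PRODUCERS:
                // Assumption: one started (running or finished) producer is enough,
                // and its data can be consumed while it is still being produced.
                return producers.stream().anyMatch(s -> s != ProducerState.CREATED);
            default:
                throw new IllegalArgumentException("Unknown constraint: " + constraint);
        }
    }

    public static void main(String[] args) {
        List<ProducerState> sourceRunning = List.of(ProducerState.RUNNING);
        System.out.println(canScheduleConsumer(ConsumeConstraint.UNFINISHED_PRODUCERS, sourceRunning));
        System.out.println(canScheduleConsumer(ConsumeConstraint.ALL_PRODUCERS_FINISHED, sourceRunning));
    }
}
{code}

With the source still running, the map can already be scheduled under {{UNFINISHED_PRODUCERS}} but not under {{ALL_PRODUCERS_FINISHED}}, which is the behaviour this test is looking for.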
The last step to make the hybrid shuffle behaviour observable is to give the
TaskManager enough slots. The default standalone Flink cluster has only one
JobManager and one TaskManager, and the TaskManager has a single slot by
default. The test code for this verification is attached as
[^testAdaptiveBatchJob]. It runs a source, a map operator with parallelism 2
and a sink, so we set the number of slots to 4 with
{{taskmanager.numberOfTaskSlots: 4}} in {{flink-conf.yaml}}.
Finally, we submitted the job [^testAdaptiveBatchJob] to the Flink cluster.
h3. Hybrid shuffle in {{AdaptiveBatchScheduler}} report
Since every map subtask sleeps for 5 seconds, the first thing we can see in the
job graph is that the map and sink operators are running simultaneously. If
this were the default blocking shuffle, the sink would only start after all map
subtasks had finished.
!AdaptiveBatchScheduler-job-graph.png|width=600!
The execution timeline screenshot also shows that the source, map and sink run
at almost the same time.
!AdaptiveBatchScheduler-timeline.png|width=600!
Finally, the TaskManager log confirms this: all four subtasks switched to
RUNNING within 6 ms of each other.
{code}
2023-02-20 23:59:16,558 INFO org.apache.flink.runtime.taskmanager.Task
[] - Source: Sequence Source (1/1)#0
(fb0eccc08c3516b1b8ed16977616b147_bc764cd8ddf7a0cff126f51c16239658_0_0)
switched from INITIALIZING to RUNNING.
2023-02-20 23:59:16,563 INFO org.apache.flink.runtime.taskmanager.Task
[] - Sink: Unnamed (1/1)#0
(fb0eccc08c3516b1b8ed16977616b147_ea632d67b7d595e5b851708ae9ad79d6_0_0)
switched from INITIALIZING to RUNNING.
2023-02-20 23:59:16,564 INFO org.apache.flink.runtime.taskmanager.Task
[] - Map (2/2)#0
(fb0eccc08c3516b1b8ed16977616b147_0a448493b4782967b150582570326227_1_0)
switched from INITIALIZING to RUNNING.
2023-02-20 23:59:16,564 INFO org.apache.flink.runtime.taskmanager.Task
[] - Map (1/2)#0
(fb0eccc08c3516b1b8ed16977616b147_0a448493b4782967b150582570326227_0_0)
switched from INITIALIZING to RUNNING.
{code}
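To put a number on "simultaneously", a small sketch that parses the leading timestamps of the four log lines above and computes the gap between the earliest and the latest one. The log lines are abbreviated, the parser only handles the {{yyyy-MM-dd HH:mm:ss,SSS}} prefix used in this log, and the class and method names are hypothetical.

{code:java}
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;

/** Hypothetical helper: spread between the earliest and latest timestamp of some log lines. */
class RunningSpread {

    // Matches the "2023-02-20 23:59:16,558" prefix of the log lines above.
    private static final DateTimeFormatter LOG_TIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    static LocalDateTime timestampOf(String logLine) {
        return LocalDateTime.parse(logLine.substring(0, 23), LOG_TIME);
    }

    static long spreadMillis(List<String> logLines) {
        if (logLines.isEmpty()) {
            throw new IllegalArgumentException("need at least one log line");
        }
        LocalDateTime earliest = timestampOf(logLines.get(0));
        LocalDateTime latest = earliest;
        for (String line : logLines) {
            LocalDateTime t = timestampOf(line);
            if (t.isBefore(earliest)) earliest = t;
            if (t.isAfter(latest)) latest = t;
        }
        return Duration.between(earliest, latest).toMillis();
    }

    public static void main(String[] args) {
        // Abbreviated copies of the four INITIALIZING -> RUNNING lines above.
        List<String> runningLines = List.of(
                "2023-02-20 23:59:16,558 Source: Sequence Source (1/1)#0 -> RUNNING",
                "2023-02-20 23:59:16,563 Sink: Unnamed (1/1)#0 -> RUNNING",
                "2023-02-20 23:59:16,564 Map (2/2)#0 -> RUNNING",
                "2023-02-20 23:59:16,564 Map (1/2)#0 -> RUNNING");
        System.out.println("RUNNING spread: " + spreadMillis(runningLines) + " ms");
    }
}
{code}

For these lines the spread is 6 ms (16,558 to 16,564), far below the 5 second sleep of each map subtask.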
h2. Test hybrid shuffle in Speculative Execution
To enable *Speculative Execution*, we added
{{execution.batch.speculative.enabled: true}} to {{flink-conf.yaml}}. We also
changed {{execution.batch-shuffle-mode}} to {{ALL_EXCHANGES_HYBRID_FULL}} so that
all data is persisted.
The test code [^testSpeculativeExecution] makes a subtask sleep forever when the
sum of its subtask index and attempt number is even. To let speculative
execution detect the slow subtask quickly, we also tuned the slow-task
detection baseline:
{code}
slow-task-detector.execution-time.baseline-ratio: 0.2
slow-task-detector.execution-time.baseline-lower-bound: 0
slow-task-detector.execution-time.baseline-multiplier: 1
execution.batch.speculative.block-slow-node-duration: 0
{code}
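As a rough illustration of how these options interact, here is a simplified, hypothetical model of a median-based baseline. It is not Flink's actual slow-task detector (which may, for example, round the ratio differently or take input size into account); we assume a baseline becomes available once {{ceil(ratio * total)}} subtasks have finished, equals {{max(lower-bound, multiplier * median)}} of their execution times, and that a running subtask is slow once it exceeds it.

{code:java}
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Simplified, hypothetical model of an execution-time based slow-task baseline.
 * Not Flink's implementation; it only shows how ratio, multiplier and lower bound interact.
 */
class SlowTaskBaseline {

    /** Returns the baseline in ms, or -1 while too few subtasks have finished. */
    static long baselineMillis(List<Long> finishedMillis, int totalSubtasks,
                               double ratio, double multiplier, long lowerBoundMillis) {
        int required = Math.max(1, (int) Math.ceil(ratio * totalSubtasks));
        if (finishedMillis.size() < required) {
            return -1;
        }
        List<Long> sorted = new ArrayList<>(finishedMillis);
        Collections.sort(sorted);
        long median = sorted.get(sorted.size() / 2); // upper median for even sizes
        return Math.max(lowerBoundMillis, (long) (median * multiplier));
    }

    /** A running subtask counts as slow once it has run longer than the baseline. */
    static boolean isSlow(long runningMillis, long baselineMillis) {
        return baselineMillis >= 0 && runningMillis > baselineMillis;
    }

    public static void main(String[] args) {
        // Map (2/2) ran ~27 ms; Map (1/2) had run ~196 ms when it was flagged (see the log).
        long baseline = baselineMillis(List.of(27L), 2, 0.2, 1.0, 0L);
        System.out.println("baseline=" + baseline + " ms, Map (1/2) slow? " + isSlow(196L, baseline));
    }
}
{code}

Using the timestamps from the log of this run, Map (2/2) ran for about 27 ms (RUNNING at 38,140, FINISHED at 38,167), so under this model the baseline is 27 ms, and Map (1/2), which had been running for about 196 ms when it was flagged at 38,335, counts as slow. With a lower bound of, say, 60 000 ms it would not be flagged yet, which is why setting the lower bound to 0 speeds up detection.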
Finally, we restarted the Flink cluster with the new configuration and submitted
the [^testSpeculativeExecution] job.
h3. Hybrid shuffle in Speculative Execution report
In speculative execution, the default data consumption constraint is
{{ONLY_FINISHED_PRODUCERS}}. This means that once an upstream producer subtask
has finished, a downstream consumer can be scheduled to consume the data of the
finished producers. The execution timeline screenshot confirms this.
!Speculative-Execution-timeline.png|width=600!
Once the single source subtask had finished, both map subtasks started
simultaneously. Map (2/2) (subtask index 1) finished quickly, while Map (1/2)
(subtask index 0) hung. The downstream sink was therefore scheduled while the
map vertex was still running, right after Map (2/2) had finished.
{code}
2023-02-21 00:11:37,993 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
Sequence Source (1/1)
(ee83d53171224378cb74bdd5681a1399_bc764cd8ddf7a0cff126f51c16239658_0_0)
switched from RUNNING to FINISHED.
2023-02-21 00:11:37,996 INFO
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager []
- Clearing resource requirements of job c9562d7c3c921ac7d2f0383c7e250827
2023-02-21 00:11:38,000 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map (1/2)
(ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_0_0)
switched from CREATED to SCHEDULED.
2023-02-21 00:11:38,001 INFO
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager []
- Received resource requirements from job c9562d7c3c921ac7d2f0383c7e250827:
[ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN},
numberOfRequiredSlots=1}]
2023-02-21 00:11:38,001 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map (1/2)
(ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_0_0)
switched from SCHEDULED to DEPLOYING.
2023-02-21 00:11:38,001 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Map
(1/2) (attempt #0) with attempt id
ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_0_0 and
vertex id 0a448493b4782967b150582570326227_0 to localhost:41185-563559 @
localhost (dataPort=38645) with allocation id 772ba5994150e188cf2ed4743eafb537
2023-02-21 00:11:38,005 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map (2/2)
(ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_1_0)
switched from CREATED to SCHEDULED.
2023-02-21 00:11:38,006 INFO
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager []
- Received resource requirements from job c9562d7c3c921ac7d2f0383c7e250827:
[ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN},
numberOfRequiredSlots=2}]
2023-02-21 00:11:38,068 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map (1/2)
(ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_0_0)
switched from DEPLOYING to INITIALIZING.
2023-02-21 00:11:38,074 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map (2/2)
(ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_1_0)
switched from SCHEDULED to DEPLOYING.
2023-02-21 00:11:38,074 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Map
(2/2) (attempt #0) with attempt id
ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_1_0 and
vertex id 0a448493b4782967b150582570326227_1 to localhost:41185-563559 @
localhost (dataPort=38645) with allocation id c3cc3ffeb221d7cd327722feaa5d5557
2023-02-21 00:11:38,104 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map (2/2)
(ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_1_0)
switched from DEPLOYING to INITIALIZING.
2023-02-21 00:11:38,139 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map (1/2)
(ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_0_0)
switched from INITIALIZING to RUNNING.
2023-02-21 00:11:38,140 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map (2/2)
(ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_1_0)
switched from INITIALIZING to RUNNING.
2023-02-21 00:11:38,167 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map (2/2)
(ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_1_0)
switched from RUNNING to FINISHED.
2023-02-21 00:11:38,168 INFO
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager []
- Received resource requirements from job c9562d7c3c921ac7d2f0383c7e250827:
[ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN},
numberOfRequiredSlots=1}]
2023-02-21 00:11:38,168 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: Unnamed
(1/1) (ee83d53171224378cb74bdd5681a1399_ea632d67b7d595e5b851708ae9ad79d6_0_0)
switched from CREATED to SCHEDULED.
2023-02-21 00:11:38,169 INFO
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager []
- Received resource requirements from job c9562d7c3c921ac7d2f0383c7e250827:
[ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN},
numberOfRequiredSlots=2}]
2023-02-21 00:11:38,169 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: Unnamed
(1/1) (ee83d53171224378cb74bdd5681a1399_ea632d67b7d595e5b851708ae9ad79d6_0_0)
switched from SCHEDULED to DEPLOYING.
2023-02-21 00:11:38,169 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying
Sink: Unnamed (1/1) (attempt #0) with attempt id
ee83d53171224378cb74bdd5681a1399_ea632d67b7d595e5b851708ae9ad79d6_0_0 and
vertex id ea632d67b7d595e5b851708ae9ad79d6_0 to localhost:41185-563559 @
localhost (dataPort=38645) with allocation id c3cc3ffeb221d7cd327722feaa5d5557
2023-02-21 00:11:38,187 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: Unnamed
(1/1) (ee83d53171224378cb74bdd5681a1399_ea632d67b7d595e5b851708ae9ad79d6_0_0)
switched from DEPLOYING to INITIALIZING.
2023-02-21 00:11:38,200 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: Unnamed
(1/1) (ee83d53171224378cb74bdd5681a1399_ea632d67b7d595e5b851708ae9ad79d6_0_0)
switched from INITIALIZING to RUNNING.
2023-02-21 00:11:38,335 INFO org.apache.flink.runtime.jobmaster.JobMaster
[] - Map (1/2) (0a448493b4782967b150582570326227_0) is detected as a
slow vertex, create and deploy 1 new speculative executions for it.
2023-02-21 00:11:38,339 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map (1/2)
(ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_0_1)
switched from CREATED to SCHEDULED.
2023-02-21 00:11:38,340 INFO
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager []
- Received resource requirements from job c9562d7c3c921ac7d2f0383c7e250827:
[ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN},
numberOfRequiredSlots=3}]
2023-02-21 00:11:38,415 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map (1/2)
(ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_0_1)
switched from SCHEDULED to DEPLOYING.
2023-02-21 00:11:38,415 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Map
(1/2) (attempt #1) with attempt id
ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_0_1 and
vertex id 0a448493b4782967b150582570326227_0 to localhost:41185-563559 @
localhost (dataPort=38645) with allocation id c053b04b2b8a67a771a10bb4339cbc2b
2023-02-21 00:11:38,435 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map (1/2)
(ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_0_1)
switched from DEPLOYING to INITIALIZING.
2023-02-21 00:11:38,443 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map (1/2)
(ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_0_1)
switched from INITIALIZING to RUNNING.
2023-02-21 00:11:38,463 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map (1/2)
(ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_0_1)
switched from RUNNING to FINISHED.
2023-02-21 00:11:38,466 INFO
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager []
- Received resource requirements from job c9562d7c3c921ac7d2f0383c7e250827:
[ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN},
numberOfRequiredSlots=2}]
2023-02-21 00:11:38,474 INFO org.apache.flink.runtime.jobmaster.JobMaster
[] - Canceling 1 un-finished executions of
0a448493b4782967b150582570326227_0 because one of its executions has finished.
2023-02-21 00:11:38,475 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map (1/2)
(ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_0_0)
switched from RUNNING to CANCELING.
2023-02-21 00:11:38,492 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: Unnamed
(1/1) (ee83d53171224378cb74bdd5681a1399_ea632d67b7d595e5b851708ae9ad79d6_0_0)
switched from RUNNING to FINISHED.
2023-02-21 00:11:38,493 INFO
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager []
- Received resource requirements from job c9562d7c3c921ac7d2f0383c7e250827:
[ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN},
numberOfRequiredSlots=1}]
2023-02-21 00:11:38,499 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map (1/2)
(ee83d53171224378cb74bdd5681a1399_0a448493b4782967b150582570326227_0_0)
switched from CANCELING to CANCELED.
2023-02-21 00:11:38,500 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job Test
Speculative Execution Job (c9562d7c3c921ac7d2f0383c7e250827) switched from
state RUNNING to FINISHED.
{code}
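The ordering claim can be checked directly against the timestamps in the log above. A minimal sketch with transition times copied by hand (milliseconds after 00:11:38,000); the {{Event}} class and the helper are hypothetical:

{code:java}
import java.util.List;

/** Hypothetical check of the event ordering in the speculative-execution log. */
class SpeculativeTimeline {

    /** One state transition, with time in milliseconds after 00:11:38,000. */
    static final class Event {
        final long millis;
        final String task;
        final String toState;

        Event(long millis, String task, String toState) {
            this.millis = millis;
            this.task = task;
            this.toState = toState;
        }
    }

    /** Earliest time at which the given task reached the given state. */
    static long firstTime(List<Event> events, String task, String toState) {
        return events.stream()
                .filter(e -> e.task.equals(task) && e.toState.equals(toState))
                .mapToLong(e -> e.millis)
                .min()
                .orElseThrow(() -> new IllegalArgumentException(task + " never reached " + toState));
    }

    // Transition times copied by hand from the log.
    static final List<Event> LOG = List.of(
            new Event(167, "Map (2/2) #0", "FINISHED"),
            new Event(168, "Sink (1/1) #0", "SCHEDULED"),
            new Event(463, "Map (1/2) #1", "FINISHED"),
            new Event(499, "Map (1/2) #0", "CANCELED"));

    public static void main(String[] args) {
        long firstMapDone = firstTime(LOG, "Map (2/2) #0", "FINISHED");
        long sinkScheduled = firstTime(LOG, "Sink (1/1) #0", "SCHEDULED");
        long slowMapDone = firstTime(LOG, "Map (1/2) #1", "FINISHED");
        System.out.println("sink scheduled " + (sinkScheduled - firstMapDone)
                + " ms after the first map finished and "
                + (slowMapDone - sinkScheduled) + " ms before the speculative attempt finished");
    }
}
{code}

With these numbers the sink is scheduled 1 ms after Map (2/2) finishes and 295 ms before the speculative Map (1/2) attempt #1 finishes, which matches the {{ONLY_FINISHED_PRODUCERS}} behaviour described above.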
The execution log shows the speculative execution in detail.
{{Deploying Map (1/2)}} appears twice because this subtask has index 0. In its
first attempt (attempt #0), the sum of subtask index and attempt number is 0,
which is even, so it sleeps forever. After the JobMaster detected it as a slow
vertex, it deployed a speculative attempt with attempt number 1. The sum is
then 1, which is odd, so this attempt bypasses the sleep logic and finishes as
expected, after which the original attempt is canceled. Because we enabled the
full spilling strategy ({{ALL_EXCHANGES_HYBRID_FULL}}) before, only the map
subtask needed a new attempt; the source did not have to run again.
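The hang condition can be written as a one-line predicate. This is a hypothetical helper mirroring the rule described above, not the attached job's code; in a real {{RichMapFunction}} the two inputs would typically come from {{getRuntimeContext().getIndexOfThisSubtask()}} and {{getRuntimeContext().getAttemptNumber()}}.

{code:java}
/** Hypothetical helper mirroring the hang rule of the test job (not the attached code). */
class HangPredicate {

    /** True when (subtaskIndex + attemptNumber) is even, i.e. the attempt should sleep forever. */
    static boolean hangsForever(int subtaskIndex, int attemptNumber) {
        return (subtaskIndex + attemptNumber) % 2 == 0;
    }

    public static void main(String[] args) {
        // Map (1/2) has subtask index 0 and Map (2/2) has subtask index 1.
        System.out.println("Map (1/2) attempt #0 hangs: " + hangsForever(0, 0));
        System.out.println("Map (1/2) attempt #1 hangs: " + hangsForever(0, 1));
        System.out.println("Map (2/2) attempt #0 hangs: " + hangsForever(1, 0));
    }
}
{code}

Only the original attempt of Map (1/2) satisfies the predicate, which matches the single speculative attempt seen in the log.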
> Release Testing: Verify FLINK-29766 Adaptive Batch Scheduler should also work
> with hybrid shuffle mode
> ------------------------------------------------------------------------------------------------------
>
> Key: FLINK-30938
> URL: https://issues.apache.org/jira/browse/FLINK-30938
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Coordination
> Affects Versions: 1.17.0
> Reporter: Weijie Guo
> Assignee: Yufan Sheng
> Priority: Blocker
> Labels: release-testing
> Attachments: AdaptiveBatchScheduler-job-graph.png,
> AdaptiveBatchScheduler-timeline.png, Speculative-Execution-timeline.png,
> flink-1.17-branch-log.png, flink-dashborad-version.png,
> taskmanager-slots.png, testAdaptiveBatchJob, testSpeculativeExecution
>
>
> This ticket aims to verify FLINK-29766: Adaptive Batch Scheduler should
> also work with hybrid shuffle mode.
> More details about this feature and how to use it can be found in this
> [documentation|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/batch/batch_shuffle/#hybrid-shuffle].
> The verification is divided into two parts:
> Part I: Verify hybrid shuffle can work with AdaptiveBatchScheduler
> Write a simple Flink batch job using hybrid shuffle mode and submit this job.
> Note that in flink-1.17, AdaptiveBatchScheduler is the default scheduler for
> batch jobs, so you do not need any other configuration.
> Suppose your job's topology is source -> map -> sink; if your cluster has
> enough slots, you should find that source and map are running at the same
> time.
> Part II: Verify hybrid shuffle can work with Speculative Execution
> Write a Flink batch job using hybrid shuffle mode which has a subtask running
> much slower than others (e.g. sleep indefinitely if it runs on a certain
> host, the hostname can be retrieved via
> InetAddress.getLocalHost().getHostName(), or if its (subtaskIndex +
> attemptNumber) % 2 == 0).
> Modify the Flink configuration file to enable speculative execution and tune
> the configuration as you like.
> Submit the job. Check the web UI, logs, metrics and produced result.
> You should find in the log that once one subtask of a producer task has
> finished, all its consumer tasks can be scheduled.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)