[ 
https://issues.apache.org/jira/browse/FLINK-30938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17691244#comment-17691244
 ] 

Yufan Sheng edited comment on FLINK-30938 at 2/20/23 4:59 PM:
--------------------------------------------------------------

Thanks for assigning this test task to me. I have finished the verification by 
following the instruction from [~Weijie Guo]. The {{AdaptiveBatchScheduler}} 
and {{SpeculativeExecution}} are working as expert. Here is the detailed test 
process and report from my side.

h2. Building the flink on {{release-1.17}} branch

I build the flink locally on {{release-1.17}} branch. The latest commit in my 
branch is {{6b4745}}. To get the build more faster. I skipped all the code 
quality check and tests by using command {{mvn clean install -DskipTests 
-Pfast}}.

 !flink-1.17-branch-log.png|width=500! 

The running flink dashboard also proven this is a build from the {{6b4745}} 
commit.

 !flink-dashborad-version.png|width=550! 

We start the local standalone flink cluster for simplifying the test. The task 
manager is configured with 4 slots.

 !taskmanager-slots.png|width=700! 

h2. Test hybrid shuffle in {{AdaptiveBatchScheduler}}

The default config value for {{execution.batch-shuffle-mode}} is 
{{ALL_EXCHANGES_BLOCKING}}, which means that this is a blocking shuffle. To get 
the hybrid shuffle mode enabled. We change this value to 
{{ALL_EXCHANGES_HYBRID_SELECTIVE}}, which means the data could be consumed 
immediately when the downstream task is available. No need to persist the data 
to disk.

Given the AdaptiveBatchScheduler is enabled by default. The default 
{{jobmanager.partition.hybrid.partition-data-consume-constraint}} in this 
scheduler is {{UNFINISHED_PRODUCERS}}. This means that the produced data can be 
consumed immediately, no need to wait for the producer finished.

To get hybrid shuffle feature testable. The last thing we need to do is setting 
enough slots for a task manager. The default flink standalone cluster only has 
on job manager and one task manager. The test code for this verification is 
shown in attachments: [^testAdaptiveBatchJob]. It will start a source, a sink 
and two map functions. So we set the slot to 4 by setting 
{{taskmanager.numberOfTaskSlots: 4}} in {{flink.yml}}.

Finally, we submit the job [^testAdaptiveBatchJob] to flink cluster. 

h3. Hybrid shuffle in {{AdaptiveBatchScheduler}} report

Since all the map operator will sleep for 5 seconds. The first thing we can 
find in job graph is that the map operator and sink operator are running 
simultaneously. It this is a default block
 shuffle, the sink should start after all the map operators have been finished.

!AdaptiveBatchScheduler-job-graph.png|width=600!

The screenshot of the execution timeline also shows that the source, map and 
sink are almost running in the same time.

!AdaptiveBatchScheduler-timeline.png|width=600!

Finally, the log also confirm this.

{code}
2023-02-20 23:59:16,558 INFO  org.apache.flink.runtime.taskmanager.Task         
           [] - Source: Sequence Source (1/1)#0 
(fb0eccc08c3516b1b8ed16977616b147_bc764cd8ddf7a0cff126f51c16239658_0_0) 
switched from INITIALIZING to RUNNING.
2023-02-20 23:59:16,563 INFO  org.apache.flink.runtime.taskmanager.Task         
           [] - Sink: Unnamed (1/1)#0 
(fb0eccc08c3516b1b8ed16977616b147_ea632d67b7d595e5b851708ae9ad79d6_0_0) 
switched from INITIALIZING to RUNNING.
2023-02-20 23:59:16,564 INFO  org.apache.flink.runtime.taskmanager.Task         
           [] - Map (2/2)#0 
(fb0eccc08c3516b1b8ed16977616b147_0a448493b4782967b150582570326227_1_0) 
switched from INITIALIZING to RUNNING.
2023-02-20 23:59:16,564 INFO  org.apache.flink.runtime.taskmanager.Task         
           [] - Map (1/2)#0 
(fb0eccc08c3516b1b8ed16977616b147_0a448493b4782967b150582570326227_0_0) 
switched from INITIALIZING to RUNNING.
{code}



was (Author: syhily):
Thanks for assigning this test task to me. I have finished the verification by 
following the instruction from [~Weijie Guo]. The {{AdaptiveBatchScheduler}} 
and {{SpeculativeExecution}} are working as expert. Here is the detailed test 
process and report from my side.

h3. Building the flink on {{release-1.17}} branch

I build the flink locally on {{release-1.17}} branch. The latest commit in my 
branch is {{6b4745}}. To get the build more faster. I skipped all the code 
quality check and tests by using command {{mvn clean install -DskipTests 
-Pfast}}.

 !flink-1.17-branch-log.png|width=500! 

The running flink dashboard also proven this is a build from the {{6b4745}} 
commit.

 !flink-dashborad-version.png|width=550! 

We start the local standalone flink cluster for simplifying the test. The task 
manager is configured with 4 slots.

 !taskmanager-slots.png:width=700! 

h3. AdaptiveBatchScheduler verification

The default config value for {{execution.batch-shuffle-mode}} is 
{{ALL_EXCHANGES_BLOCKING}}, which means that this is a blocking shuffle. To get 
the hybrid shuffle mode enabled. We change this value to 
{{ALL_EXCHANGES_HYBRID_SELECTIVE}}, which means the data could be consumed 
immediately when the downstream task is available. No need to persist the data 
to disk.

Given the AdaptiveBatchScheduler is enabled by default. The default 
{{jobmanager.partition.hybrid.partition-data-consume-constraint}} in this 
scheduler is {{UNFINISHED_PRODUCERS}}. This means that the produced data can be 
consumed immediately, no need to wait for the producer finished.

To get this feature testable. The last thing we need to do is setting enough 
slots for a task manager. The default flink standalone cluster only has on job 
manager and one task manager. The test code for this verification is shown in 
attachments:  [^testAdaptiveBatchJob]. It will start a source, a sink and two 
map functions. So we set the slot to 4 by setting 
{{taskmanager.numberOfTaskSlots: 4}} in {{flink.yml}}.

Finally, we submit the job [^testAdaptiveBatchJob]  to flink cluster. 

h4. AdaptiveBatchScheduler report

> Release Testing: Verify FLINK-29766 Adaptive Batch Scheduler should also work 
> with hybrid shuffle mode
> ------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-30938
>                 URL: https://issues.apache.org/jira/browse/FLINK-30938
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Coordination
>    Affects Versions: 1.17.0
>            Reporter: Weijie Guo
>            Assignee: Yufan Sheng
>            Priority: Blocker
>              Labels: release-testing
>         Attachments: AdaptiveBatchScheduler-job-graph.png, 
> AdaptiveBatchScheduler-timeline.png, flink-1.17-branch-log.png, 
> flink-dashborad-version.png, taskmanager-slots.png, testAdaptiveBatchJob, 
> testSpeculativeExecution
>
>
> This ticket aims for verifying FLINK-29766: Adaptive Batch Scheduler should 
> also work with hybrid shuffle mode.
> More details about this feature and how to use it can be found in this 
> [documentation|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/batch/batch_shuffle/#hybrid-shuffle].
> The verification is divided into two parts:
> Part I: Verify hybrid shuffle can work with AdaptiveBatchScheduler
> Write a simple Flink batch job using hybrid shuffle mode and submit this job. 
> Note that in flink-1.17, AdaptiveBatchScheduler is the default scheduler for 
> batch job, so you do not need other configuration.
> Suppose your job's topology like source -> map -> sink, if your cluster have 
> enough slots, you should find that source and map are running at the same 
> time.
> Part II: Verify hybrid shuffle can work with Speculative Execution
> Write a Flink batch job using hybrid shuffle mode which has a subtask running 
> much slower than others (e.g. sleep indefinitely if it runs on a certain 
> host, the hostname can be retrieved via 
> InetAddress.getLocalHost().getHostName(), or if its (subtaskIndex + 
> attemptNumer) % 2 == 0)
> Modify Flink configuration file to enable speculative execution and tune the 
> configuration as you like
> Submit the job. Checking the web UI, logs, metrics and produced result.
> You should find that once a producer task's one subtask finished, all its 
> consumer tasks can be scheduled in log.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to