[ https://issues.apache.org/jira/browse/FLINK-15013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16986656#comment-16986656 ]
Zhu Zhu commented on FLINK-15013:
---------------------------------

Thanks [~aljoscha] for finding the faulty commit.

The root cause should be this change in the faulty commit: {{int bestCandidateScore = Integer.MIN_VALUE;}} was replaced with {{double bestCandidateScore = Double.MIN_VALUE;}}. Integer.MIN_VALUE is negative, but Double.MIN_VALUE is a very small positive value. As a result, a slot with score 0 (Locality == NON_LOCAL) fails to match the slot request, because a score of 0 can never beat a positive initial value.

*Changing it to {{double bestCandidateScore = -1.0;}} should fix this issue.*

However, this raises another question: the {{NON_LOCAL}} locality should not happen in this case. Theoretically, the task should be able to find a {{LOCAL}} slot of its upstream node, since the task is scheduled only after its upstream tasks have been assigned slots, its parallelism is not larger than theirs, and all tasks are in the same slot sharing group.

I did some experiments and found the cause; it's a bit complicated to explain.
1. In short, the root cause is that a slot offer for a shared slot completes the location futures of the inner tasks before the shared slot itself is resolved with a location (due to an unexpected {{CompletableFuture}} callback).
2. Completing a task's location triggers scheduling of its downstream task, but because of #1 the downstream task does not yet see the resolved shared slot of its upstream task. So it may pick a random pending slot sharing group, which violates the input locality constraint (see {{SchedulerImpl#allocateMultiTaskSlot}}).
3. After #2 has happened, a later scheduled task may find that there is no pending slot sharing group left, and that the resolved slot sharing group cannot fulfill its locality constraint. It then requests a new slot.

To me, the simplest fix would be: resolve the shared slot's location when it is assigned a physical slot, before completing the location futures of its tasks. That way, the tasks can always find a resolved slot. Note that this may not be a new issue, since the problematic logic has been there for several versions.
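A minimal Java sketch of why the initial best-score value matters. It is not Flink's actual {{SlotSelectionStrategy}} code: the {{SlotScoreDemo}} class, the {{Candidate}} type and {{selectBest}} are hypothetical, and it assumes that scores are never negative (a {{NON_LOCAL}} candidate scores 0) and that a candidate wins only if it strictly beats the current best score.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified model of score-based slot selection.
// NOT Flink's SlotSelectionStrategy API; it only illustrates the effect of the
// initial "best score" value. Assumption: scores are never negative, and a
// NON_LOCAL candidate scores 0.
public class SlotScoreDemo {

    static final class Candidate {
        final String slotId;
        final double score;

        Candidate(String slotId, double score) {
            this.slotId = slotId;
            this.score = score;
        }
    }

    // Returns the highest-scoring candidate whose score strictly beats initialBestScore,
    // or an empty Optional if no candidate does.
    static Optional<Candidate> selectBest(List<Candidate> candidates, double initialBestScore) {
        Candidate best = null;
        double bestScore = initialBestScore;
        for (Candidate candidate : candidates) {
            if (candidate.score > bestScore) {
                best = candidate;
                bestScore = candidate.score;
            }
        }
        return Optional.ofNullable(best);
    }

    public static void main(String[] args) {
        List<Candidate> onlyNonLocal = Collections.singletonList(new Candidate("slot-1", 0.0));

        // Double.MIN_VALUE is the smallest POSITIVE double (about 4.9e-324),
        // so 0.0 > Double.MIN_VALUE is false and the score-0 slot is never chosen.
        System.out.println(selectBest(onlyNonLocal, Double.MIN_VALUE).isPresent()); // false

        // With -1.0 (the proposed fix), the score-0 slot is selected.
        System.out.println(selectBest(onlyNonLocal, -1.0).isPresent()); // true

        // When a better candidate exists, both initial values pick it.
        List<Candidate> mixed = Arrays.asList(new Candidate("slot-1", 0.0), new Candidate("slot-2", 1.0));
        System.out.println(selectBest(mixed, Double.MIN_VALUE).get().slotId); // slot-2
    }
}
```

Under the same assumption, {{Double.NEGATIVE_INFINITY}} would also work as the starting value, since it is smaller than every finite double and so does not rely on scores being non-negative; {{-1.0}} is the value proposed above.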
Fixing this locality issue is not necessary to resolve this ticket, but it would make the input locality constraint work as expected.

> Flink (on YARN) sometimes needs too many slots
> ----------------------------------------------
>
> Key: FLINK-15013
> URL: https://issues.apache.org/jira/browse/FLINK-15013
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.10.0
> Reporter: Aljoscha Krettek
> Priority: Blocker
> Fix For: 1.10.0
>
> Attachments: DualInputWordCount.jar
>
>
> *THIS IS DIFFERENT FROM FLINK-15007, even though the text looks almost the
> same.*
> This was discovered while debugging FLINK-14834. In some cases Flink needs
> more slots to execute than expected. You can see this in some of the
> logs attached to FLINK-14834.
> You can reproduce this using
> [https://github.com/aljoscha/docker-hadoop-cluster] to bring up a YARN
> cluster and then running a compiled Flink in there.
> When you run
> {code:java}
> bin/flink run -m yarn-cluster -p 3 -yjm 1224 -ytm 1224 /root/DualInputWordCount.jar --input hdfs:///wc-in-1 --output hdfs:///wc-out && hdfs dfs -rm -r /wc-out
> {code}
> and check the logs afterwards, you will sometimes see three "Requesting new
> slot..." statements and sometimes you will see four.
> This is the {{git bisect}} log that identifies the first faulty commit
> ([https://github.com/apache/flink/commit/2ab8b61f2f22f1a1ce7f92cd6b8dd32d2c0c227d|https://github.com/apache/flink/commit/2ab8b61f2f22f1a1ce7f92cd6b8dd32d2c0c227d]):
> {code:java}
> git bisect start
> # good: [09f2f43a1d73c76bf4d3f4a1205269eb860deb14] [FLINK-14154][ml] Add the class for multivariate Gaussian Distribution.
> git bisect good 09f2f43a1d73c76bf4d3f4a1205269eb860deb14
> # bad: [01d6972ab267807b8afccb09a45c454fa76d6c4b] [hotfix] Refactor out slots creation from the TaskSlotTable constructor
> git bisect bad 01d6972ab267807b8afccb09a45c454fa76d6c4b
> # bad: [7a61c582c7213f123e10de4fd11a13d96425fd77] [hotfix] Fix wrong Java doc comment of BroadcastStateBootstrapFunction.Context
> git bisect bad 7a61c582c7213f123e10de4fd11a13d96425fd77
> # good: [edeec8d7420185d1c49b2739827bd921d2c2d485] [hotfix][runtime] Replace all occurrences of letter to mail to unify wording of variables and documentation.
> git bisect good edeec8d7420185d1c49b2739827bd921d2c2d485
> # bad: [1b4ebce86b71d56f44185f1cb83d9a3b51de13df] [FLINK-14262][table-planner-blink] support referencing function with fully/partially qualified names in blink
> git bisect bad 1b4ebce86b71d56f44185f1cb83d9a3b51de13df
> # good: [25a3d9138cd5e39fc786315682586b75d8ac86ea] [hotfix] Move TaskManagerSlot to o.a.f.runtime.resourcemanager.slotmanager
> git bisect good 25a3d9138cd5e39fc786315682586b75d8ac86ea
> # good: [362d7670593adc2e4b20650c8854398727d8102b] [FLINK-12122] Calculate TaskExecutorUtilization when listing available slots
> git bisect good 362d7670593adc2e4b20650c8854398727d8102b
> # bad: [7e8218515baf630e668348a68ff051dfa49c90c3] [FLINK-13969][Checkpointing] Do not allow trigger new checkpoitn after stop the coordinator
> git bisect bad 7e8218515baf630e668348a68ff051dfa49c90c3
> # bad: [269e7f007e855c2bdedf8bad64ef13f516a608a6] [FLINK-12122] Choose SlotSelectionStrategy based on ClusterOptions#EVENLY_SPREAD_OUT_SLOTS_STRATEGY
> git bisect bad 269e7f007e855c2bdedf8bad64ef13f516a608a6
> # bad: [2ab8b61f2f22f1a1ce7f92cd6b8dd32d2c0c227d] [FLINK-12122] Add EvenlySpreadOutLocationPreferenceSlotSelectionStrategy
> git bisect bad 2ab8b61f2f22f1a1ce7f92cd6b8dd32d2c0c227d
> # first bad commit: [2ab8b61f2f22f1a1ce7f92cd6b8dd32d2c0c227d] [FLINK-12122] Add EvenlySpreadOutLocationPreferenceSlotSelectionStrategy
> {code}
> I'm using the streaming WordCount example that I modified to have two
> "inputs", similar to how the WordCount example is used in the
> YARN/kerberos/Docker test. Instead of using the input once, we use it like
> this:
> {code}
> text = env.readTextFile(params.get("input")).union(env.readTextFile(params.get("input")));
> {code}
> to create two inputs from the same path. A jar is attached.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)