[ 
https://issues.apache.org/jira/browse/FLINK-15007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16986674#comment-16986674
 ] 

Xintong Song commented on FLINK-15007:
--------------------------------------

I think I have found the cause of this issue.

I was looking at this [log|https://api.travis-ci.org/v3/job/614505046/log.txt] 
that [~gjy] reported in 
[FLINK-14834|https://issues.apache.org/jira/browse/FLINK-14834].

According to the log, RM first received one slot request and started a TM for 
that request. After the first TM registered, RM received another 2 slot 
requests, but this time it started only 1 TM. We have seen similar problems 
before.

When RM starts a TM, it also generates pending task slots according to the 
number of slots the new TM should have (in our case 1), so that RM can assign 
slot requests to pending slots before the TM is registered. Later, when the TM 
registers with RM, the actual registered slots are mapped to the pending 
slots if they have the same profile, and any slot request assigned to a pending 
slot is reassigned to the corresponding actual slot. If the profile of a 
registered slot differs from that of the pending slot, the pending slot is not 
consumed.
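
To make the matching rule concrete, here is a minimal sketch in Python. It is a simplified model, not Flink's actual slot manager code: `SlotProfile` and `consume_pending_slot` are hypothetical names, and the profile is reduced to a single managed memory value in bytes.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class SlotProfile:
    """Hypothetical stand-in for a slot's resource profile."""
    managed_memory_bytes: int


def consume_pending_slot(pending: List[SlotProfile],
                         registered: SlotProfile) -> Optional[SlotProfile]:
    """Remove and return the first pending slot whose profile equals the
    registered slot's profile; return None if nothing matches."""
    for i, candidate in enumerate(pending):
        if candidate == registered:
            return pending.pop(i)
    return None


pending_slots = [SlotProfile(managed_memory_bytes=1000)]

# Off by one byte: no match, so the pending slot is left in place.
mismatch = consume_pending_slot(pending_slots, SlotProfile(999))
remaining_after_mismatch = len(pending_slots)

# Identical profile: the pending slot is consumed.
match = consume_pending_slot(pending_slots, SlotProfile(1000))
remaining_after_match = len(pending_slots)
```

The point of the sketch is that the comparison is exact, so even a one-byte difference leaves the pending slot behind.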

In our case, when the first request arrives, RM starts a new TM, generates a 
pending slot, and assigns the request to the pending slot. When the TM 
registers, the pending slot is not consumed because the slot profiles don't 
match. Yet the request is assigned to the registered slot, and the pending 
slot becomes unassigned. Then the next 2 slot requests arrive. One of them is 
assigned to the leftover pending slot, so RM starts a new TM only for the 
other request. As a result, no TM is actually pending for that slot, and the 
slot request assigned to it will never be satisfied.
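
The sequence above can be replayed with a small toy model. This is only a sketch of the bookkeeping as described in this comment, not Flink code: `ToyResourceManager`, `run_scenario`, and the byte values are all hypothetical, and a profile is modeled as a plain integer.

```python
class ToyResourceManager:
    """Toy model of the RM bookkeeping described above (not Flink code)."""

    def __init__(self, rm_profile: int):
        self.rm_profile = rm_profile  # managed memory (bytes) RM expects per slot
        self.started_tms = 0
        self.pending = []             # one bool per pending slot: True if a request is assigned
        self.free_registered = 0      # registered slots without a request
        self.satisfied = 0            # requests placed on registered slots

    def request_slot(self) -> None:
        if self.free_registered > 0:
            self.free_registered -= 1
            self.satisfied += 1
        elif False in self.pending:
            # Reuse an unassigned pending slot instead of starting a TM.
            self.pending[self.pending.index(False)] = True
        else:
            self.started_tms += 1
            self.pending.append(True)

    def register_tm(self, tm_profile: int) -> None:
        if tm_profile == self.rm_profile and self.pending:
            # Profiles match: the pending slot is consumed.
            had_request = self.pending.pop(0)
            if had_request:
                self.satisfied += 1
            else:
                self.free_registered += 1
        elif True in self.pending:
            # Mismatch: the request moves to the registered slot, but the
            # pending slot stays behind, now unassigned.
            self.pending[self.pending.index(True)] = False
            self.satisfied += 1
        else:
            self.free_registered += 1

    @property
    def waiting(self) -> int:
        return self.pending.count(True)


def run_scenario(rm_profile: int, tm_profile: int):
    rm = ToyResourceManager(rm_profile)
    rm.request_slot()               # first request: TM 1 is started
    rm.register_tm(tm_profile)      # TM 1 registers
    rm.request_slot()               # second request
    rm.request_slot()               # third request
    for _ in range(rm.started_tms - 1):
        rm.register_tm(tm_profile)  # every other started TM registers
    return rm.started_tms, rm.satisfied, rm.waiting


mismatched = run_scenario(rm_profile=1000, tm_profile=999)   # (2, 2, 1)
matched = run_scenario(rm_profile=1000, tm_profile=1000)     # (3, 3, 0)
```

With mismatching profiles the model starts only 2 TMs for 3 requests and leaves one request waiting forever, which is the behavior seen in the log. With matching profiles it starts 3 TMs and satisfies all requests.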

The pending slot and the registered slot may have different profiles because 
memory sizes on RM / TM are calculated from the configuration in different 
code paths, so the calculated results may suffer from slight rounding errors 
due to the fraction multiplication. Previously, in 1.9, we solved this problem 
by explicitly overwriting the managed memory size in the configuration to make 
sure RM / TM have the same managed memory size, which is the only value in the 
slot profile that is actually used. That solved most of the problem. But we 
failed to discover in 1.9 that there is a bytes -> megabytes -> bytes 
conversion of the managed memory size on the TM side, which may also introduce 
errors. In 1.9 all the fields of ResourceProfile were represented in MB, so 
the conversion did not cause problems. But recently we changed them to 
MemorySize, which stores the value in bytes, and this exposed the accuracy 
loss.
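
A minimal sketch of the accuracy loss, in Python. It assumes the megabyte conversion truncates to whole megabytes of 1024 * 1024 bytes; the helper name and the sample sizes are hypothetical and only illustrate the effect.

```python
MIB = 1024 * 1024


def round_trip_via_megabytes(size_bytes: int) -> int:
    """bytes -> whole megabytes (truncating, an assumption) -> bytes."""
    return (size_bytes // MIB) * MIB


aligned = 256 * MIB              # exact multiple of a megabyte
unaligned = 256 * MIB + 123_456  # not a multiple of a megabyte

aligned_loss = aligned - round_trip_via_megabytes(aligned)
unaligned_loss = unaligned - round_trip_via_megabytes(unaligned)

# Compared at MB granularity (as in 1.9) the two values look identical,
# compared at byte granularity they do not.
same_in_mb = unaligned // MIB == round_trip_via_megabytes(unaligned) // MIB
same_in_bytes = unaligned == round_trip_via_megabytes(unaligned)
```

Only sizes that are not a whole number of megabytes change in the round trip, and the change is invisible as long as profiles are compared in MB.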

I think this also explains [~aljoscha]'s finding that not all values of 
`taskmanager.heap.size` trigger the problem: different values of 
`taskmanager.heap.size` can result in different managed memory sizes, and not 
all managed memory sizes lose accuracy in the conversions.

The problem should be fixed by FLIP-49, which will be merged soon. In FLIP-49 
we calculate the memory sizes with exactly the same code paths, to eliminate 
the differences between the resource profiles calculated on the RM / TM sides.
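
As an illustration of why a shared code path helps, consider the sketch below. The two functions are hypothetical and are not the actual RM / TM calculations; they only show that applying the same fraction at different granularities can give different byte values, while calling one function on both sides cannot.

```python
MIB = 1024 * 1024


def managed_memory_path_a(total_bytes: int, fraction: float) -> int:
    """Hypothetical path: apply the fraction directly to the byte value."""
    return int(total_bytes * fraction)


def managed_memory_path_b(total_bytes: int, fraction: float) -> int:
    """Hypothetical path: apply the fraction to whole megabytes, then scale back."""
    return int((total_bytes // MIB) * fraction) * MIB


total = 1001 * MIB
fraction = 0.7

# Two different code paths: the results disagree.
profiles_match_with_two_paths = (
    managed_memory_path_a(total, fraction) == managed_memory_path_b(total, fraction)
)

# One shared code path on both sides: the results agree by construction.
profiles_match_with_one_path = (
    managed_memory_path_a(total, fraction) == managed_memory_path_a(total, fraction)
)
```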

> Flink on YARN does not request required TaskExecutors in some cases
> -------------------------------------------------------------------
>
>                 Key: FLINK-15007
>                 URL: https://issues.apache.org/jira/browse/FLINK-15007
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination, Runtime / Task
>    Affects Versions: 1.10.0
>            Reporter: Aljoscha Krettek
>            Priority: Blocker
>             Fix For: 1.10.0
>
>         Attachments: DualInputWordCount.jar
>
>
> This was discovered while debugging FLINK-14834. In some cases Flink does not 
> request new {{TaskExecutors}} even though new slots are requested. You can 
> see this in some of the logs attached to FLINK-14834.
> You can reproduce this using 
> [https://github.com/aljoscha/docker-hadoop-cluster] to bring up a YARN 
> cluster and then running a compiled Flink in there.
> When you run
> {code:java}
> bin/flink run -m yarn-cluster -p 3 -yjm 1200 -ytm 1200 
> /root/DualInputWordCount.jar --input hdfs:///wc-in-1 --output hdfs:///wc-out 
> && hdfs dfs -rm -r /wc-out
> {code}
> the job waits and eventually fails because it does not have enough slots. 
> (You can see in the log that 3 new slots are requested but only 2 
> {{TaskExecutors}} are requested.)
> When you run
> {code:java}
> bin/flink run -m yarn-cluster -p 3 -yjm 1224 -ytm 1224 
> /root/DualInputWordCount.jar --input hdfs:///wc-in-1 --output hdfs:///wc-out 
> && hdfs dfs -rm -r /wc-out
> {code}
> the job runs successfully.
> This is the {{git bisect}} log that identifies the first faulty commit 
> ([https://github.com/apache/flink/commit/9fed0ddc5bc015f98246a2d8d9adbe5fb2b91ba4|https://github.com/apache/flink/commit/9fed0ddc5bc015f98246a2d8d9adbe5fb2b91ba4]):
> {code:java}
> git bisect start
> # good: [09f2f43a1d73c76bf4d3f4a1205269eb860deb14] [FLINK-14154][ml] Add the 
> class for multivariate Gaussian Distribution.
> git bisect good 09f2f43a1d73c76bf4d3f4a1205269eb860deb14
> # bad: [85905f80e9711967711c2992612dccdd2cc211ac] [FLINK-14834][tests] 
> Disable flaky yarn_kerberos_docker (default input) test
> git bisect bad 85905f80e9711967711c2992612dccdd2cc211ac
> # good: [c9c8a29b1b2e4f2886fba1524432f9788b564e61] 
> [FLINK-14759][coordination] Remove unused class TaskManagerCliOptions
> git bisect good c9c8a29b1b2e4f2886fba1524432f9788b564e61
> # good: [c9c8a29b1b2e4f2886fba1524432f9788b564e61] 
> [FLINK-14759][coordination] Remove unused class TaskManagerCliOptions
> git bisect good c9c8a29b1b2e4f2886fba1524432f9788b564e61
> # bad: [ae539c97c858b94e0e2504b54a8517ac1383482a] [hotfix][runtime] Check 
> managed memory fraction range when setting it into StreamConfig
> git bisect bad ae539c97c858b94e0e2504b54a8517ac1383482a
> # good: [01d6972ab267807b8afccb09a45c454fa76d6c4b] [hotfix] Refactor out 
> slots creation from the TaskSlotTable constructor
> git bisect good 01d6972ab267807b8afccb09a45c454fa76d6c4b
> # bad: [d32e1d00854e24bc4bb3aad6d866c2d709acd993] [FLINK-14594][core] Change 
> Resource to use BigDecimal as its value
> git bisect bad d32e1d00854e24bc4bb3aad6d866c2d709acd993
> # bad: [25f87ec208a642283e995811d809632129ca289a] 
> [FLINK-11935][table-planner] Fix cast timestamp/date to string to avoid 
> Gregorian cutover
> git bisect bad 25f87ec208a642283e995811d809632129ca289a
> # bad: [21c6b85a6f5991aabcbcd41fedc860d662d478fb] [FLINK-14842][table] add 
> logging for loaded modules and functions
> git bisect bad 21c6b85a6f5991aabcbcd41fedc860d662d478fb
> # bad: [48986aa0b89de731d1b9136b59d409933cc15408] [hotfix] Remove unnecessary 
> comments about memory size calculation before network init
> git bisect bad 48986aa0b89de731d1b9136b59d409933cc15408
> # bad: [4c4652efa43ed8ab456f5f63c89b57d8c4a621f8] [hotfix] Remove unused 
> number of slots in MemoryManager
> git bisect bad 4c4652efa43ed8ab456f5f63c89b57d8c4a621f8
> # bad: [9fed0ddc5bc015f98246a2d8d9adbe5fb2b91ba4] [FLINK-14400] Shrink the 
> scope of MemoryManager from TaskExecutor to slot
> git bisect bad 9fed0ddc5bc015f98246a2d8d9adbe5fb2b91ba4
> # first bad commit: [9fed0ddc5bc015f98246a2d8d9adbe5fb2b91ba4] [FLINK-14400] 
> Shrink the scope of MemoryManager from TaskExecutor to slot
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
