[ 
https://issues.apache.org/jira/browse/FLINK-21023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17268475#comment-17268475
 ] 

Till Rohrmann commented on FLINK-21023:
---------------------------------------

I think there is a misunderstanding about how {{-yt}} works. {{-yt}} is not 
intended to make input data for your program available. Instead, the idea is 
that you can upload configuration/libraries/dependencies which are needed by 
your application to the Yarn cluster.
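
As a minimal sketch of that intended use: a shipped file can be read with plain Java I/O at task runtime, relative to the working directory of whichever container the code runs in. This assumes the shipped {{conf}} directory ends up in each container's working directory (as the issue description observes) and that the file is in {{key=value}} format. The class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

/** Hypothetical helper for reading a config file shipped with -yt. */
public class ShippedConfig {

    /** Loads a key=value file located at baseDir/relativePath. */
    public static Properties load(Path baseDir, String relativePath) throws IOException {
        Path file = baseDir.resolve(relativePath);
        Properties props = new Properties();
        try (Reader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
            props.load(reader);
        }
        return props;
    }

    /**
     * Resolves the path against the current working directory of the JVM.
     * Assumption: inside a Yarn container this is the container directory
     * into which the shipped files were localized.
     */
    public static Properties loadFromWorkingDir(String relativePath) throws IOException {
        return load(Paths.get("").toAbsolutePath(), relativePath);
    }
}
```

Calling something like {{ShippedConfig.loadFromWorkingDir("conf/cmp_online.cfg")}} from code that runs on the TM (for example in the {{open()}} method of a rich function) would resolve against that TM's own container, rather than passing the path through an input format.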

In your case, you should first make {{./conf/cmp_online.cfg}} available by 
uploading it to HDFS. Then you can start your application and have it read 
this file via {{--input hdfs://a.b.c:1234/path/to/file}}.
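
When spelling out the HDFS URI, the host and port belong in the authority part directly after {{hdfs://}}. A small sketch using only {{java.net.URI}} (the class name is hypothetical, and {{a.b.c:1234}} is just the placeholder host and port from above) shows how the two-slash and three-slash forms parse differently:

```java
import java.net.URI;

/** Hypothetical helper that shows how an input URI is split into its parts. */
public class HdfsUriCheck {

    /** Returns host, port and path of the given URI as a readable string. */
    public static String describe(String uri) {
        URI parsed = URI.create(uri);
        return "host=" + parsed.getHost()
                + ", port=" + parsed.getPort()
                + ", path=" + parsed.getPath();
    }

    public static void main(String[] args) {
        // Two slashes: a.b.c:1234 is parsed as the authority (host and port).
        System.out.println(describe("hdfs://a.b.c:1234/path/to/file"));
        // Three slashes: the authority is empty, so a.b.c:1234 becomes
        // the first segment of the path instead of a host.
        System.out.println(describe("hdfs:///a.b.c:1234/path/to/file"));
    }
}
```

With three slashes the host is {{null}} and the port is {{-1}}, so the placeholder host would be treated as a directory name.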

The reason the TM tries to read from the JM's container is that you are 
specifying a local path for the input data. The path is local with respect to 
the JM container because the JM is the one responsible for assigning input 
splits to read.
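
To illustrate the effect (this is not Flink's actual code, only a sketch with hypothetical names): once a relative path is resolved against the working directory of one process, the resulting absolute path carries that process's container directory with it.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical illustration of resolving a relative path in one container. */
public class LocalPathResolution {

    /** Resolves a relative path against the given working directory. */
    public static Path resolve(String workingDir, String relativePath) {
        return Paths.get(workingDir).resolve(relativePath).normalize();
    }

    public static void main(String[] args) {
        // Container directory of the JM, taken from the issue description.
        String jmContainerDir =
                "/data/d1/yarn/nm/usercache/yanta/appcache/"
                        + "application_1609125504851_3620/"
                        + "container_e283_1609125504851_3620_01_000001";

        // The absolute path now names the JM container. A TM on another
        // node has no such directory, so opening the file fails there.
        System.out.println(resolve(jmContainerDir, "./conf/cmp_online.cfg"));
    }
}
```

The printed path ends in {{container_e283_1609125504851_3620_01_000001/conf/cmp_online.cfg}}, which matches the path in the {{FileNotFoundException}} quoted below.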

> Task Manager uses the container dir of Job Manager when running flink job on 
> yarn-cluster.
> ------------------------------------------------------------------------------------------
>
>                 Key: FLINK-21023
>                 URL: https://issues.apache.org/jira/browse/FLINK-21023
>             Project: Flink
>          Issue Type: Bug
>          Components: Client / Job Submission, Deployment / YARN
>    Affects Versions: 1.12.0, 1.11.1
>            Reporter: Tang Yan
>            Priority: Major
>
> I want to use the -yt (yarnship) option to distribute my config files to the 
> Yarn cluster and read the file in code. I just used the Flink WordCount 
> example.
> Here is my submit command:
> /opt/Flink/bin/flink run -m yarn-cluster -p 1 -yt /path/to/conf -c 
> org.apache.flink.examples.java.wordcount.WordCount 
> /opt/Flink/examples/batch/WordCount.jar --input ./conf/cmp_online.cfg
> Test Result:
> I found that if the job manager and task manager are launched on the same 
> node, the job runs successfully. But when they're running on different 
> nodes, the job fails with the errors below. I find the conf folder has been 
> distributed to container cache dirs, such as 
> [file:/data/d1/yarn/nm/usercache/yanta/appcache/application_1609125504851_3620/container_e283_1609125504851_3620_01_000001/conf|file:///data/d1/yarn/nm/usercache/yanta/appcache/application_1609125504851_3620/container_e283_1609125504851_3620_01_000001/conf]
>  on job manager node, and 
> [file:/data/d1/yarn/nm/usercache/yanta/appcache/application_1609125504851_3620/container_e283_1609125504851_3620_01_000002/conf|file:///data/d1/yarn/nm/usercache/yanta/appcache/application_1609125504851_3620/container_e283_1609125504851_3620_01_000002/conf]
>  on task manager node. But why does the task manager load the conf file from 
> the container_eXXX_000001 path (which is located on the job manager node)?
> _2021-01-19 04:19:11,405 INFO org.apache.flink.yarn.YarnResourceManager [] - Registering TaskManager with ResourceID container_e283_1609125504851_3620_01_000002 (akka.tcp://fl...@rphf1hsn026.qa.webex.com:46785/user/rpc/taskmanager_0) at ResourceManager
> 2021-01-19 04:19:11,506 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - CHAIN DataSource (at main(WordCount.java:69) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:84)) -> Combine (SUM(1), at main(WordCount.java:87) (1/1) (10cf614584617de770c6fd0f0aad4db7) switched from SCHEDULED to DEPLOYING.
> 2021-01-19 04:19:11,507 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying CHAIN DataSource (at main(WordCount.java:69) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:84)) -> Combine (SUM(1), at main(WordCount.java:87) (1/1) (attempt #0) to container_e283_1609125504851_3620_01_000002 @ rphf1hsn026.qa.webex.com (dataPort=46647)
> 2021-01-19 04:19:11,608 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - CHAIN DataSource (at main(WordCount.java:69) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:84)) -> Combine (SUM(1), at main(WordCount.java:87) (1/1) (10cf614584617de770c6fd0f0aad4db7) switched from DEPLOYING to RUNNING.
> 2021-01-19 04:19:11,792 INFO org.apache.flink.api.common.io.LocatableInputSplitAssigner [] - Assigning remote split to host rphf1hsn026
> 2021-01-19 04:19:11,847 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - CHAIN DataSource (at main(WordCount.java:69) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:84)) -> Combine (SUM(1), at main(WordCount.java:87) (1/1) (10cf614584617de770c6fd0f0aad4db7) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@3e19cc76.
> java.io.IOException: Error opening the Input Split [file:/data/d1/yarn/nm/usercache/yanta/appcache/application_1609125504851_3620/container_e283_1609125504851_3620_01_000001/conf/cmp_online.cfg|file:///data/d1/yarn/nm/usercache/yanta/appcache/application_1609125504851_3620/container_e283_1609125504851_3620_01_000001/conf/cmp_online.cfg] [0,71]: /data/d1/yarn/nm/usercache/yanta/appcache/application_1609125504851_3620/container_e283_1609125504851_3620_01_000001/conf/cmp_online.cfg (No such file or directory)
>     at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:824) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:470) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:47) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_272]
> Caused by: java.io.FileNotFoundException: /data/d1/yarn/nm/usercache/yanta/appcache/application_1609125504851_3620/container_e283_1609125504851_3620_01_000001/conf/cmp_online.cfg (No such file or directory)
>     at java.io.FileInputStream.open0(Native Method) ~[?:1.8.0_272]
>     at java.io.FileInputStream.open(FileInputStream.java:195) ~[?:1.8.0_272]
>     at java.io.FileInputStream.<init>(FileInputStream.java:138) ~[?:1.8.0_272]
>     at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.api.common.io.FileInputFormat$InputSplitOpenThread.run(FileInputFormat.java:996) ~[flink-dist_2.11-1.11.1.jar:1.11.1]_



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
