[
https://issues.apache.org/jira/browse/FLINK-21023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17268555#comment-17268555
]
Tang Yan edited comment on FLINK-21023 at 1/20/21, 12:52 PM:
-------------------------------------------------------------
[~trohrmann]
Thanks for your reply. WordCount is only my simple test. In my real case, I
just want to put some configuration into an XX.conf file and have the Flink
code read that file to build a key-value map. In Spark jobs I used '--files'
for this. So does -yt in a Flink job not support this case?
This is the only line in my code that reads the file:
env.readTextFile(params.get("input"))
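For the configuration use case described above, one possible approach is to read the shipped file with plain Java I/O inside the task (for example from a RichFunction's open() method) rather than through env.readTextFile. The sketch below is only an illustration under assumptions: the class name ShippedConfig is hypothetical and not part of Flink, the file is assumed to contain key=value lines (the actual format of XX.conf is not stated), and the shipped conf directory is assumed to be present under each container's working directory, as the paths in the issue description suggest.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper (not part of Flink): loads a shipped key=value file into a map. */
public class ShippedConfig {

    /** Parses "key=value" lines; blank lines and lines starting with '#' are skipped. */
    public static Map<String, String> load(Path file) throws IOException {
        Map<String, String> config = new LinkedHashMap<>();
        try (BufferedReader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
            String line;
            while ((line = reader.readLine()) != null) {
                String trimmed = line.trim();
                if (trimmed.isEmpty() || trimmed.startsWith("#")) {
                    continue;
                }
                int eq = trimmed.indexOf('=');
                if (eq <= 0) {
                    continue; // no key before '=' (or no '=' at all): ignore the line
                }
                // Split on the first '=' only, so values may themselves contain '='.
                config.put(trimmed.substring(0, eq).trim(), trimmed.substring(eq + 1).trim());
            }
        }
        return config;
    }

    public static void main(String[] args) throws IOException {
        // A relative path is resolved against the working directory of the
        // process that runs this code, i.e. that process's own container dir.
        Path file = Paths.get(args.length > 0 ? args[0] : "conf/cmp_online.cfg");
        load(file).forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Because the relative path is resolved by whichever process calls load, each TaskManager would read the copy in its own container directory, assuming the directory was shipped there.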
> Task Manager uses the container dir of Job Manager when running flink job on
> yarn-cluster.
> ------------------------------------------------------------------------------------------
>
> Key: FLINK-21023
> URL: https://issues.apache.org/jira/browse/FLINK-21023
> Project: Flink
> Issue Type: Bug
> Components: Client / Job Submission, Deployment / YARN
> Affects Versions: 1.12.0, 1.11.1
> Reporter: Tang Yan
> Priority: Major
>
> I want to use the -yt (yarnship) option to distribute my config files to the
> YARN cluster and read the files in my code. For this test I just used the
> Flink WordCount example.
> Here is my submit command:
> /opt/Flink/bin/flink run -m yarn-cluster -p 1 -yt /path/to/conf -c
> org.apache.flink.examples.java.wordcount.WordCount
> /opt/Flink/examples/batch/WordCount.jar --input ./conf/cmp_online.cfg
> Test result:
> If the job manager and task manager are launched on the same node, the job
> runs successfully. But when they run on different nodes, the job fails with
> the errors below. The conf folder has been distributed to the container
> cache dirs, such as
> file:/data/d1/yarn/nm/usercache/yanta/appcache/application_1609125504851_3620/container_e283_1609125504851_3620_01_000001/conf
> on the job manager node, and
> file:/data/d1/yarn/nm/usercache/yanta/appcache/application_1609125504851_3620/container_e283_1609125504851_3620_01_000002/conf
> on the task manager node. So why does the task manager load the conf file
> from the container_eXXX_000001 path (which is located on the job manager
> node)?
> 2021-01-19 04:19:11,405 INFO org.apache.flink.yarn.YarnResourceManager [] - Registering TaskManager with ResourceID container_e283_1609125504851_3620_01_000002 (akka.tcp://[email protected]:46785/user/rpc/taskmanager_0) at ResourceManager
> 2021-01-19 04:19:11,506 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - CHAIN DataSource (at main(WordCount.java:69) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:84)) -> Combine (SUM(1), at main(WordCount.java:87) (1/1) (10cf614584617de770c6fd0f0aad4db7) switched from SCHEDULED to DEPLOYING.
> 2021-01-19 04:19:11,507 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying CHAIN DataSource (at main(WordCount.java:69) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:84)) -> Combine (SUM(1), at main(WordCount.java:87) (1/1) (attempt #0) to container_e283_1609125504851_3620_01_000002 @ rphf1hsn026.qa.webex.com (dataPort=46647)
> 2021-01-19 04:19:11,608 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - CHAIN DataSource (at main(WordCount.java:69) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:84)) -> Combine (SUM(1), at main(WordCount.java:87) (1/1) (10cf614584617de770c6fd0f0aad4db7) switched from DEPLOYING to RUNNING.
> 2021-01-19 04:19:11,792 INFO org.apache.flink.api.common.io.LocatableInputSplitAssigner [] - Assigning remote split to host rphf1hsn026
> 2021-01-19 04:19:11,847 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - CHAIN DataSource (at main(WordCount.java:69) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:84)) -> Combine (SUM(1), at main(WordCount.java:87) (1/1) (10cf614584617de770c6fd0f0aad4db7) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@3e19cc76.
> java.io.IOException: Error opening the Input Split file:/data/d1/yarn/nm/usercache/yanta/appcache/application_1609125504851_3620/container_e283_1609125504851_3620_01_000001/conf/cmp_online.cfg [0,71]: /data/d1/yarn/nm/usercache/yanta/appcache/application_1609125504851_3620/container_e283_1609125504851_3620_01_000001/conf/cmp_online.cfg (No such file or directory)
>     at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:824) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:470) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:47) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_272]
> Caused by: java.io.FileNotFoundException: /data/d1/yarn/nm/usercache/yanta/appcache/application_1609125504851_3620/container_e283_1609125504851_3620_01_000001/conf/cmp_online.cfg (No such file or directory)
>     at java.io.FileInputStream.open0(Native Method) ~[?:1.8.0_272]
>     at java.io.FileInputStream.open(FileInputStream.java:195) ~[?:1.8.0_272]
>     at java.io.FileInputStream.<init>(FileInputStream.java:138) ~[?:1.8.0_272]
>     at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.api.common.io.FileInputFormat$InputSplitOpenThread.run(FileInputFormat.java:996) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
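The failing input split carries an absolute path inside container_..._000001, which is consistent with the relative --input value having been resolved against the job manager's working directory before the split reached the task manager. That reading is an inference from the log, not something this message confirms. The small sketch below, with a hypothetical class name RelativePathDemo, only illustrates how the same relative path yields different absolute paths depending on the working directory it is resolved against; the directories are the ones quoted in the description.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical demo (not Flink code): one relative path, two working directories. */
public class RelativePathDemo {

    /** Resolves a relative path against the given working directory and normalizes it. */
    public static Path resolve(String workingDir, String relative) {
        return Paths.get(workingDir).resolve(relative).normalize();
    }

    public static void main(String[] args) {
        String appDir = "/data/d1/yarn/nm/usercache/yanta/appcache/application_1609125504851_3620";
        String jobManagerDir = appDir + "/container_e283_1609125504851_3620_01_000001";
        String taskManagerDir = appDir + "/container_e283_1609125504851_3620_01_000002";
        String input = "./conf/cmp_online.cfg"; // value passed as --input

        // Resolved in the job manager's container: the path seen in the stack trace.
        System.out.println(resolve(jobManagerDir, input));
        // Resolved in the task manager's container: where the shipped copy lives on that node.
        System.out.println(resolve(taskManagerDir, input));
    }
}
```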