[ https://issues.apache.org/jira/browse/FLINK-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16621599#comment-16621599 ]
Till Rohrmann commented on FLINK-10370:
---------------------------------------
This problem should only arise in detached job mode. In attached job mode we
still use the session mode underneath, so everything should work.
In job mode the main method is still executed on the client side. What is no
longer possible, however, is uploading a local file to the {{BlobServer}},
because the server is not yet running. What we would have to do in this case is
include the local files in the set of files which we upload to HDFS in order to
start the YARN application. That would put the files on the classpath of every
instance, making them easily accessible.
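A rough client-side sketch of that idea (the helper below and its use of the
descriptor's ship-files mechanism are assumptions for illustration, not the
actual fix):
{code}
import java.io.File;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
import org.apache.flink.api.java.tuple.Tuple2;

public class ShipLocalCacheFilesSketch {

    /**
     * Hypothetical helper: pick out the registered cache entries that point
     * to local files, so the client could hand them to the YARN cluster
     * descriptor (e.g. its ship-files mechanism) and have them uploaded to
     * HDFS together with the rest of the application files.
     */
    static List<File> localCacheFiles(List<Tuple2<String, DistributedCacheEntry>> cacheEntries) {
        final List<File> localFiles = new ArrayList<>();
        for (Tuple2<String, DistributedCacheEntry> entry : cacheEntries) {
            final URI uri = URI.create(entry.f1.filePath);
            // only local paths need shipping; DFS paths are already
            // reachable from the cluster
            if (uri.getScheme() == null || "file".equals(uri.getScheme())) {
                localFiles.add(new File(uri.getPath()));
            }
        }
        return localFiles;
    }
}
{code}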
For the container entrypoint, one needs to make sure that all required local
files are stored in the image.
> DistributedCache does not work in job cluster mode.
> ---------------------------------------------------
>
> Key: FLINK-10370
> URL: https://issues.apache.org/jira/browse/FLINK-10370
> Project: Flink
> Issue Type: Bug
> Components: Cluster Management, Job-Submission
> Affects Versions: 1.6.0
> Reporter: Dawid Wysakowicz
> Priority: Major
>
> When using job cluster mode, the client does not follow the standard submission
> path, during which the {{DistributedCacheEntry}} instances are written into the
> {{Configuration}}. As a result, the registered files cannot be accessed in the job.
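> For reference, the step that the job cluster submission skips corresponds
> roughly to the following client-side call (a minimal sketch assuming the
> {{DistributedCache.writeFileInfoToConfig}} helper; the wrapper class is
> illustrative):
> {code}
> import org.apache.flink.api.common.cache.DistributedCache;
> import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
> import org.apache.flink.configuration.Configuration;
> 
> public class WriteCacheEntrySketch {
> 
>     public static void main(String[] args) {
>         final Configuration jobConfiguration = new Configuration();
> 
>         // on the standard (session) submission path the client writes every
>         // registered cache entry into the job configuration like this; in
>         // job cluster mode this never happens, so getFile() fails at runtime
>         final DistributedCacheEntry entry = new DistributedCacheEntry(
>             "hdfs://172.17.0.2:8020/home/hadoop-user/in", false);
>         DistributedCache.writeFileInfoToConfig("test_data", entry, jobConfiguration);
>     }
> }
> {code}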
> How to reproduce:
> A simple job that uses the {{DistributedCache}}:
> {code}
> import java.nio.file.Files;
> import java.nio.file.Path;
> import java.util.stream.Collectors;
> 
> import org.apache.flink.api.common.functions.RichMapFunction;
> import org.apache.flink.api.java.utils.ParameterTool;
> import org.apache.flink.core.fs.FileSystem;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> 
> public class DistributedCacheViaDfsTestProgram {
> 
>     public static void main(String[] args) throws Exception {
>         final ParameterTool params = ParameterTool.fromArgs(args);
>         final String inputFile = "hdfs://172.17.0.2:8020/home/hadoop-user/in";
>         final String outputFile = "/tmp/out";
> 
>         final StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(1);
> 
>         // register the HDFS file under the name "test_data"
>         env.registerCachedFile(inputFile, "test_data", false);
> 
>         env.fromElements(1)
>             .map(new TestMapFunction())
>             .writeAsText(outputFile, FileSystem.WriteMode.OVERWRITE);
> 
>         env.execute("Distributed Cache Via Blob Test Program");
>     }
> 
>     static class TestMapFunction extends RichMapFunction<Integer, String> {
> 
>         @Override
>         public String map(Integer value) throws Exception {
>             // fails in job cluster mode: the cache entry was never written
>             // into the job configuration, so the file cannot be resolved
>             final Path testFile = getRuntimeContext()
>                 .getDistributedCache()
>                 .getFile("test_data")
>                 .toPath();
>             return Files.readAllLines(testFile)
>                 .stream()
>                 .collect(Collectors.joining("\n"));
>         }
>     }
> }
> {code}
> If one runs this program, e.g., in YARN job cluster mode, it will produce:
> {code}
> java.lang.IllegalArgumentException: File with name 'test_data' is not available. Did you forget to register the file?
>     at org.apache.flink.api.common.cache.DistributedCache.getFile(DistributedCache.java:110)
>     at org.apache.flink.streaming.tests.DistributedCacheViaDfsTestProgram$TestMapFunction.map(DistributedCacheViaDfsTestProgram.java:59)
>     at org.apache.flink.streaming.tests.DistributedCacheViaDfsTestProgram$TestMapFunction.map(DistributedCacheViaDfsTestProgram.java:55)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>     at org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:164)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
> This job runs fine, though, if it is submitted to a YARN session cluster.