[ 
https://issues.apache.org/jira/browse/FLINK-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16620590#comment-16620590
 ] 

Dawid Wysakowicz commented on FLINK-10370:
------------------------------------------

I also have some additional concerns regarding job cluster mode. Previously the 
{{main}} method was executed on the client side (in most cases, apart from web 
submission). In 1.6.0 we allowed distributing local files via the BlobServer: 
if a file registered in {{DistributedCache}} is a local one, it is uploaded 
during submission. This obviously won't work in job cluster mode, as the 
{{main}} method is executed on the "server" side, where this file most probably 
does not exist. The same applies to any other code in the {{main}} method that 
assumes it runs on the client side. 

This is most notable on YARN, as job cluster mode is now used by default for 
{{-m yarn-cluster}}. This means that code relying on such assumptions will work 
in legacy mode (and in session cluster mode), but won't work in yarn-cluster.
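
To spot affected jobs, one could check whether a path passed to 
{{registerCachedFile}} is a local one, since those are the paths that rely on 
the client-side upload. The following is only a rough sketch: 
{{CachedFilePathCheck}} and {{isLocalPath}} are hypothetical names and not part 
of Flink, and the sketch assumes that a path counts as local when it has no URI 
scheme or uses the {{file}} scheme.

```java
import java.net.URI;

/** Hypothetical helper for illustration only; this is NOT a Flink API. */
public final class CachedFilePathCheck {

    private CachedFilePathCheck() {}

    /**
     * Assumption: a path is "local" if it has no URI scheme or uses "file",
     * i.e. it refers to the file system of the machine that runs main().
     */
    public static boolean isLocalPath(String path) {
        String scheme = URI.create(path).getScheme();
        return scheme == null || "file".equalsIgnoreCase(scheme);
    }

    public static void main(String[] args) {
        String[] candidates = {
            "hdfs://172.17.0.2:8020/home/hadoop-user/in",
            "file:///tmp/in",
            "/tmp/in"
        };
        for (String candidate : candidates) {
            // Local paths only exist on the machine that executes main().
            System.out.println(candidate + " -> local=" + isLocalPath(candidate));
        }
    }
}
```

A path reported as local here is one that would have to exist on the machine 
executing {{main}}, which in job cluster mode is no longer the client.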

> DistributedCache does not work in job cluster mode.
> ---------------------------------------------------
>
>                 Key: FLINK-10370
>                 URL: https://issues.apache.org/jira/browse/FLINK-10370
>             Project: Flink
>          Issue Type: Bug
>          Components: Cluster Management, Job-Submission
>    Affects Versions: 1.6.0
>            Reporter: Dawid Wysakowicz
>            Priority: Major
>
> When using job cluster mode, the client does not follow the standard 
> submission path during which {{DistributedCacheEntries}} are written into 
> {{Configuration}}. Therefore the files cannot be accessed in the job.
> How to reproduce:
> A simple job that uses {{DistributedCache}}:
> {code}
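> import org.apache.flink.api.common.functions.RichMapFunction;
> import org.apache.flink.api.java.utils.ParameterTool;
> import org.apache.flink.core.fs.FileSystem;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> import java.nio.file.Files;
> import java.nio.file.Path;
> import java.util.stream.Collectors;
>
> public class DistributedCacheViaDfsTestProgram {
>     public static void main(String[] args) throws Exception {
>         final ParameterTool params = ParameterTool.fromArgs(args);
>         final String inputFile = "hdfs://172.17.0.2:8020/home/hadoop-user/in";
>         final String outputFile = "/tmp/out";
>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(1);
>         // Register the DFS file in the distributed cache under the name "test_data".
>         env.registerCachedFile(inputFile, "test_data", false);
>         env.fromElements(1)
>             .map(new TestMapFunction())
>             .writeAsText(outputFile, FileSystem.WriteMode.OVERWRITE);
>         env.execute("Distributed Cache Via Blob Test Program");
>     }
>
>     static class TestMapFunction extends RichMapFunction<Integer, String> {
>         @Override
>         public String map(Integer value) throws Exception {
>             // Look up the cached file by its registered name and return its content.
>             final Path testFile = getRuntimeContext().getDistributedCache().getFile("test_data").toPath();
>             return Files.readAllLines(testFile)
>                 .stream()
>                 .collect(Collectors.joining("\n"));
>         }
>     }
> }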
> {code}
> If one runs this program, e.g. in YARN job cluster mode, it produces:
> {code}
> java.lang.IllegalArgumentException: File with name 'test_data' is not available. Did you forget to register the file?
>       at org.apache.flink.api.common.cache.DistributedCache.getFile(DistributedCache.java:110)
>       at org.apache.flink.streaming.tests.DistributedCacheViaDfsTestProgram$TestMapFunction.map(DistributedCacheViaDfsTestProgram.java:59)
>       at org.apache.flink.streaming.tests.DistributedCacheViaDfsTestProgram$TestMapFunction.map(DistributedCacheViaDfsTestProgram.java:55)
>       at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>       at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>       at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
>       at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>       at org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:164)
>       at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>       at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
>       at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>       at java.lang.Thread.run(Thread.java:748)
> {code}
> The job runs fine, though, if it is submitted to a YARN session cluster.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
