[ https://issues.apache.org/jira/browse/FLINK-14908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dawid Wysakowicz updated FLINK-14908:
-------------------------------------
    Description: 
User reported that distributing cache files through DFS does not work anymore: 
https://stackoverflow.com/questions/58978476/flink-1-9-wont-run-program-when-i-use-distributed-cache-why

I think the problematic part is in {{RestClusterClient#submitJob}}:
{code}
for (Map.Entry<String, DistributedCache.DistributedCacheEntry> artifacts : jobGraph.getUserArtifacts().entrySet()) {
	artifactFileNames.add(new JobSubmitRequestBody.DistributedCacheFile(
		artifacts.getKey(),
		new Path(artifacts.getValue().filePath).getName()));
	filesToUpload.add(new FileUpload(
		Paths.get(artifacts.getValue().filePath),
		RestConstants.CONTENT_TYPE_BINARY));
}
{code}

The code does not check whether a file is in a DFS; it simply assumes the file is in the local FS and tries to add it to the REST request, which fails. The code on the receiver side in {{JobSubmitHandler}} can still support files distributed via DFS, but it needs to receive proper paths to the files in the DFS.
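A minimal sketch of how the client could tell the two cases apart (the helper name here is hypothetical, not the actual fix): inspect the scheme of each artifact's {{filePath}}, upload only entries that live on the local file system, and forward DFS paths to the {{JobSubmitHandler}} unchanged.

```java
import java.net.URI;

public class CacheEntryScheme {
	// Hypothetical helper (not Flink API): a path with no scheme, or the
	// "file" scheme, lives on the local FS and must be uploaded through the
	// REST request; anything else (hdfs://, s3://, ...) is already in a DFS
	// and only its path needs to be forwarded to the receiver side.
	static boolean mustUpload(String filePath) {
		String scheme = URI.create(filePath).getScheme();
		return scheme == null || "file".equals(scheme);
	}

	public static void main(String[] args) {
		System.out.println(mustUpload("/tmp/words.txt"));              // true
		System.out.println(mustUpload("file:///tmp/words.txt"));       // true
		System.out.println(mustUpload("hdfs://nn:8020/cache/words"));  // false
	}
}
```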



> Distributing CacheFiles through DFS does not work
> -------------------------------------------------
>
>                 Key: FLINK-14908
>                 URL: https://issues.apache.org/jira/browse/FLINK-14908
>             Project: Flink
>          Issue Type: Bug
>          Components: Client / Job Submission, Runtime / REST
>    Affects Versions: 1.8.2, 1.9.1
>            Reporter: Dawid Wysakowicz
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
