[ 
https://issues.apache.org/jira/browse/FLINK-19067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

JieFang.He updated FLINK-19067:
-------------------------------
    Comment: was deleted

(was: I think the cause is that the job files are uploaded to the dispatcher 
node, but the task fetches the job files from the resource_manager node. So in 
HA mode, it needs to be ensured that both are on the same node.

When the job is submitted to the REST endpoint, the REST endpoint submits it 
to the dispatcher, and the job files are then put on the dispatcher node.

Here is the log
{code:java}
2020-12-31 00:42:29,255 DEBUG [BLOB connection for /127.0.0.1:43576] [FileSystemBlobStore]: Copying from /tmp/blobStore-5fe6b952-fece-4590-b637-04f19d8121dd/job_22ea0e6f01a4d7667aa1077e9bffe759/blob_p-b8b09e59290d54dbc0bfd5c7672d424e7f2c5178-3c4b3b70d11b7ca1164a615928023109 to /data2/zdh/flink/storageDir/default/blob/job_22ea0e6f01a4d7667aa1077e9bffe759/blob_p-b8b09e59290d54dbc0bfd5c7672d424e7f2c5178-3c4b3b70d11b7ca1164a615928023109.
{code}
And here is the code in JobSubmitHandler:
{code:java}
CompletableFuture<Acknowledge> jobSubmissionFuture =
   finalizedJobGraphFuture.thenCompose(jobGraph -> gateway.submitJob(jobGraph, timeout));
{code}
The gateway is defined in DefaultDispatcherResourceManagerComponentFactory:
{code:java}
final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever =
   new RpcGatewayRetriever<>(
      rpcService,
      DispatcherGateway.class,
      DispatcherId::fromUuid,
      10,
      Time.milliseconds(50L));
{code}
{code:java}
webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
   configuration,
   dispatcherGatewayRetriever,
   resourceManagerGatewayRetriever,
   blobServer,
   executor,
   metricFetcher,
   highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
   fatalErrorHandler);
{code}
 

But the task gets the job files from the resource_manager node.

Here is the code of BlobClient.downloadFromBlobServer:
{code:java}
static void downloadFromBlobServer(
      @Nullable JobID jobId,
      BlobKey blobKey,
      File localJarFile,
      InetSocketAddress serverAddress,
      Configuration blobClientConfig,
      int numFetchRetries) throws IOException {
    ......
      catch (Throwable t) {
         String message = "Failed to fetch BLOB " + jobId + "/" + blobKey + " from " + serverAddress +
            " and store it under " + localJarFile.getAbsolutePath();
{code}
The serverAddress is defined in TaskExecutor:
{code:java}
final InetSocketAddress blobServerAddress = new InetSocketAddress(
   clusterInformation.getBlobServerHostname(),
   clusterInformation.getBlobServerPort());

blobCacheService.setBlobServerAddress(blobServerAddress);
{code}
The clusterInformation is passed along in ResourceManagerRegistrationListener:
{code:java}
if (resourceManagerConnection == connection) {
   try {
      establishResourceManagerConnection(
         resourceManagerGateway,
         resourceManagerId,
         taskExecutorRegistrationId,
         clusterInformation);
{code}
This is where ResourceManagerRegistrationListener is used:
{code:java}
resourceManagerConnection =
   new TaskExecutorToResourceManagerConnection(
      log,
      getRpcService(),
      taskManagerConfiguration.getRetryingRegistrationConfiguration(),
      resourceManagerAddress.getAddress(),
      resourceManagerAddress.getResourceManagerId(),
      getMainThreadExecutor(),
      new ResourceManagerRegistrationListener(),
      taskExecutorRegistration);
{code}
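
To summarize the chain traced above, here is a minimal sketch of the suspected mismatch. It is a toy model, not Flink code: the class BlobLookupSketch, the Node type, the blob key "job-jar", and the method submitAndFetch are all hypothetical names made up for illustration. It assumes each node keeps blobs in storage that only that node can see, that the upload goes to the dispatcher's node, and that the download is served by the resource_manager's node.

```java
import java.io.FileNotFoundException;
import java.util.HashMap;
import java.util.Map;

/** Toy model only: none of these types are Flink APIs. */
public class BlobLookupSketch {

    /** Stands in for a node whose blob storage is visible only to that node. */
    static final class Node {
        final String name;
        final Map<String, byte[]> localBlobs = new HashMap<>();

        Node(String name) {
            this.name = name;
        }

        /** Stores a blob in this node's local storage. */
        void put(String blobKey, byte[] data) {
            localBlobs.put(blobKey, data);
        }

        /** Reads a blob from this node's local storage, or fails if it is absent. */
        byte[] get(String blobKey) throws FileNotFoundException {
            byte[] data = localBlobs.get(blobKey);
            if (data == null) {
                throw new FileNotFoundException(blobKey + " not found on " + name);
            }
            return data;
        }
    }

    /**
     * Uploads the job files via the dispatcher's node, then fetches them via
     * the resource_manager's node. Returns true if the fetch succeeds.
     */
    static boolean submitAndFetch(Node dispatcherNode, Node resourceManagerNode) {
        dispatcherNode.put("job-jar", new byte[] {1, 2, 3});
        try {
            resourceManagerNode.get("job-jar");
            return true;
        } catch (FileNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Node a = new Node("node-a");
        Node b = new Node("node-b");
        // Dispatcher and resource_manager on the same node: the fetch succeeds.
        System.out.println("same node: " + submitAndFetch(a, a));
        // Dispatcher and resource_manager on different nodes: the fetch fails.
        System.out.println("different nodes: " + submitAndFetch(a, b));
    }
}
```

Under these assumptions the sketch prints "same node: true" and "different nodes: false", which matches the FileNotFoundException seen when the two components run on different nodes.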
 

 )

> resource_manager and dispatcher register on different nodes in HA mode will 
> cause FileNotFoundException
> -------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-19067
>                 URL: https://issues.apache.org/jira/browse/FLINK-19067
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.11.1
>            Reporter: JieFang.He
>            Assignee: Robert Metzger
>            Priority: Major
>         Attachments: flink-jobmanager-deployer-hejiefang01.log, 
> flink-jobmanager-deployer-hejiefang02.log, 
> flink-taskmanager-deployer-hejiefang01.log, 
> flink-taskmanager-deployer-hejiefang02.log
>
>
> When running examples/batch/WordCount.jar, it fails with the exception:
> {code:java}
> Caused by: java.io.FileNotFoundException: /data2/flink/storageDir/default/blob/job_d29414828f614d5466e239be4d3889ac/blob_p-a2ebe1c5aa160595f214b4bd0f39d80e42ee2e93-f458f1c12dc023e78d25f191de1d7c4b (No such file or directory)
>  at java.io.FileInputStream.open0(Native Method)
>  at java.io.FileInputStream.open(FileInputStream.java:195)
>  at java.io.FileInputStream.<init>(FileInputStream.java:138)
>  at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
>  at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:143)
>  at org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:105)
>  at org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:87)
>  at org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer.java:501)
>  at org.apache.flink.runtime.blob.BlobServerConnection.get(BlobServerConnection.java:231)
>  at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:117)
> {code}
>  
> I think the cause is that the job files are uploaded to the dispatcher node, 
> but the task fetches the job files from the resource_manager node. So in HA 
> mode, it needs to be ensured that both are on the same node.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
