[ https://issues.apache.org/jira/browse/FLINK-19067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17256814#comment-17256814 ]
JieFang.He edited comment on FLINK-19067 at 12/31/20, 3:36 AM:
---------------------------------------------------------------
I think the cause is that the jobFiles are uploaded to the Dispatcher node, but the Task fetches them from the ResourceManager node.
When the job is submitted to the REST endpoint, the REST endpoint forwards it to the Dispatcher, so the jobFiles end up on the Dispatcher's node.
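The hypothesis above can be illustrated with a small, self-contained simulation. It is only a sketch under stated assumptions: each JobManager process is modeled as a hypothetical `Node` with its own node-local storage directory (a temp directory standing in for `/data2/...`), the Dispatcher leader and the ResourceManager leader are assumed to run on different nodes, and none of the names below are Flink APIs.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;

public class SplitLeaderBlobDemo {

    /** Hypothetical stand-in for a JobManager process with a node-local storageDir. */
    static final class Node {
        final Path storageDir;

        Node(String name) throws IOException {
            // Each node gets its own temp directory, like a local /data2/... path per machine.
            this.storageDir = Files.createTempDirectory("storageDir-" + name);
        }

        /** Stores a blob under job_<jobId>/<blobKey>, on this node only. */
        void put(String jobId, String blobKey, byte[] data) throws IOException {
            Path target = storageDir.resolve("job_" + jobId).resolve(blobKey);
            Files.createDirectories(target.getParent());
            Files.write(target, data);
        }

        /** Reads a blob from this node's directory; fails if it was never stored here. */
        byte[] get(String jobId, String blobKey) throws IOException {
            return Files.readAllBytes(storageDir.resolve("job_" + jobId).resolve(blobKey));
        }
    }

    /** Returns true if a fetch from the ResourceManager leader's node misses the uploaded blob. */
    static boolean fetchFailsWithSplitLeaders() {
        try {
            Node dispatcherLeader = new Node("jm01");      // receives the REST job submission
            Node resourceManagerLeader = new Node("jm02"); // its blob server is reported to TaskExecutors
            dispatcherLeader.put("job1", "blob_p-abc", "jar bytes".getBytes(StandardCharsets.UTF_8));
            try {
                resourceManagerLeader.get("job1", "blob_p-abc");
                return false;
            } catch (NoSuchFileException e) {
                return true; // analogous to the FileNotFoundException in this issue
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("fetch fails: " + fetchFailsWithSplitLeaders()); // fetch fails: true
    }
}
```

If both leaders lived on the same node, or the storage directory were shared between nodes, the same read would succeed; the sketch only shows why a split plus node-local storage is enough to produce the error.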
Here is the log:
{code:java}
2020-12-31 00:42:29,255 DEBUG [BLOB connection for /127.0.0.1:43576]
[FileSystemBlobStore]: Copying from
/tmp/blobStore-5fe6b952-fece-4590-b637-04f19d8121dd/job_22ea0e6f01a4d7667aa1077e9bffe759/blob_p-b8b09e59290d54dbc0bfd5c7672d424e7f2c5178-3c4b3b70d11b7ca1164a615928023109
to
/data2/zdh/flink/storageDir/default/blob/job_22ea0e6f01a4d7667aa1077e9bffe759/blob_p-b8b09e59290d54dbc0bfd5c7672d424e7f2c5178-3c4b3b70d11b7ca1164a615928023109.
{code}
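As a sketch of what the logged copy amounts to, the snippet below rebuilds the target path from its parts and copies a local file there with `java.nio.file.Files.copy`. The layout `<storageDir>/<clusterId>/blob/job_<jobId>/<blobKey>` is read off the log line above, with `default` assumed to be the HA cluster id; `blobPath` and `copyToStore` are hypothetical helpers, not FileSystemBlobStore's actual methods. The point: if `storageDir` is a node-local path such as `/data2/...`, the copy exists only on the machine that performed it.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class BlobStorePathDemo {

    /** Hypothetical helper: <storageDir>/<clusterId>/blob/job_<jobId>/<blobKey>, as seen in the log. */
    static Path blobPath(Path storageDir, String clusterId, String jobId, String blobKey) {
        return storageDir.resolve(clusterId).resolve("blob").resolve("job_" + jobId).resolve(blobKey);
    }

    /** Copies a locally cached blob into the (possibly node-local) storage directory. */
    static Path copyToStore(Path localBlob, Path storageDir, String clusterId, String jobId, String blobKey) {
        Path target = blobPath(storageDir, clusterId, jobId, blobKey);
        try {
            Files.createDirectories(target.getParent());
            return Files.copy(localBlob, target, StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path localBlob = Files.createTempFile("blob_p-", ".tmp");   // stands in for the /tmp/blobStore-... file
        Path storageDir = Files.createTempDirectory("storageDir"); // stands in for /data2/.../storageDir
        Path stored = copyToStore(localBlob, storageDir, "default",
                "22ea0e6f01a4d7667aa1077e9bffe759", "blob_p-example");
        System.out.println("stored at " + stored);
    }
}
```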
And here is the code in JobSubmitHandler:
{code:java}
CompletableFuture<Acknowledge> jobSubmissionFuture =
        finalizedJobGraphFuture.thenCompose(
                jobGraph -> gateway.submitJob(jobGraph, timeout));
{code}
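For readers less familiar with `CompletableFuture.thenCompose`, the sketch below shows the shape of the call above: once the job graph future completes, its value is handed to an asynchronous `submitJob`, and the returned future is flattened into the result. `Gateway`, `FakeDispatcherGateway`, the `String` job graph and the `"ACK"` value are hypothetical simplifications, not Flink's `DispatcherGateway` or `Acknowledge` types. Whatever instance `gateway` resolves to (here, the Dispatcher) is where the submission ends up.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class SubmitForwardingDemo {

    /** Hypothetical, simplified stand-in for DispatcherGateway#submitJob. */
    interface Gateway {
        CompletableFuture<String> submitJob(String jobGraph, Duration timeout);
    }

    /** Records every job graph it receives so the routing can be inspected. */
    static final class FakeDispatcherGateway implements Gateway {
        final List<String> received = new ArrayList<>();

        @Override
        public CompletableFuture<String> submitJob(String jobGraph, Duration timeout) {
            received.add(jobGraph);
            return CompletableFuture.completedFuture("ACK");
        }
    }

    /** Mirrors the JobSubmitHandler pattern: thenCompose chains the async submission. */
    static CompletableFuture<String> submit(
            CompletableFuture<String> finalizedJobGraphFuture, Gateway gateway, Duration timeout) {
        return finalizedJobGraphFuture.thenCompose(jobGraph -> gateway.submitJob(jobGraph, timeout));
    }

    public static void main(String[] args) {
        FakeDispatcherGateway dispatcher = new FakeDispatcherGateway();
        String ack = submit(CompletableFuture.completedFuture("wordcount-job"),
                dispatcher, Duration.ofSeconds(10)).join();
        System.out.println(ack + " " + dispatcher.received); // ACK [wordcount-job]
    }
}
```

Using `thenCompose` rather than `thenApply` avoids ending up with a nested `CompletableFuture<CompletableFuture<...>>`.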
The gateway is defined in DefaultDispatcherResourceManagerComponentFactory:
{code:java}
final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever =
        new RpcGatewayRetriever<>(
                rpcService,
                DispatcherGateway.class,
                DispatcherId::fromUuid,
                10,
                Time.milliseconds(50L));
{code}
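Assuming the last two constructor arguments above are a retry count (10) and a retry delay (50 ms) for resolving the Dispatcher leader's gateway, the lookup follows a fixed-delay retry pattern. The synchronous sketch below shows that pattern in isolation; `retrieveWithRetries` is a hypothetical helper, not the actual RpcGatewayRetriever logic (which works asynchronously), and whether Flink counts the first attempt as a retry is not confirmed here.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.function.Supplier;

public class FixedDelayRetryDemo {

    /**
     * Hypothetical fixed-delay retry: calls {@code attempt} up to {@code retries + 1} times,
     * sleeping {@code delay} between failed attempts, and returns the first non-empty result.
     */
    static <T> Optional<T> retrieveWithRetries(Supplier<Optional<T>> attempt, int retries, Duration delay)
            throws InterruptedException {
        for (int i = 0; i <= retries; i++) {
            Optional<T> result = attempt.get();
            if (result.isPresent()) {
                return result;
            }
            if (i < retries) {
                Thread.sleep(delay.toMillis());
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) throws InterruptedException {
        int[] calls = {0};
        // Simulated leader lookup that only succeeds on the third attempt.
        Supplier<Optional<String>> lookup =
                () -> ++calls[0] < 3 ? Optional.empty() : Optional.of("dispatcher@jm01");
        Optional<String> gateway = retrieveWithRetries(lookup, 10, Duration.ofMillis(50));
        System.out.println(gateway.orElse("none") + " after " + calls[0] + " attempts");
        // dispatcher@jm01 after 3 attempts
    }
}
```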
{code:java}
webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
        configuration,
        dispatcherGatewayRetriever,
        resourceManagerGatewayRetriever,
        blobServer,
        executor,
        metricFetcher,
        highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
        fatalErrorHandler);
{code}
But the Task gets the jobFiles from the ResourceManager.
Here is the code of BlobClient.downloadFromBlobServer:
{code:java}
static void downloadFromBlobServer(
        @Nullable JobID jobId,
        BlobKey blobKey,
        File localJarFile,
        InetSocketAddress serverAddress,
        Configuration blobClientConfig,
        int numFetchRetries) throws IOException {
    ......
    catch (Throwable t) {
        String message = "Failed to fetch BLOB " + jobId + "/" + blobKey + " from " + serverAddress +
                " and store it under " + localJarFile.getAbsolutePath();
{code}
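Note that `serverAddress` is a fixed parameter, so every fetch retry asks the same blob server. The sketch below illustrates that under simplifying assumptions: `fetchBlob` is a hypothetical analogue of downloadFromBlobServer in which the "server" is just a directory, and only the shape of the error message is borrowed from the excerpt. If the blob was stored on a different node, retrying against the same server cannot succeed.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class BlobFetchRetryDemo {

    /**
     * Hypothetical analogue of downloadFromBlobServer: the "server" is a directory, and every
     * attempt targets the same serverDir. Throws an IOException with a message shaped like Flink's.
     */
    static void fetchBlob(Path serverDir, String jobId, String blobKey, Path localFile, int numFetchRetries)
            throws IOException {
        Path source = serverDir.resolve("job_" + jobId).resolve(blobKey);
        IOException last = null;
        for (int attempt = 0; attempt <= numFetchRetries; attempt++) {
            try {
                Files.copy(source, localFile, StandardCopyOption.REPLACE_EXISTING);
                return;
            } catch (IOException e) {
                last = e; // remember the failure and try the same server again
            }
        }
        throw new IOException("Failed to fetch BLOB " + jobId + "/" + blobKey + " from " + serverDir
                + " and store it under " + localFile.toAbsolutePath(), last);
    }

    public static void main(String[] args) throws IOException {
        Path emptyServer = Files.createTempDirectory("rm-node-blobs"); // the blob was never stored here
        Path local = Files.createTempDirectory("tm-cache").resolve("blob_p-abc");
        try {
            fetchBlob(emptyServer, "job1", "blob_p-abc", local, 5);
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
    }
}
```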
The serverAddress is defined in TaskExecutor:
{code:java}
final InetSocketAddress blobServerAddress = new InetSocketAddress(
        clusterInformation.getBlobServerHostname(),
        clusterInformation.getBlobServerPort());
blobCacheService.setBlobServerAddress(blobServerAddress);
{code}
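In other words, the TaskExecutor's blob server address is taken directly from the cluster information it receives; nothing here consults the Dispatcher. The sketch below shows that derivation with a hypothetical, simplified `ClusterInfo` class (not Flink's `ClusterInformation`) and made-up host and port values. Unlike the excerpt, it uses `InetSocketAddress.createUnresolved` so that no DNS lookup is needed to run it.

```java
import java.net.InetSocketAddress;

public class BlobAddressDemo {

    /** Hypothetical, simplified view of what the ResourceManager reports on registration. */
    static final class ClusterInfo {
        final String blobServerHostname;
        final int blobServerPort;

        ClusterInfo(String blobServerHostname, int blobServerPort) {
            this.blobServerHostname = blobServerHostname;
            this.blobServerPort = blobServerPort;
        }
    }

    /** The blob server address is built only from the reported hostname and port. */
    static InetSocketAddress blobServerAddress(ClusterInfo info) {
        // createUnresolved skips the DNS lookup that new InetSocketAddress(host, port) would do
        return InetSocketAddress.createUnresolved(info.blobServerHostname, info.blobServerPort);
    }

    public static void main(String[] args) {
        ClusterInfo fromResourceManager = new ClusterInfo("jm02.example", 36789);
        InetSocketAddress address = blobServerAddress(fromResourceManager);
        System.out.println(address.getHostString() + ":" + address.getPort()); // jm02.example:36789
    }
}
```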
The clusterInformation is set in ResourceManagerRegistrationListener:
{code:java}
if (resourceManagerConnection == connection) {
    try {
        establishResourceManagerConnection(
                resourceManagerGateway,
                resourceManagerId,
                taskExecutorRegistrationId,
                clusterInformation);
{code}
And this is where ResourceManagerRegistrationListener is used:
{code:java}
resourceManagerConnection =
        new TaskExecutorToResourceManagerConnection(
                log,
                getRpcService(),
                taskManagerConfiguration.getRetryingRegistrationConfiguration(),
                resourceManagerAddress.getAddress(),
                resourceManagerAddress.getResourceManagerId(),
                getMainThreadExecutor(),
                new ResourceManagerRegistrationListener(),
                taskExecutorRegistration);
{code}
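Putting the last two excerpts together: the TaskExecutor opens a connection to the current ResourceManager leader's address, and the listener attached to that connection adopts the cluster information (including the blob server address) only if the connection is still the current one. The sketch below models that guard with hypothetical classes and plain string addresses; it is not Flink code. It makes the chain explicit: per these excerpts, the blob server a TaskExecutor talks to is the one reported by the ResourceManager it registered with, regardless of where the Dispatcher leader runs.

```java
import java.util.Optional;

public class RegistrationListenerDemo {

    /** Hypothetical stand-in for a connection from the TaskExecutor to one ResourceManager. */
    static final class RmConnection {
        final String rmAddress;
        final String reportedBlobServer;

        RmConnection(String rmAddress, String reportedBlobServer) {
            this.rmAddress = rmAddress;
            this.reportedBlobServer = reportedBlobServer;
        }
    }

    /** Hypothetical, heavily simplified TaskExecutor state. */
    static final class TaskExecutorState {
        RmConnection resourceManagerConnection;
        Optional<String> blobServerAddress = Optional.empty();

        /** Replaces the current connection; its listener reports back via onRegistrationSuccess. */
        RmConnection connectTo(String rmAddress, String reportedBlobServer) {
            resourceManagerConnection = new RmConnection(rmAddress, reportedBlobServer);
            return resourceManagerConnection;
        }

        /** Mirrors the guard in the excerpt: only the current connection may update state. */
        void onRegistrationSuccess(RmConnection connection) {
            if (resourceManagerConnection == connection) {
                blobServerAddress = Optional.of(connection.reportedBlobServer);
            }
        }
    }

    public static void main(String[] args) {
        TaskExecutorState te = new TaskExecutorState();
        RmConnection stale = te.connectTo("jm01", "jm01:36789");
        RmConnection current = te.connectTo("jm02", "jm02:36789"); // ResourceManager leader moved to jm02
        te.onRegistrationSuccess(stale);   // ignored: no longer the current connection
        te.onRegistrationSuccess(current); // accepted
        System.out.println(te.blobServerAddress.orElse("unset")); // jm02:36789
    }
}
```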
> FileNotFoundException when run flink examples
> ---------------------------------------------
>
> Key: FLINK-19067
> URL: https://issues.apache.org/jira/browse/FLINK-19067
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.11.1
> Reporter: JieFang.He
> Priority: Major
> Attachments: flink-jobmanager-deployer-hejiefang01.log,
> flink-jobmanager-deployer-hejiefang02.log,
> flink-taskmanager-deployer-hejiefang01.log,
> flink-taskmanager-deployer-hejiefang02.log
>
>
> 1. When running examples/batch/WordCount.jar, it fails with the exception:
> Caused by: java.io.FileNotFoundException:
> /data2/flink/storageDir/default/blob/job_d29414828f614d5466e239be4d3889ac/blob_p-a2ebe1c5aa160595f214b4bd0f39d80e42ee2e93-f458f1c12dc023e78d25f191de1d7c4b (No such file or directory)
>     at java.io.FileInputStream.open0(Native Method)
>     at java.io.FileInputStream.open(FileInputStream.java:195)
>     at java.io.FileInputStream.<init>(FileInputStream.java:138)
>     at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
>     at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:143)
>     at org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:105)
>     at org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:87)
>     at org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer.java:501)
>     at org.apache.flink.runtime.blob.BlobServerConnection.get(BlobServerConnection.java:231)
>     at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:117)
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)