[
https://issues.apache.org/jira/browse/FLINK-33569?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Bodong Liu updated FLINK-33569:
-------------------------------
Description:
I deploy Flink in `yarn-application` mode. I found that when Hadoop's storage is set to the s3a file system, Flink cannot submit jobs to Yarn.
The error is reported as follows:
{code:java}
------------------------------------------------------------
The program finished with the following exception:
org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn Application Cluster
    at org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:481)
    at org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer.run(ApplicationClusterDeployer.java:67)
    at org.apache.flink.client.cli.CliFrontend.runApplication(CliFrontend.java:212)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1098)
    at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
Caused by: org.apache.hadoop.fs.PathIOException: `Cannot get relative path for URI:file:///tmp/application_1700122774429_0001-flink-conf.yaml5526160496134930395.tmp': Input/output error
    at org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.getFinalPath(CopyFromLocalOperation.java:360)
    at org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.uploadSourceFromFS(CopyFromLocalOperation.java:222)
    at org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.execute(CopyFromLocalOperation.java:169)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$copyFromLocalFile$26(S3AFileSystem.java:3854)
    at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration(IOStatisticsBinding.java:547)
    at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:528)
    at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:449)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2480)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2499)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.copyFromLocalFile(S3AFileSystem.java:3847)
    at org.apache.flink.yarn.YarnApplicationFileUploader.copyToRemoteApplicationDir(YarnApplicationFileUploader.java:397)
    at org.apache.flink.yarn.YarnApplicationFileUploader.uploadLocalFileToRemote(YarnApplicationFileUploader.java:202)
    at org.apache.flink.yarn.YarnApplicationFileUploader.registerSingleLocalResource(YarnApplicationFileUploader.java:181)
    at org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:1050)
    at org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:626)
    at org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:474)
    ... 10 more
{code}
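The failure also reproduces outside of Flink with a plain Hadoop client. Below is a minimal sketch, assuming the s3a settings from the Environment section further down; the bucket name `hadoop`, the temp file, and the class name are placeholders, not Flink code:
{code:java}
import java.io.File;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;

public class S3aCopyRepro {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(); // picks up core-site.xml
        FileSystem fs = FileSystem.get(conf);     // S3AFileSystem for s3a://hadoop
        File local = File.createTempFile("flink-conf.yaml", ".tmp");

        try {
            // Schemeless source path: CopyFromLocalOperation cannot relativize
            // it against the file:// URI of the source and throws, as above.
            fs.copyFromLocalFile(new Path(local.getAbsolutePath()),
                                 new Path("s3a://hadoop/tmp/conf.yaml"));
        } catch (PathIOException e) {
            System.out.println("schemeless path failed: " + e.getMessage());
        }

        // Source path built from the file URI keeps the file:// scheme and works.
        fs.copyFromLocalFile(new Path(local.toURI()),
                             new Path("s3a://hadoop/tmp/conf.yaml"));
    }
}
{code}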
By reading through the source code and debugging, I found that when Hadoop uses the s3a file system, file uploads and downloads must build their `Path` parameters from URIs that carry a `scheme`.
In the `org.apache.flink.yarn.YarnClusterDescriptor` class, when the temporarily generated `yaml` configuration file is uploaded, the file's absolute local path is used to construct the `Path` instead of its URI, whereas every other file upload and download in the class uses a URI as the path parameter.
This mismatch is the cause of the error reported above.
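A minimal sketch of the suspected cause and fix; the names below are illustrative and do not reproduce the exact code in `YarnClusterDescriptor#startAppMaster`:
{code:java}
import java.io.File;
import org.apache.hadoop.fs.Path;

class PathSchemeSketch {
    static void sketch() throws Exception {
        File tmpConfFile = File.createTempFile("flink-conf.yaml", ".tmp");

        // Current behavior: a schemeless local path. S3A's CopyFromLocalOperation
        // later fails to relativize the file:// source URI against it and throws
        // the PathIOException shown above.
        Path withoutScheme = new Path(tmpConfFile.getAbsolutePath());

        // What the other uploads do, and what works over s3a: build the Path
        // from the file's URI so the file:// scheme is preserved end to end.
        Path withScheme = new Path(tmpConfFile.toURI());
    }
}
{code}
With the URI-based construction, the temporary configuration file is uploaded the same way as every other local resource, and the s3a copy succeeds.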
> Could not deploy yarn-application when using yarn over s3a filesystem.
> ----------------------------------------------------------------------
>
> Key: FLINK-33569
> URL: https://issues.apache.org/jira/browse/FLINK-33569
> Project: Flink
> Issue Type: Bug
> Components: Deployment / YARN
> Affects Versions: 1.18.0, 1.17.1
> Environment: h1. *Env:*
> * OS: Arch Linux, kernel 6.6.1, AMD64
> * Flink: 1.17.1
> * Hadoop: 3.3.6
> * Minio: 2023-11-15
> h1. Settings
> h2. hadoop core-site.xml:
>
> {code:xml}
> <property>
> <name>fs.defaultFS</name>
> <value>s3a://hadoop</value>
> </property>
> <property>
> <name>fs.s3a.path.style.access</name>
> <value>true</value>
> </property>
> <!-- minio username -->
> <property>
> <name>fs.s3a.access.key</name>
> <value>admin</value>
> </property>
> <!-- minio password -->
> <property>
> <name>fs.s3a.secret.key</name>
> <value>password</value>
> </property>
> <!-- minio endpoint -->
> <property>
> <name>fs.s3a.endpoint</name>
> <value>http://localhost:9000</value>
> </property>
> <property>
> <name>fs.s3a.connection.establish.timeout</name>
> <value>5000</value>
> </property>
> <property>
> <name>fs.s3a.multipart.size</name>
> <value>512M</value>
> </property>
> <property>
> <name>fs.s3a.impl</name>
> <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
> </property>
> <property>
> <name>fs.AbstractFileSystem.s3a.impl</name>
> <value>org.apache.hadoop.fs.s3a.S3A</value>
> </property>
> <!-- S3 end -->
> {code}
> h1. Flink run command:
> {code:bash}
> ./bin/flink run-application -t yarn-application ./examples/streaming/TopSpeedWindowing.jar
> {code}
>
>
> Reporter: Bodong Liu
> Priority: Minor
> Attachments: 2023-11-16_16-47.png, image-2023-11-16-16-46-21-684.png,
> image-2023-11-16-16-48-40-223.png
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)