[
https://issues.apache.org/jira/browse/FLINK-25099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17451193#comment-17451193
]
chenqizhu edited comment on FLINK-25099 at 11/30/21, 3:41 PM:
--------------------------------------------------------------
When I skip setting fs.defaultFS=flinkcluster and instead specify the checkpoint path as hdfs://flinkcluster/xxxxx, the job cannot run properly; its status stays INITIALIZING (it seems the JobManager cannot be started, but I'm not sure why).
Changing the checkpoint path to hdfs:///xxxxx makes everything work fine (it obviously uses the default HDFS). [~zuston]
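Concretely, the two variants in flink-conf.yaml look like this (real paths elided as xxxxx; state.checkpoints.dir shown as a representative key for the checkpoint path):
{code:java}
# fails: job hangs in INITIALIZING when the nameservice is spelled out
state.checkpoints.dir: hdfs://flinkcluster/xxxxx
# works: scheme-only URI falls back to the client's default HDFS
state.checkpoints.dir: hdfs:///xxxxx
{code}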
The jobmanager.log is as follows:
{code:java}
2021-11-30 23:32:44,345 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager at akka://flink/user/rpc/resourcemanager_0 .
2021-11-30 23:32:44,406 INFO  org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Starting DefaultLeaderElectionService with ZooKeeperLeaderElectionDriver{leaderPath='/leader/dispatcher_lock'}.
2021-11-30 23:32:44,407 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Starting the resource manager.
2021-11-30 23:32:44,408 INFO  org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Starting DefaultLeaderRetrievalService with ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/resource_manager_lock'}.
2021-11-30 23:32:44,409 INFO  org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Starting DefaultLeaderRetrievalService with ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/dispatcher_lock'}.
2021-11-30 23:32:44,409 INFO  org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunner [] - DefaultDispatcherRunner was granted leadership with leader id 8d5e0cf1-06da-4648-bd89-f2949356902f. Creating new DispatcherLeaderProcess.
2021-11-30 23:32:44,414 INFO  org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcess [] - Start JobDispatcherLeaderProcess.
2021-11-30 23:32:44,421 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.MiniDispatcher at akka://flink/user/rpc/dispatcher_1 .
2021-11-30 23:32:44,449 INFO  org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Starting DefaultLeaderElectionService with ZooKeeperLeaderElectionDriver{leaderPath='/leader/cad84e92fb8ac17daf839af61fb8f9ae/job_manager_lock'}.
2021-11-30 23:32:44,498 WARN  akka.remote.transport.netty.NettyTransport [] - Remote connection to [null] failed with java.net.ConnectException: Connection refused: flink7/10.21.0.7:26635
2021-11-30 23:32:44,499 WARN  akka.remote.ReliableDeliverySupervisor [] - Association with remote system [akka.tcp://flink@flink7:26635] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@flink7:26635]] Caused by: [java.net.ConnectException: Connection refused: flink7/10.21.0.7:26635]
2021-11-30 23:32:44,504 INFO  org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider [] - Failing over to rm2
2021-11-30 23:32:44,547 INFO  org.apache.flink.yarn.YarnResourceManagerDriver [] - Recovered 0 containers from previous attempts ([]).
2021-11-30 23:32:44,547 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Recovered 0 workers from previous attempt.
2021-11-30 23:32:44,584 WARN  akka.remote.transport.netty.NettyTransport [] - Remote connection to [null] failed with java.net.ConnectException: Connection refused: flink7/10.21.0.7:26635
2021-11-30 23:32:44,585 WARN  akka.remote.ReliableDeliverySupervisor [] - Association with remote system [akka.tcp://flink@flink7:26635] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@flink7:26635]] Caused by: [java.net.ConnectException: Connection refused: flink7/10.21.0.7:26635]
2021-11-30 23:32:44,601 INFO  org.apache.hadoop.conf.Configuration [] - resource-types.xml not found
2021-11-30 23:32:44,602 INFO  org.apache.hadoop.yarn.util.resource.ResourceUtils [] - Unable to find 'resource-types.xml'.
2021-11-30 23:32:44,615 INFO  org.apache.flink.runtime.externalresource.ExternalResourceUtils [] - Enabled external resources: []
2021-11-30 23:32:44,620 INFO  org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl [] - Upper bound of the thread pool size is 500
2021-11-30 23:32:44,623 INFO  org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Starting DefaultLeaderElectionService with ZooKeeperLeaderElectionDriver{leaderPath='/leader/resource_manager_lock'}.
2021-11-30 23:32:44,626 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - ResourceManager akka.tcp://flink@flink26:30734/user/rpc/resourcemanager_0 was granted leadership with fencing token bceb6779160f5ffb00c5b73c156844a0
End of LogType:jobmanager.log.This log file belongs to a running container (container_e19_1637144069883_16539_09_000001) and so may not be complete.
*******************************************************************************
{code}
> flink on yarn Accessing two HDFS Clusters
> -----------------------------------------
>
> Key: FLINK-25099
> URL: https://issues.apache.org/jira/browse/FLINK-25099
> Project: Flink
> Issue Type: Bug
> Components: Deployment / YARN, FileSystems, Runtime / State Backends
> Affects Versions: 1.13.3
> Environment: flink : 1.13.3
> hadoop : 3.3.0
> Reporter: chenqizhu
> Priority: Major
> Attachments: flink-chenqizhu-client-hdfsn21n163.log
>
>
> Flink 1.13 supports configuring Hadoop properties in flink-conf.yaml via the
> flink.hadoop.* prefix. We need to write checkpoints to an HDFS cluster backed
> by SSDs (called cluster B) to speed up checkpoint writes, but that cluster is
> not the default HDFS of the Flink client (the default is called cluster A).
> flink-conf.yaml is configured with nameservices for both cluster A and
> cluster B, similar to HDFS federation.
> The configuration is as follows:
>
> {code:java}
> flink.hadoop.dfs.nameservices: ACluster,BCluster
> flink.hadoop.fs.defaultFS: hdfs://BCluster
> flink.hadoop.dfs.ha.namenodes.ACluster: nn1,nn2
> flink.hadoop.dfs.namenode.rpc-address.ACluster.nn1: 10.xxxx:9000
> flink.hadoop.dfs.namenode.http-address.ACluster.nn1: 10.xxxx:50070
> flink.hadoop.dfs.namenode.rpc-address.ACluster.nn2: 10.xxxxxx:9000
> flink.hadoop.dfs.namenode.http-address.ACluster.nn2: 10.xxxxxx:50070
> flink.hadoop.dfs.client.failover.proxy.provider.ACluster: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
> flink.hadoop.dfs.ha.namenodes.BCluster: nn1,nn2
> flink.hadoop.dfs.namenode.rpc-address.BCluster.nn1: 10.xxxxxx:9000
> flink.hadoop.dfs.namenode.http-address.BCluster.nn1: 10.xxxxxx:50070
> flink.hadoop.dfs.namenode.rpc-address.BCluster.nn2: 10.xxxxxx:9000
> flink.hadoop.dfs.namenode.http-address.BCluster.nn2: 10.xxxxx:50070
> flink.hadoop.dfs.client.failover.proxy.provider.BCluster: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
> {code}
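> For reference, Flink strips the flink.hadoop. prefix and forwards the remaining
> key to the Hadoop Configuration, so the block above should be equivalent to
> setting plain Hadoop keys such as (sketch of the first two entries only):
> {code:java}
> # what the Hadoop Configuration should end up seeing
> dfs.nameservices: ACluster,BCluster
> fs.defaultFS: hdfs://BCluster
> {code}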
>
> However, an error occurred during job startup, reported as follows.
> (If the configuration is changed back to the Flink client's local default HDFS
> cluster, i.e. flink.hadoop.fs.defaultFS: hdfs://ACluster, the job starts
> normally.)
> {noformat}
> Caused by: BCluster
> java.net.UnknownHostException: BCluster
> at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:448)
> at org.apache.hadoop.hdfs.NameNodeProxiesClient.createProxyWithClientProtocol(NameNodeProxiesClient.java:139)
> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:374)
> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:308)
> at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:184)
> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3414)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:158)
> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3474)
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3442)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:524)
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
> at org.apache.hadoop.yarn.util.FSDownload.verifyAndCopy(FSDownload.java:270)
> at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:68)
> at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:415)
> at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:412)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1845)
> at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:412)
> at org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.doDownloadCall(ContainerLocalizer.java:247)
> at org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:240)
> at org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:228)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745){noformat}
> Is there a solution to the above problem? The pain point is getting Flink to
> access two HDFS clusters at once, preferably purely through flink-conf.yaml
> configuration.
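> A minimal sketch of the setup I am aiming for (nameservice definitions as in
> the block above; the checkpoint path itself is hypothetical):
> {code:java}
> # keep the client's default HDFS on cluster A
> flink.hadoop.fs.defaultFS: hdfs://ACluster
> # write checkpoints to cluster B by fully qualifying its nameservice
> state.checkpoints.dir: hdfs://BCluster/flink-checkpoints
> {code}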
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)