[ 
https://issues.apache.org/jira/browse/FLINK-20982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17277923#comment-17277923
 ] 

Levani Kokhreidze commented on FLINK-20982:
-------------------------------------------

Sorry for the late reply. The issue was resolved once I set the 
kubernetes.namespace config value to the namespace where the Flink job runs. 
It may have been related to my minikube setup.
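
For anyone hitting the same error, here is a minimal sketch of the relevant 
flink-conf.yaml entries. The namespace value is an assumption, taken from the 
service account in the ClusterRoleBinding in the description 
(streaming-cluster-flink); use whatever namespace the JobManager and 
TaskManager pods actually run in. The other keys are copied from the config in 
the description.

{code}
# flink-conf.yaml (fragment)
# Assumption: the pods run in the streaming-cluster-flink namespace.
# If kubernetes.namespace is not set, Flink falls back to "default".
kubernetes.namespace: streaming-cluster-flink
kubernetes.cluster-id: cluster1
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: s3://state/ha
{code}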

I am going to close this issue.

> Flink TaskManager throws RegistrationTimeoutException when using Kubernetes HA
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-20982
>                 URL: https://issues.apache.org/jira/browse/FLINK-20982
>             Project: Flink
>          Issue Type: Bug
>          Components: Deployment / Kubernetes
>    Affects Versions: 1.12.0
>            Reporter: Levani Kokhreidze
>            Priority: Major
>
> I'm experimenting with the Flink 1.12 release and testing out the Kubernetes 
> HA feature with Minikube. Unfortunately, when using Kubernetes HA, the 
> *TaskManager* cannot register at the *ResourceManager* and throws a 
> *RegistrationTimeoutException* after 5 minutes.
> I can see that the JobManager creates config maps with Kubernetes HA:
>  
> {code:java}
> ➜ k get configmaps
> NAME                                                          DATA   AGE
> cluster1-00000000000000000000000000000000-jobmanager-leader   2      109s
> cluster1-dispatcher-leader                                    4      2m
> cluster1-resourcemanager-leader                               2      2m
> cluster1-restserver-leader                                    2      2m
> {code}
>   
> And when inspecting *{{cluster1-resourcemanager-leader}}*, it seems to have 
> the correct value.
> {code:java}
> ➜ k describe configmaps cluster1-resourcemanager-leader
> Name:         cluster1-resourcemanager-leader
> Namespace:    default
> Labels:       app=cluster1
>               configmap-type=high-availability
>               type=flink-native-kubernetes
> Annotations:  control-plane.alpha.kubernetes.io/leader:
>                 {"holderIdentity":"c227d599-9dee-4f98-81ed-dde40a1865d6","leaseDuration":15.000000000,"acquireTime":"2021-01-14T11:26:27.660920Z","renewTi...
> Data
> ====
> address:
> ----
> akka.tcp://flink@172.18.0.9:6565/user/rpc/resourcemanager_0
> sessionId:
> ----
> 8ede2bdf-4aa2-4da7-9a7a-bfc737e277bb
> Events:  <none>{code}
> *172.18.0.9* is indeed the IP of the leader.
> Connectivity from TaskManager pod also checks out.
> {code:java}
> bash-4.2$ nc -vz 172.18.0.9 6565
> Ncat: Version 7.50 ( https://nmap.org/ncat )
> Ncat: Connected to 172.18.0.9:6565.
> Ncat: 0 bytes sent, 0 bytes received in 0.01 seconds.{code}
> Here's the flink-conf.yaml file.
> {code:java}
> bash-4.2$ cat /opt/flink/conf/flink-conf.yaml
> jobmanager.rpc.port: 6565
> jobmanager.heap.size: 1024m
> jobmanager.execution.failover-strategy: region
> blob.server.port: 10901
> taskmanager.memory.process.size: 1728m
> taskmanager.numberOfTaskSlots: 1
> taskmanager.rpc.port: 6565
> taskmanager.data.port: 10901
> parallelism.default: 1
> # KUBERNETES HA
> high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> kubernetes.cluster-id: cluster1
> # ZOOKEEPER HA
> #high-availability: zookeeper
> #high-availability.zookeeper.path.root: /flink
> #high-availability.cluster-id: cluster1
> high-availability.jobmanager.port: 6565
> s3.path.style.access: true
> s3.endpoint: http://minio.streaming-cluster-minio.svc.cluster.local:9000
> web.submit.enable: false
> metrics.reporters: prom
> metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
> metrics.reporter.prom.port: 8097
> s3.secret-key: ****
> s3.access-key: minio
> taskmanager.host: 172.18.0.10
> high-availability.storageDir: s3://state/ha
> {code}
> The same setup with *zookeeper* HA works without any issues.
> The JobManager starts with the following command:
> {code:java}
> exec "$FLINK_HOME"/bin/standalone-job.sh start-foreground "$@"
> {code}
> These are the *ClusterRole* and *ClusterRoleBinding* granted to the 
> namespace.
> {code:java}
> kubectl create clusterrole cr --verb="*" --resource=configmaps
> kubectl create clusterrolebinding crb --clusterrole=cr --serviceaccount=streaming-cluster-flink:default{code}
> Exception stack trace from the TaskManager:
> {code:java}
> 2021-01-14 11:23:13,149 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Fatal error occurred in TaskExecutor akka.tcp://[email protected]:6565/user/rpc/taskmanager_0.
> org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException: Could not register at the ResourceManager within the specified maximum registration duration 300000 ms. This indicates a problem with this instance. Terminating now.
>     at org.apache.flink.runtime.taskexecutor.TaskExecutor.registrationTimeout(TaskExecutor.java:1258) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$startRegistrationTimeout$18(TaskExecutor.java:1244) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:404) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:197) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.actor.Actor.aroundReceive(Actor.scala:517) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.12.0.jar:1.12.0]
> 2021-01-14 11:23:13,158 ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] - Fatal error occurred while executing the TaskManager. Shutting it down...
> org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException: Could not register at the ResourceManager within the specified maximum registration duration 300000 ms. This indicates a problem with this instance. Terminating now.
>     at org.apache.flink.runtime.taskexecutor.TaskExecutor.registrationTimeout(TaskExecutor.java:1258) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$startRegistrationTimeout$18(TaskExecutor.java:1244) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:404) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:197) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.actor.Actor.aroundReceive(Actor.scala:517) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.12.0.jar:1.12.0]
>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.12.0.jar:1.12.0]{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
