[ https://issues.apache.org/jira/browse/FLINK-11127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16821683#comment-16821683 ]

Hwanju Kim edited comment on FLINK-11127 at 4/22/19 7:06 AM:
-------------------------------------------------------------

We experienced a similar issue to this, but more seriously with the communication between the resource manager and the task manager. In a normal situation it works fine, since only TMs actively connect to the JM, whose name is resolvable (i.e., there is no outbound association from the JM actor, only inbound). However, if a TM hits a fatal error, such as a task not responding to a cancel request, it does a graceful cleanup, part of which is shutting down its akka system (sending a poison pill to the JM), and then it terminates itself. Once the JM receives this poison pill, its actor (as part of the failover restart) starts an outbound association to the destination TM host name that was provided during the initial handshake. This outbound association cannot succeed if the TM is not reachable by host name, as in a typical Kubernetes setting. From this point on, the TM can still talk to the JM for TM registration, but the JM can never respond to the registration request, since the outbound association can never be established. This failure of outbound association from the JM's akka endpoint leaves task scheduling stuck indefinitely, because TM registration keeps failing with this error:
{code:java}
2019-02-28 21:58:15,867 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor            - Could not resolve ResourceManager address akka.ssl.tcp://flink@flink-jobmanager:6123/user/resourcemanager, retrying in 10000 ms.
{code}
In response to these constant failures, the JM also fails slot allocation indefinitely:
{code:java}
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate all requires slots within timeout of 300000 ms
{code}
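Just to make the name-resolution asymmetry concrete, here is a minimal standalone Java sketch (not Flink code; the only names reused are the JM service name and TM pod name that appear in the logs in this thread). In a typical Kubernetes setup without per-TM services, the JM service name resolves cluster-wide while a bare TM pod host name does not:
{code:java}
import java.net.InetAddress;
import java.net.UnknownHostException;

// Standalone illustration of the DNS asymmetry described above.
// Assumes default Kubernetes DNS: the JM Service name resolves cluster-wide,
// while a bare TM pod host name is not registered anywhere.
public class ResolveCheck {
    public static void main(String[] args) {
        resolve("flink-jobmanager");                     // JM service name: resolves
        resolve("flink-task-manager-64b868487c-x9l4b");  // TM pod host name: does not
    }

    private static void resolve(String host) {
        try {
            System.out.println(host + " -> " + InetAddress.getByName(host).getHostAddress());
        } catch (UnknownHostException e) {
            System.out.println(host + " -> Name does not resolve");
        }
    }
}
{code}
This is the same "Name does not resolve" failure shown in the issue description below, only hit by the JM's outbound RPC association instead of the metrics connection.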
We know there are multiple workarounds suggested in this thread, such as a stateful set, an init container, and passing a JVM argument, but we did not want to add artifacts and complexity to our production deployment just to fix this issue (I tried the last one, _taskmanager.host_, as it is the least invasive to the deployment, but it did not work for our case). Therefore, we went ahead and added a "_taskmanager.rpc.use-host-address_" configuration option to Flink. It is false by default; if set to true, the TM simply uses _taskManagerAddress.getHostAddress()_ instead of _taskManagerAddress.getHostName()_, in the RPC setting only (the actual patch is a few lines, as you would expect). It was minimal enough for us and has been solving the problem so far. We chose this approach because it could be a helpful option for an environment like a typical Kubernetes setting, without a TM stateful set or other tweaks. -I am not sure if you guys are interested in this approach, but I am sharing it for thought or interest.-
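
For reference, here is a rough sketch of what the change amounts to. It is illustrative only; the class and method below are simplified stand-ins rather than the actual patch, and only the option name, its default, and the two _InetAddress_ calls are taken from the description above:
{code:java}
import java.net.InetAddress;

// Illustrative sketch of the described address selection (not the actual patch).
// "useHostAddress" stands for the proposed taskmanager.rpc.use-host-address
// option, which defaults to false to keep the existing behavior.
public final class TaskManagerRpcAddressExample {

    static String externalRpcAddress(InetAddress taskManagerAddress, boolean useHostAddress) {
        // When enabled, advertise the raw IP so the JM never needs to resolve
        // the TM's pod host name for its outbound association.
        return useHostAddress
                ? taskManagerAddress.getHostAddress()   // e.g. 10.0.3.17
                : taskManagerAddress.getHostName();     // e.g. flink-task-manager-...
    }
}
{code}
As mentioned above, this choice is applied only in the RPC setting.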

_*Since I wrote this, I found that FLINK-11632 had already done what I described, and it has been applied from 1.7 onward.*_

 



> Make metrics query service establish connection to JobManager
> -------------------------------------------------------------
>
>                 Key: FLINK-11127
>                 URL: https://issues.apache.org/jira/browse/FLINK-11127
>             Project: Flink
>          Issue Type: Improvement
>          Components: Deployment / Kubernetes, Runtime / Coordination, Runtime 
> / Metrics
>    Affects Versions: 1.7.0
>            Reporter: Ufuk Celebi
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> As part of FLINK-10247, the internal metrics query service has been separated 
> into its own actor system. Before this change, the JobManager (JM) queried 
> TaskManager (TM) metrics via the TM actor. Now, the JM needs to establish a 
> separate connection to the TM metrics query service actor.
> In the context of Kubernetes, this is problematic as the JM will typically 
> *not* be able to resolve the TMs by name, resulting in warnings as follows:
> {code}
> 2018-12-11 08:32:33,962 WARN  akka.remote.ReliableDeliverySupervisor          
>               - Association with remote system 
> [akka.tcp://flink-metrics@flink-task-manager-64b868487c-x9l4b:39183] has 
> failed, address is now gated for [50] ms. Reason: [Association failed with 
> [akka.tcp://flink-metrics@flink-task-manager-64b868487c-x9l4b:39183]] Caused 
> by: [flink-task-manager-64b868487c-x9l4b: Name does not resolve]
> {code}
> In order to expose the TMs by name in Kubernetes, users require a service 
> *for each* TM instance which is not practical.
> This currently results in the web UI not being able to display some basic metrics 
> about number of sent records. You can reproduce this by following the READMEs 
> in {{flink-container/kubernetes}}.
> This worked before, because the JM is typically exposed via a service with a 
> known name and the TMs establish the connection to it which the metrics query 
> service piggybacked on.
> A potential solution to this might be to let the query service connect to the 
> JM similar to how the TMs register.
> I tagged this ticket as an improvement, but in the context of Kubernetes I 
> would consider this to be a bug.


