[ 
https://issues.apache.org/jira/browse/FLINK-12260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16830167#comment-16830167
 ] 

Till Rohrmann commented on FLINK-12260:
---------------------------------------

I'm still a bit skeptical that there is a race condition on the sender side. We 
only trigger the second registration call after the first one has produced a 
timeout. A timeout can only be produced if we have sent a request to the 
receiver side.

My proposal would be the following:

We have a {{Map<ResourceID, CompletableFuture<TaskExecutorGateway>> 
taskExecutorGatewayFutures}} of pending {{TaskExecutor}} connections. When we 
enter the method {{ResourceManager#registerTaskExecutor}} we create a new 
future and update {{taskExecutorGatewayFutures}}.

Inside the {{handleAsync}} call on the future we check whether we are still the 
pending {{TaskExecutor}} connection:

{code}
return taskExecutorGatewayFuture.handleAsync(
        (TaskExecutorGateway taskExecutorGateway, Throwable throwable) -> {
                if (taskExecutorGatewayFutures.get(taskExecutorResourceId) == 
taskExecutorGatewayFuture) {
                        
taskExecutorGatewayFutures.remove(taskExecutorResourceId);
                        if (throwable != null) {
                                return new 
RegistrationResponse.Decline(throwable.getMessage());
                        } else {
                                return registerTaskExecutorInternal(
                                        taskExecutorGateway,
                                        taskExecutorAddress,
                                        taskExecutorResourceId,
                                        dataPort,
                                        hardwareDescription);
                        }
                } else {
                        log.debug("Ignoring outdated TaskExecutorGateway 
connection.");
                }
        },
        getMainThreadExecutor());
{code}

Given that the {{registerTaskExecutor}} calls come in order, this should 
prevent that an earlier register call overrides a later one.

> Slot allocation failure by taskmanager registration timeout and race
> --------------------------------------------------------------------
>
>                 Key: FLINK-12260
>                 URL: https://issues.apache.org/jira/browse/FLINK-12260
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.6.3
>            Reporter: Hwanju Kim
>            Priority: Critical
>         Attachments: FLINK-12260-repro.diff
>
>
>  
> In 1.6.2., we have seen slot allocation failure keep happening for long time. 
> Having looked at the log, I see the following behavior:
>  # TM sends a registration request R1 to resource manager.
>  # R1 times out after 100ms, which is initial timeout.
>  # TM retries a registration request R2 to resource manager (with timeout 
> 200ms).
>  # R2 arrives first at resource manager and registered, and then TM gets 
> successful response moving onto step 5 below.
>  # On successful registration, R2's instance is put to 
> taskManagerRegistrations
>  # Then R1 arrives at resource manager and realizes the same TM resource ID 
> is already registered, which then unregisters R2's instance ID from 
> taskManagerRegistrations. A new instance ID for R1 is registered to 
> workerRegistration.
>  # R1's response is not handled though since it already timed out (see akka 
> temp actor resolve failure below), hence no registration to 
> taskManagerRegistrations.
>  # TM keeps heartbeating to the resource manager with slot status.
>  # Resource manager ignores this slot status, since taskManagerRegistrations 
> contains R2, not R1, which replaced R2 in workerRegistration at step 6.
>  # Slot request can never be fulfilled, timing out.
> The following is the debug logs for the above steps:
>  
> {code:java}
> JM log:
> 2019-04-11 22:39:40.000,Registering TaskManager with ResourceID 
> 46c8e0d0fcf2c306f11954a1040d5677 
> (akka.ssl.tcp://flink@flink-taskmanager:6122/user/taskmanager_0) at 
> ResourceManager
> 2019-04-11 22:39:40.000,Registering TaskManager 
> 46c8e0d0fcf2c306f11954a1040d5677 under deade132e2c41c52019cdc27977266cf at 
> the SlotManager.
> 2019-04-11 22:39:40.000,Replacing old registration of TaskExecutor 
> 46c8e0d0fcf2c306f11954a1040d5677.
> 2019-04-11 22:39:40.000,Unregister TaskManager 
> deade132e2c41c52019cdc27977266cf from the SlotManager.
> 2019-04-11 22:39:40.000,Registering TaskManager with ResourceID 
> 46c8e0d0fcf2c306f11954a1040d5677 
> (akka.ssl.tcp://flink@flink-taskmanager:6122/user/taskmanager_0) at 
> ResourceManager
> TM log:
> 2019-04-11 22:39:40.000,Registration at ResourceManager attempt 1 
> (timeout=100ms)
> 2019-04-11 22:39:40.000,Registration at ResourceManager 
> (akka.ssl.tcp://flink@flink-jobmanager:6123/user/resourcemanager) attempt 1 
> timed out after 100 ms
> 2019-04-11 22:39:40.000,Registration at ResourceManager attempt 2 
> (timeout=200ms)
> 2019-04-11 22:39:40.000,Successful registration at resource manager 
> akka.ssl.tcp://flink@flink-jobmanager:6123/user/resourcemanager under 
> registration id deade132e2c41c52019cdc27977266cf.
> 2019-04-11 22:39:41.000,resolve of path sequence [/temp/$c] failed{code}
>  
> As RPC calls seem to use akka ask, which creates temporary source actor, I 
> think multiple RPC calls could've arrived out or order by different actor 
> pairs and the symptom above seems to be due to that. If so, it could have 
> attempt account in the call argument to prevent unexpected unregistration? At 
> this point, what I have done is only log analysis, so I could do further 
> analysis, but before that wanted to check if it's a known issue. I also 
> searched with some relevant terms and log pieces, but couldn't find the 
> duplicate. Please deduplicate if any.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to