[https://issues.apache.org/jira/browse/FLINK-12260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16830167#comment-16830167]
Till Rohrmann commented on FLINK-12260:
---------------------------------------
I'm still a bit skeptical that there is a race condition on the sender side. We
only trigger the second registration call after the first one has produced a
timeout. A timeout can only be produced if we have sent a request to the
receiver side.
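Just to illustrate that sequencing (a hypothetical, simplified sketch, not the actual {{RetryingRegistration}} code): the next attempt is only scheduled from the timeout handler of the previous one, so the requests leave the sender strictly one after another.
{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.IntFunction;

// Illustrative only: sendAttempt stands in for the registration RPC.
final class SequentialRetrySketch {

	static void register(IntFunction<CompletableFuture<String>> sendAttempt, int attempt, long timeoutMillis) {
		sendAttempt.apply(attempt)
			.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
			.whenComplete((response, failure) -> {
				if (failure instanceof TimeoutException) {
					// The next request is only triggered here, i.e. after the
					// previous request has already been sent and timed out.
					register(sendAttempt, attempt + 1, timeoutMillis * 2);
				}
			});
	}
}
{code}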
My proposal would be the following:
We have a {{Map<ResourceID, CompletableFuture<TaskExecutorGateway>>
taskExecutorGatewayFutures}} of pending {{TaskExecutor}} connections. When we
enter the method {{ResourceManager#registerTaskExecutor}} we create a new
future and update {{taskExecutorGatewayFutures}}.
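For illustration, the first half could look roughly like this (a sketch only; it assumes the gateway future is obtained via {{RpcService#connect}}, as other connections in the {{ResourceManager}} are):
{code:java}
// Pending TaskExecutor connections, keyed by the TaskExecutor's ResourceID
// (a field on the ResourceManager).
private final Map<ResourceID, CompletableFuture<TaskExecutorGateway>> taskExecutorGatewayFutures = new HashMap<>();

// Inside ResourceManager#registerTaskExecutor: create a fresh gateway future
// for this registration attempt and make it the currently pending connection.
CompletableFuture<TaskExecutorGateway> taskExecutorGatewayFuture =
	getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
taskExecutorGatewayFutures.put(taskExecutorResourceId, taskExecutorGatewayFuture);
{code}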
Inside the {{handleAsync}} callback on that future, we check whether this future is still the pending {{TaskExecutor}} connection:
{code}
return taskExecutorGatewayFuture.handleAsync(
	(TaskExecutorGateway taskExecutorGateway, Throwable throwable) -> {
		if (taskExecutorGatewayFutures.get(taskExecutorResourceId) == taskExecutorGatewayFuture) {
			taskExecutorGatewayFutures.remove(taskExecutorResourceId);

			if (throwable != null) {
				return new RegistrationResponse.Decline(throwable.getMessage());
			} else {
				return registerTaskExecutorInternal(
					taskExecutorGateway,
					taskExecutorAddress,
					taskExecutorResourceId,
					dataPort,
					hardwareDescription);
			}
		} else {
			log.debug("Ignoring outdated TaskExecutorGateway connection.");
			// the handler has to return a response on this path as well
			return new RegistrationResponse.Decline("Decline outdated task executor registration.");
		}
	},
	getMainThreadExecutor());
{code}
Given that the {{registerTaskExecutor}} calls come in order, this should
prevent an earlier register call from overriding a later one.
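To make the effect of the identity check concrete, here is a small self-contained sketch with plain {{CompletableFuture}}s and made-up names (not the actual RPC types): when an older, superseded attempt completes late, its result is ignored because it is no longer the pending future in the map.
{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class OutdatedFutureCheckSketch {

	private static final Map<String, CompletableFuture<String>> pending = new HashMap<>();

	public static void main(String[] args) {
		CompletableFuture<String> r1 = register("tm-1"); // attempt R1
		CompletableFuture<String> r2 = register("tm-1"); // attempt R2 replaces R1 as the pending future

		r1.complete("gateway-of-R1"); // resolves late: no longer pending, so it is ignored
		r2.complete("gateway-of-R2"); // still pending: this attempt wins
	}

	private static CompletableFuture<String> register(String resourceId) {
		CompletableFuture<String> future = new CompletableFuture<>();
		pending.put(resourceId, future); // this attempt becomes the pending connection
		future.whenComplete((gateway, failure) -> {
			if (pending.get(resourceId) == future) {
				pending.remove(resourceId);
				System.out.println("registering " + gateway);
			} else {
				System.out.println("ignoring outdated connection " + gateway);
			}
		});
		return future;
	}
}
{code}
Running it prints "ignoring outdated connection gateway-of-R1" followed by "registering gateway-of-R2", which mirrors what the real code should do when executed on the main thread executor.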
> Slot allocation failure by taskmanager registration timeout and race
> --------------------------------------------------------------------
>
> Key: FLINK-12260
> URL: https://issues.apache.org/jira/browse/FLINK-12260
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.6.3
> Reporter: Hwanju Kim
> Priority: Critical
> Attachments: FLINK-12260-repro.diff
>
>
>
> In 1.6.2, we have seen slot allocation failures keep happening for a long
> time. Having looked at the log, I see the following behavior:
> # TM sends a registration request R1 to the resource manager.
> # R1 times out after 100ms, which is the initial timeout.
> # TM retries with a registration request R2 to the resource manager (with
> timeout 200ms).
> # R2 arrives first at the resource manager and is registered, and then TM
> gets a successful response, moving on to step 5 below.
> # On successful registration, R2's instance is put into
> taskManagerRegistrations.
> # Then R1 arrives at the resource manager, which realizes that the same TM
> resource ID is already registered and therefore unregisters R2's instance ID
> from taskManagerRegistrations. A new instance ID for R1 is registered in
> workerRegistration.
> # R1's response is not handled, though, since it already timed out (see the
> akka temp actor resolve failure below), hence no registration in
> taskManagerRegistrations.
> # TM keeps heartbeating to the resource manager with slot status.
> # The resource manager ignores this slot status, since taskManagerRegistrations
> contains R2, not R1, which replaced R2 in workerRegistration at step 6.
> # The slot request can never be fulfilled, timing out.
> The following is the debug logs for the above steps:
>
> {code:java}
> JM log:
> 2019-04-11 22:39:40.000,Registering TaskManager with ResourceID
> 46c8e0d0fcf2c306f11954a1040d5677
> (akka.ssl.tcp://flink@flink-taskmanager:6122/user/taskmanager_0) at
> ResourceManager
> 2019-04-11 22:39:40.000,Registering TaskManager
> 46c8e0d0fcf2c306f11954a1040d5677 under deade132e2c41c52019cdc27977266cf at
> the SlotManager.
> 2019-04-11 22:39:40.000,Replacing old registration of TaskExecutor
> 46c8e0d0fcf2c306f11954a1040d5677.
> 2019-04-11 22:39:40.000,Unregister TaskManager
> deade132e2c41c52019cdc27977266cf from the SlotManager.
> 2019-04-11 22:39:40.000,Registering TaskManager with ResourceID
> 46c8e0d0fcf2c306f11954a1040d5677
> (akka.ssl.tcp://flink@flink-taskmanager:6122/user/taskmanager_0) at
> ResourceManager
> TM log:
> 2019-04-11 22:39:40.000,Registration at ResourceManager attempt 1
> (timeout=100ms)
> 2019-04-11 22:39:40.000,Registration at ResourceManager
> (akka.ssl.tcp://flink@flink-jobmanager:6123/user/resourcemanager) attempt 1
> timed out after 100 ms
> 2019-04-11 22:39:40.000,Registration at ResourceManager attempt 2
> (timeout=200ms)
> 2019-04-11 22:39:40.000,Successful registration at resource manager
> akka.ssl.tcp://flink@flink-jobmanager:6123/user/resourcemanager under
> registration id deade132e2c41c52019cdc27977266cf.
> 2019-04-11 22:39:41.000,resolve of path sequence [/temp/$c] failed{code}
>
> As RPC calls seem to use akka ask, which creates a temporary source actor, I
> think multiple RPC calls could have arrived out of order via different actor
> pairs, and the symptom above seems to be due to that. If so, could the call
> argument carry an attempt count to prevent the unexpected unregistration? At
> this point, what I have done is only log analysis, so I could do further
> analysis, but before that I wanted to check whether this is a known issue. I
> also searched with some relevant terms and log pieces, but couldn't find a
> duplicate. Please deduplicate if one exists.