Repository: flink Updated Branches: refs/heads/master 254054fbb -> 1212b6d3f
[FLINK-3981] don't log duplicate TaskManager registrations Duplicate TaskManager registrations shouldn't be logged as exceptions in the ResourceManager. Duplicate registrations can happen if the TaskManager resends its registration message before the reply to an earlier attempt has arrived, i.e. when that reply is not lost but still in transit. The ResourceManager should simply acknowledge the duplicate registrations, leaving it up to the JobManager to decide how to treat them (currently it will send an AlreadyRegistered message to the TaskManager). This closes #2045 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/93280069 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/93280069 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/93280069 Branch: refs/heads/master Commit: 93280069256a8afd3f3f5e263fcf2fa07ec14e0b Parents: 254054f Author: Maximilian Michels <[email protected]> Authored: Fri May 27 21:02:38 2016 +0200 Committer: Maximilian Michels <[email protected]> Committed: Mon May 30 12:29:05 2016 +0200 ---------------------------------------------------------------------- .../runtime/clusterframework/FlinkResourceManager.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/93280069/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java index 631f8d0..95a8f1c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java @@ -354,18 +354,19 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrieva ResourceID resourceID = msg.resourceId(); try { Preconditions.checkNotNull(resourceID); - WorkerType newWorker = workerRegistered(resourceID); - WorkerType oldWorker = registeredWorkers.put(resourceID, newWorker); + // check if resourceID is already registered (TaskManager may send duplicate register messages) + WorkerType oldWorker = registeredWorkers.get(resourceID); if (oldWorker != null) { - LOG.warn("TaskManager {} had been registered before.", resourceID); + LOG.debug("TaskManager {} had been registered before.", resourceID); } else { + WorkerType newWorker = workerRegistered(resourceID); + registeredWorkers.put(resourceID, newWorker); LOG.info("TaskManager {} has registered.", resourceID); } jobManager.tell(decorateMessage( new RegisterResourceSuccessful(taskManager, msg)), self()); } catch (Exception e) { - // This may happen on duplicate task manager registration message to the job manager LOG.warn("TaskManager resource registration failed for {}", resourceID, e); // tell the JobManager about the failure
