Repository: flink
Updated Branches:
  refs/heads/master 254054fbb -> 1212b6d3f


[FLINK-3981] don't log duplicate TaskManager registrations

Duplicate TaskManager registrations shouldn't be logged with Exceptions
in the ResourceManager. Duplicate registrations can happen if the
TaskManager resends its registration message too quickly, while the
reply to the earlier message is not lost but merely still in transit.

The ResourceManager should simply acknowledge duplicate registrations,
leaving it up to the JobManager to decide how to treat them (currently
it sends an AlreadyRegistered message to the TaskManager).

This closes #2045


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/93280069
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/93280069
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/93280069

Branch: refs/heads/master
Commit: 93280069256a8afd3f3f5e263fcf2fa07ec14e0b
Parents: 254054f
Author: Maximilian Michels <[email protected]>
Authored: Fri May 27 21:02:38 2016 +0200
Committer: Maximilian Michels <[email protected]>
Committed: Mon May 30 12:29:05 2016 +0200

----------------------------------------------------------------------
 .../runtime/clusterframework/FlinkResourceManager.java      | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/93280069/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
index 631f8d0..95a8f1c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
@@ -354,18 +354,19 @@ public abstract class FlinkResourceManager<WorkerType 
extends ResourceIDRetrieva
                ResourceID resourceID = msg.resourceId();
                try {
                        Preconditions.checkNotNull(resourceID);
-                       WorkerType newWorker = workerRegistered(resourceID);
-                       WorkerType oldWorker = 
registeredWorkers.put(resourceID, newWorker);
+                       // check if resourceID is already registered 
(TaskManager may send duplicate register messages)
+                       WorkerType oldWorker = 
registeredWorkers.get(resourceID);
                        if (oldWorker != null) {
-                               LOG.warn("TaskManager {} had been registered 
before.", resourceID);
+                               LOG.debug("TaskManager {} had been registered 
before.", resourceID);
                        } else {
+                               WorkerType newWorker = 
workerRegistered(resourceID);
+                               registeredWorkers.put(resourceID, newWorker);
                                LOG.info("TaskManager {} has registered.", 
resourceID);
                        }
                        jobManager.tell(decorateMessage(
                                new RegisterResourceSuccessful(taskManager, 
msg)),
                                self());
                } catch (Exception e) {
-                       // This may happen on duplicate task manager 
registration message to the job manager
                        LOG.warn("TaskManager resource registration failed for 
{}", resourceID, e);
 
                        // tell the JobManager about the failure

Reply via email to