Github user Clarkkkkk commented on a diff in the pull request:
https://github.com/apache/flink/pull/6192#discussion_r199036840
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -334,8 +335,11 @@ public void onContainersCompleted(final List<ContainerStatus> list) {
 				if (yarnWorkerNode != null) {
 					// Container completed unexpectedly ~> start a new one
 					final Container container = yarnWorkerNode.getContainer();
-					requestYarnContainer(container.getResource(), yarnWorkerNode.getContainer().getPriority());
-					closeTaskManagerConnection(resourceId, new Exception(containerStatus.getDiagnostics()));
+					// check WorkerRegistration status to avoid requesting containers more than required
+					if (checkWorkerRegistrationWithResourceId(resourceId)) {
--- End diff --
Yes, it might happen. The problem is not as simple as I thought. The actual
cause is that the resource was released before a full restart, but the
onContainersCompleted callback ran after the full restart. Since the full
restart requests all the containers needed as configured, an
onContainersCompleted call that arrives after it will request a new
container and hold on to it, even though it is not needed.
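
To make the ordering concrete, here is a minimal sketch of the race in plain
Java. It is a toy model, not Flink code: ContainerTracker and all of its
methods are hypothetical names, and the registration check only assumes what
checkWorkerRegistrationWithResourceId in the diff is meant to do.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of the race described above; none of these names are Flink APIs. */
class ContainerTracker {
    // Workers the resource manager still considers registered (hypothetical state).
    private final Set<String> registeredWorkers = new HashSet<>();
    // Container requests that are currently outstanding.
    private int pendingRequests = 0;

    void registerWorker(String resourceId) {
        registeredWorkers.add(resourceId);
    }

    /** The resource is released before the full restart, so the worker is forgotten. */
    void releaseWorker(String resourceId) {
        registeredWorkers.remove(resourceId);
    }

    /** A full restart requests every container that the configuration asks for. */
    void fullRestart(int configuredContainers) {
        pendingRequests = configuredContainers;
    }

    /**
     * Completion callback that may arrive late. A replacement is requested only
     * if the worker was still registered, which is the assumed intent of the check
     * added in the diff.
     *
     * @return true if a replacement container was requested
     */
    boolean onContainerCompleted(String resourceId) {
        if (registeredWorkers.remove(resourceId)) {
            pendingRequests++;
            return true;
        }
        return false;
    }

    int pendingRequests() {
        return pendingRequests;
    }
}
```

With this guard, a callback for a worker that was released before the full
restart leaves the request count at the configured value. Without the guard,
the same callback would add one more request than needed.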
---