tillrohrmann commented on a change in pull request #12620:
URL: https://github.com/apache/flink/pull/12620#discussion_r439463764



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
##########
@@ -104,13 +104,15 @@
 
        /**
         * Sends the given {@link SlotReport} to the ResourceManager.
+        * This method is only used for sending the initial slot report after a TaskManager registered to the ResourceManager.
+        * After that, the heartbeat payload is used for periodically updating the slot reports.
         *
         * @param taskManagerRegistrationId id identifying the sending TaskManager
         * @param slotReport which is sent to the ResourceManager
         * @param timeout for the operation
         * @return Future which is completed with {@link Acknowledge} once the slot report has been received.
         */
-       CompletableFuture<Acknowledge> sendSlotReport(
+       CompletableFuture<Acknowledge> sendInitialSlotReport(

Review comment:
       I am a bit unsure about this change, since it names the method after how the `TaskExecutor` currently uses it. However, nothing speaks against calling `sendSlotReport` multiple times after the `TaskExecutor` has registered at the `RM`. It is just that the `TaskExecutor` happens to be implemented such that it calls this method only once. Hence, I would suggest reverting this change.

##########
File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
##########
@@ -272,6 +273,43 @@ public void testTaskManagerPodTerminated() throws Exception {
                }};
        }
 
+       @Test
+       public void testTaskManagerPodTerminatedAfterRegistered() throws Exception {

Review comment:
       ```suggestion
        public void testTaskManagerPodTerminatedAfterRegistration() throws Exception {
   ```

##########
File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
##########
@@ -390,31 +429,7 @@ public void testPreviousAttemptPodAdded() throws Exception {
 
                                // adding current attempt pod should decrease the pending worker count
                                resourceManager.onAdded(Collections.singletonList(new KubernetesPod(currentAttemptPod)));
-                               assertThat(resourceManager.getNumPendingWorkersForTesting(), is(0));
-                       });
-               }};
-       }
-
-       @Test
-       public void testDuplicatedPodAdded() throws Exception {

Review comment:
       Why can we remove this test?

##########
File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
##########
@@ -272,6 +273,43 @@ public void testTaskManagerPodTerminated() throws Exception {
                }};
        }
 
+       @Test
+       public void testTaskManagerPodTerminatedAfterRegistered() throws Exception {
+               new Context() {{
+                       runTest(() -> {
+                               registerSlotRequest();
+                               final Pod pod1 = kubeClient.pods().list().getItems().get(0);
+                               resourceManager.onAdded(Collections.singletonList(new KubernetesPod(pod1)));
+                               registerTaskExecutor(new ResourceID(pod1.getMetadata().getName()));
+
+                               // Terminate the pod. Should not request a new pod.
+                               terminatePod(pod1);
+                               resourceManager.onModified(Collections.singletonList(new KubernetesPod(pod1)));
+                               assertEquals(0, kubeClient.pods().list().getItems().size());
+
+                               registerSlotRequest();
+                               final Pod pod2 = kubeClient.pods().list().getItems().get(0);
+                               resourceManager.onAdded(Collections.singletonList(new KubernetesPod(pod2)));
+                               registerTaskExecutor(new ResourceID(pod2.getMetadata().getName()));
+
+                               // Error happens in the pod. Should not request a new pod.
+                               terminatePod(pod2);
+                               resourceManager.onError(Collections.singletonList(new KubernetesPod(pod2)));
+                               assertEquals(0, kubeClient.pods().list().getItems().size());
+
+                               registerSlotRequest();
+                               final Pod pod3 = kubeClient.pods().list().getItems().get(0);
+                               resourceManager.onAdded(Collections.singletonList(new KubernetesPod(pod3)));
+                               registerTaskExecutor(new ResourceID(pod3.getMetadata().getName()));
+
+                               // Delete the pod. Should not request a new pod.
+                               terminatePod(pod3);
+                               resourceManager.onDeleted(Collections.singletonList(new KubernetesPod(pod3)));
+                               assertEquals(0, kubeClient.pods().list().getItems().size());

Review comment:
       I think it would be better to split this test case into three separate ones, one each for the `onModified`, `onError` and `onDeleted` paths.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManager.java
##########
@@ -110,39 +118,118 @@ public ActiveResourceManager(
 
        protected abstract Configuration loadClientConfiguration();
 
-       protected int getNumPendingWorkers() {
-               return pendingWorkerCounter.getTotalNum();
+       @Override
+       protected void registerTaskManagerAtSlotManager(
+                       WorkerRegistration<WorkerType> workerTypeWorkerRegistration, SlotReport slotReport) {
+               super.registerTaskManagerAtSlotManager(workerTypeWorkerRegistration, slotReport);
+               notifyAllocatedWorkerRegistered(workerTypeWorkerRegistration.getResourceID());
+       }
+
+       protected int getNumRequestedNotAllocatedWorkers() {
+               return requestedNotAllocatedWorkerCounter.getTotalNum();
+       }
+
+       protected int getNumRequestedNotAllocatedWorkersFor(WorkerResourceSpec workerResourceSpec) {
+               return requestedNotAllocatedWorkerCounter.getNum(workerResourceSpec);
+       }
+
+       protected int getNumRequestedNotRegisteredWorkers() {
+               return requestedNotRegisteredWorkerCounter.getTotalNum();
        }
 
-       protected int getNumPendingWorkersFor(WorkerResourceSpec workerResourceSpec) {
-               return pendingWorkerCounter.getNum(workerResourceSpec);
+       protected int getNumRequestedNotRegisteredWorkersFor(WorkerResourceSpec workerResourceSpec) {
+               return requestedNotRegisteredWorkerCounter.getNum(workerResourceSpec);
        }
 
        /**
         * Notify that a worker with the given resource spec has been requested.
         * @param workerResourceSpec resource spec of the requested worker
         * @return updated number of pending workers for the given resource spec
         */
-       protected int notifyNewWorkerRequested(WorkerResourceSpec workerResourceSpec) {
-               return pendingWorkerCounter.increaseAndGet(workerResourceSpec);
+       protected PendingWorkerNums notifyNewWorkerRequested(WorkerResourceSpec workerResourceSpec) {
+               return new PendingWorkerNums(
+                       requestedNotAllocatedWorkerCounter.increaseAndGet(workerResourceSpec),
+                       requestedNotRegisteredWorkerCounter.increaseAndGet(workerResourceSpec));
        }
 
        /**
         * Notify that a worker with the given resource spec has been allocated.
         * @param workerResourceSpec resource spec of the requested worker
+        * @param resourceID id of the allocated resource
         * @return updated number of pending workers for the given resource spec
         */
-       protected int notifyNewWorkerAllocated(WorkerResourceSpec workerResourceSpec) {
-               return pendingWorkerCounter.decreaseAndGet(workerResourceSpec);
+       protected PendingWorkerNums notifyNewWorkerAllocated(WorkerResourceSpec workerResourceSpec, ResourceID resourceID) {
+               allocatedNotRegisteredWorkerResourceSpecs.put(resourceID, workerResourceSpec);
+               return new PendingWorkerNums(
+                       requestedNotAllocatedWorkerCounter.decreaseAndGet(workerResourceSpec),
+                       requestedNotRegisteredWorkerCounter.getNum(workerResourceSpec));
        }
 
        /**
         * Notify that allocation of a worker with the given resource spec has failed.
         * @param workerResourceSpec resource spec of the requested worker
         * @return updated number of pending workers for the given resource spec
         */
-       protected int notifyNewWorkerAllocationFailed(WorkerResourceSpec workerResourceSpec) {
-               return pendingWorkerCounter.decreaseAndGet(workerResourceSpec);
+       protected PendingWorkerNums notifyNewWorkerAllocationFailed(WorkerResourceSpec workerResourceSpec) {
+               return new PendingWorkerNums(
+                       requestedNotAllocatedWorkerCounter.decreaseAndGet(workerResourceSpec),
+                       requestedNotRegisteredWorkerCounter.decreaseAndGet(workerResourceSpec));
+       }
+
+       /**
+        * Notify that a worker with the given resource spec has been registered.
+        * @param resourceID id of the registered worker resource
+        * @return updated number of pending workers for the corresponding resource spec
+        */
+       private PendingWorkerNums notifyAllocatedWorkerRegistered(ResourceID resourceID) {

Review comment:
       I think this method does not need to return anything. Then we could get rid of `PendingWorkerNums.ZERO`, which does not seem to be a correct return value.
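
A rough sketch of what a `void` variant could look like, written against simplified stand-in types rather than the real Flink classes. `PendingWorkerTracker` is a hypothetical name, and plain strings stand in for `ResourceID` and `WorkerResourceSpec`; only the shape of the method is the point here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the counters kept by ActiveResourceManager.
final class PendingWorkerTracker {
    // resource id -> resource spec of workers that are allocated but not yet registered
    private final Map<String, String> allocatedNotRegistered = new HashMap<>();
    // resource spec -> number of workers that are requested but not yet registered
    private final Map<String, Integer> requestedNotRegistered = new HashMap<>();

    void notifyNewWorkerRequested(String spec) {
        requestedNotRegistered.merge(spec, 1, Integer::sum);
    }

    void notifyNewWorkerAllocated(String spec, String resourceId) {
        allocatedNotRegistered.put(resourceId, spec);
    }

    // No return value: an unknown worker is simply ignored, so no ZERO placeholder is needed.
    void notifyAllocatedWorkerRegistered(String resourceId) {
        String spec = allocatedNotRegistered.remove(resourceId);
        if (spec == null) {
            // ignore workers from a previous attempt
            return;
        }
        // decrement the counter and drop the entry once it reaches zero
        requestedNotRegistered.computeIfPresent(spec, (key, num) -> num > 1 ? num - 1 : null);
    }

    int getNumRequestedNotRegisteredWorkersFor(String spec) {
        return requestedNotRegistered.getOrDefault(spec, 0);
    }
}
```

Callers that need the updated numbers can still query them through the getters after the notification.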

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
##########
@@ -404,13 +404,17 @@ private void stopResourceManagerServices() throws Exception {
                final WorkerRegistration<WorkerType> workerTypeWorkerRegistration = taskExecutors.get(taskManagerResourceId);
 
                if (workerTypeWorkerRegistration.getInstanceID().equals(taskManagerRegistrationId)) {
-                       slotManager.registerTaskManager(workerTypeWorkerRegistration, slotReport);
+                       registerTaskManagerAtSlotManager(workerTypeWorkerRegistration, slotReport);

Review comment:
       I think it would be slightly nicer to do it the following way:
   ```suggestion
                        if (slotManager.registerTaskManager(workerTypeWorkerRegistration, slotReport)) {
                                onTaskManagerRegistration(workerTypeWorkerRegistration);
                        }
   ```
   
   That way, subclasses won't be able to obstruct the registration of TMs at the `SlotManager` but can still listen to registration events. Of course, this change would require that `SlotManager.registerTaskManager` returns `true` if it is a new registration and `false` otherwise.
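
To illustrate the idea, here is a minimal sketch with simplified stand-in types instead of the real Flink classes. `SimpleSlotManager`, `BaseResourceManager` and `CountingResourceManager` are hypothetical names, strings stand in for the worker registration, and the boolean return value of `registerTaskManager` is the assumed contract change described above, not the current behavior.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the SlotManager: returns true only for a new registration.
final class SimpleSlotManager {
    private final Set<String> registeredTaskManagers = new HashSet<>();

    boolean registerTaskManager(String resourceId) {
        return registeredTaskManagers.add(resourceId);
    }
}

// Hypothetical stand-in for the base ResourceManager.
abstract class BaseResourceManager {
    private final SimpleSlotManager slotManager = new SimpleSlotManager();

    // final: subclasses cannot skip or alter the registration at the SlotManager
    final void sendSlotReport(String resourceId) {
        if (slotManager.registerTaskManager(resourceId)) {
            onTaskManagerRegistration(resourceId);
        }
    }

    // Hook for subclasses; called once per newly registered TaskManager.
    protected void onTaskManagerRegistration(String resourceId) {
        // no-op by default
    }
}

// Hypothetical subclass that only listens to registration events.
final class CountingResourceManager extends BaseResourceManager {
    int newRegistrations = 0;

    @Override
    protected void onTaskManagerRegistration(String resourceId) {
        newRegistrations++;
    }
}
```

With this shape, a repeated slot report from an already registered TM does not trigger the hook a second time.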

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManager.java
##########
@@ -110,39 +118,118 @@ public ActiveResourceManager(
 
        protected abstract Configuration loadClientConfiguration();
 
-       protected int getNumPendingWorkers() {
-               return pendingWorkerCounter.getTotalNum();
+       @Override
+       protected void registerTaskManagerAtSlotManager(
+                       WorkerRegistration<WorkerType> workerTypeWorkerRegistration, SlotReport slotReport) {
+               super.registerTaskManagerAtSlotManager(workerTypeWorkerRegistration, slotReport);
+               notifyAllocatedWorkerRegistered(workerTypeWorkerRegistration.getResourceID());
+       }
+
+       protected int getNumRequestedNotAllocatedWorkers() {
+               return requestedNotAllocatedWorkerCounter.getTotalNum();
+       }
+
+       protected int getNumRequestedNotAllocatedWorkersFor(WorkerResourceSpec workerResourceSpec) {
+               return requestedNotAllocatedWorkerCounter.getNum(workerResourceSpec);
+       }
+
+       protected int getNumRequestedNotRegisteredWorkers() {
+               return requestedNotRegisteredWorkerCounter.getTotalNum();
        }
 
-       protected int getNumPendingWorkersFor(WorkerResourceSpec workerResourceSpec) {
-               return pendingWorkerCounter.getNum(workerResourceSpec);
+       protected int getNumRequestedNotRegisteredWorkersFor(WorkerResourceSpec workerResourceSpec) {
+               return requestedNotRegisteredWorkerCounter.getNum(workerResourceSpec);
        }
 
        /**
         * Notify that a worker with the given resource spec has been requested.
         * @param workerResourceSpec resource spec of the requested worker
         * @return updated number of pending workers for the given resource spec
         */
-       protected int notifyNewWorkerRequested(WorkerResourceSpec workerResourceSpec) {
-               return pendingWorkerCounter.increaseAndGet(workerResourceSpec);
+       protected PendingWorkerNums notifyNewWorkerRequested(WorkerResourceSpec workerResourceSpec) {
+               return new PendingWorkerNums(
+                       requestedNotAllocatedWorkerCounter.increaseAndGet(workerResourceSpec),
+                       requestedNotRegisteredWorkerCounter.increaseAndGet(workerResourceSpec));
        }
 
        /**
         * Notify that a worker with the given resource spec has been allocated.
         * @param workerResourceSpec resource spec of the requested worker
+        * @param resourceID id of the allocated resource
         * @return updated number of pending workers for the given resource spec
         */
-       protected int notifyNewWorkerAllocated(WorkerResourceSpec workerResourceSpec) {
-               return pendingWorkerCounter.decreaseAndGet(workerResourceSpec);
+       protected PendingWorkerNums notifyNewWorkerAllocated(WorkerResourceSpec workerResourceSpec, ResourceID resourceID) {
+               allocatedNotRegisteredWorkerResourceSpecs.put(resourceID, workerResourceSpec);
+               return new PendingWorkerNums(
+                       requestedNotAllocatedWorkerCounter.decreaseAndGet(workerResourceSpec),
+                       requestedNotRegisteredWorkerCounter.getNum(workerResourceSpec));
        }
 
        /**
         * Notify that allocation of a worker with the given resource spec has failed.
         * @param workerResourceSpec resource spec of the requested worker
         * @return updated number of pending workers for the given resource spec
         */
-       protected int notifyNewWorkerAllocationFailed(WorkerResourceSpec workerResourceSpec) {
-               return pendingWorkerCounter.decreaseAndGet(workerResourceSpec);
+       protected PendingWorkerNums notifyNewWorkerAllocationFailed(WorkerResourceSpec workerResourceSpec) {
+               return new PendingWorkerNums(
+                       requestedNotAllocatedWorkerCounter.decreaseAndGet(workerResourceSpec),
+                       requestedNotRegisteredWorkerCounter.decreaseAndGet(workerResourceSpec));
+       }
+
+       /**
+        * Notify that a worker with the given resource spec has been registered.
+        * @param resourceID id of the registered worker resource
+        * @return updated number of pending workers for the corresponding resource spec
+        */
+       private PendingWorkerNums notifyAllocatedWorkerRegistered(ResourceID resourceID) {
+               WorkerResourceSpec workerResourceSpec = allocatedNotRegisteredWorkerResourceSpecs.remove(resourceID);
+               if (workerResourceSpec == null) {
+                       // ignore workers from previous attempt
+                       return PendingWorkerNums.ZERO;
+               }
+               return new PendingWorkerNums(
+                       requestedNotAllocatedWorkerCounter.getNum(workerResourceSpec),
+                       requestedNotRegisteredWorkerCounter.decreaseAndGet(workerResourceSpec));
+       }
+
+       /**
+        * Notify that a worker with the given resource spec has been stopped.
+        * @param resourceID id of the stopped worker resource
+        * @return updated number of pending workers for the corresponding resource spec
+        */
+       protected PendingWorkerNums notifyAllocatedWorkerStopped(ResourceID resourceID) {

Review comment:
       Same here. This method does not need to return `PendingWorkerNums`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

