tillrohrmann commented on a change in pull request #12620:
URL: https://github.com/apache/flink/pull/12620#discussion_r439463764
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
##########
@@ -104,13 +104,15 @@
/**
* Sends the given {@link SlotReport} to the ResourceManager.
+ * This method is only used for sending the initial slot report after a
TaskManager registered to the ResourceManager.
+ * After that, the heartbeat payload is used for periodically updating
the slot reports.
*
* @param taskManagerRegistrationId id identifying the sending
TaskManager
* @param slotReport which is sent to the ResourceManager
* @param timeout for the operation
* @return Future which is completed with {@link Acknowledge} once the
slot report has been received.
*/
- CompletableFuture<Acknowledge> sendSlotReport(
+ CompletableFuture<Acknowledge> sendInitialSlotReport(
Review comment:
I am a bit unsure about this change since we are now naming this method after
how it is used by the `TaskExecutor`. However, nothing speaks against
calling `sendSlotReport` multiple times after the `TaskExecutor` has registered
at the `RM`. It's just that the `TaskExecutor` has been implemented in such a way
that it calls this method only once. Hence, I would suggest reverting this
change.
##########
File path:
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
##########
@@ -272,6 +273,43 @@ public void testTaskManagerPodTerminated() throws
Exception {
}};
}
+ @Test
+ public void testTaskManagerPodTerminatedAfterRegistered() throws
Exception {
Review comment:
```suggestion
public void testTaskManagerPodTerminatedAfterRegistration() throws
Exception {
```
##########
File path:
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
##########
@@ -390,31 +429,7 @@ public void testPreviousAttemptPodAdded() throws Exception
{
// adding current attempt pod should decrease
the pending worker count
resourceManager.onAdded(Collections.singletonList(new
KubernetesPod(currentAttemptPod)));
-
assertThat(resourceManager.getNumPendingWorkersForTesting(), is(0));
- });
- }};
- }
-
- @Test
- public void testDuplicatedPodAdded() throws Exception {
Review comment:
Why can we remove this test?
##########
File path:
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
##########
@@ -272,6 +273,43 @@ public void testTaskManagerPodTerminated() throws
Exception {
}};
}
+ @Test
+ public void testTaskManagerPodTerminatedAfterRegistered() throws
Exception {
+ new Context() {{
+ runTest(() -> {
+ registerSlotRequest();
+ final Pod pod1 =
kubeClient.pods().list().getItems().get(0);
+
resourceManager.onAdded(Collections.singletonList(new KubernetesPod(pod1)));
+ registerTaskExecutor(new
ResourceID(pod1.getMetadata().getName()));
+
+ // Terminate the pod. Should not request a new
pod.
+ terminatePod(pod1);
+
resourceManager.onModified(Collections.singletonList(new KubernetesPod(pod1)));
+ assertEquals(0,
kubeClient.pods().list().getItems().size());
+
+ registerSlotRequest();
+ final Pod pod2 =
kubeClient.pods().list().getItems().get(0);
+
resourceManager.onAdded(Collections.singletonList(new KubernetesPod(pod2)));
+ registerTaskExecutor(new
ResourceID(pod2.getMetadata().getName()));
+
+ // Error happens in the pod. Should not request
a new pod.
+ terminatePod(pod2);
+
resourceManager.onError(Collections.singletonList(new KubernetesPod(pod2)));
+ assertEquals(0,
kubeClient.pods().list().getItems().size());
+
+ registerSlotRequest();
+ final Pod pod3 =
kubeClient.pods().list().getItems().get(0);
+
resourceManager.onAdded(Collections.singletonList(new KubernetesPod(pod3)));
+ registerTaskExecutor(new
ResourceID(pod3.getMetadata().getName()));
+
+ // Delete the pod. Should not request a new pod.
+ terminatePod(pod3);
+
resourceManager.onDeleted(Collections.singletonList(new KubernetesPod(pod3)));
+ assertEquals(0,
kubeClient.pods().list().getItems().size());
Review comment:
I think it would be better to split this test case into three separate
ones.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManager.java
##########
@@ -110,39 +118,118 @@ public ActiveResourceManager(
protected abstract Configuration loadClientConfiguration();
- protected int getNumPendingWorkers() {
- return pendingWorkerCounter.getTotalNum();
+ @Override
+ protected void registerTaskManagerAtSlotManager(
+ WorkerRegistration<WorkerType>
workerTypeWorkerRegistration, SlotReport slotReport) {
+
super.registerTaskManagerAtSlotManager(workerTypeWorkerRegistration,
slotReport);
+
notifyAllocatedWorkerRegistered(workerTypeWorkerRegistration.getResourceID());
+ }
+
+ protected int getNumRequestedNotAllocatedWorkers() {
+ return requestedNotAllocatedWorkerCounter.getTotalNum();
+ }
+
+ protected int getNumRequestedNotAllocatedWorkersFor(WorkerResourceSpec
workerResourceSpec) {
+ return
requestedNotAllocatedWorkerCounter.getNum(workerResourceSpec);
+ }
+
+ protected int getNumRequestedNotRegisteredWorkers() {
+ return requestedNotRegisteredWorkerCounter.getTotalNum();
}
- protected int getNumPendingWorkersFor(WorkerResourceSpec
workerResourceSpec) {
- return pendingWorkerCounter.getNum(workerResourceSpec);
+ protected int getNumRequestedNotRegisteredWorkersFor(WorkerResourceSpec
workerResourceSpec) {
+ return
requestedNotRegisteredWorkerCounter.getNum(workerResourceSpec);
}
/**
* Notify that a worker with the given resource spec has been requested.
* @param workerResourceSpec resource spec of the requested worker
* @return updated number of pending workers for the given resource spec
*/
- protected int notifyNewWorkerRequested(WorkerResourceSpec
workerResourceSpec) {
- return pendingWorkerCounter.increaseAndGet(workerResourceSpec);
+ protected PendingWorkerNums notifyNewWorkerRequested(WorkerResourceSpec
workerResourceSpec) {
+ return new PendingWorkerNums(
+
requestedNotAllocatedWorkerCounter.increaseAndGet(workerResourceSpec),
+
requestedNotRegisteredWorkerCounter.increaseAndGet(workerResourceSpec));
}
/**
* Notify that a worker with the given resource spec has been allocated.
* @param workerResourceSpec resource spec of the requested worker
+ * @param resourceID id of the allocated resource
* @return updated number of pending workers for the given resource spec
*/
- protected int notifyNewWorkerAllocated(WorkerResourceSpec
workerResourceSpec) {
- return pendingWorkerCounter.decreaseAndGet(workerResourceSpec);
+ protected PendingWorkerNums notifyNewWorkerAllocated(WorkerResourceSpec
workerResourceSpec, ResourceID resourceID) {
+ allocatedNotRegisteredWorkerResourceSpecs.put(resourceID,
workerResourceSpec);
+ return new PendingWorkerNums(
+
requestedNotAllocatedWorkerCounter.decreaseAndGet(workerResourceSpec),
+
requestedNotRegisteredWorkerCounter.getNum(workerResourceSpec));
}
/**
* Notify that allocation of a worker with the given resource spec has
failed.
* @param workerResourceSpec resource spec of the requested worker
* @return updated number of pending workers for the given resource spec
*/
- protected int notifyNewWorkerAllocationFailed(WorkerResourceSpec
workerResourceSpec) {
- return pendingWorkerCounter.decreaseAndGet(workerResourceSpec);
+ protected PendingWorkerNums
notifyNewWorkerAllocationFailed(WorkerResourceSpec workerResourceSpec) {
+ return new PendingWorkerNums(
+
requestedNotAllocatedWorkerCounter.decreaseAndGet(workerResourceSpec),
+
requestedNotRegisteredWorkerCounter.decreaseAndGet(workerResourceSpec));
+ }
+
+ /**
+ * Notify that a worker with the given resource spec has been
registered.
+ * @param resourceID id of the registered worker resource
+ * @return updated number of pending workers for the corresponding
resource spec
+ */
+ private PendingWorkerNums notifyAllocatedWorkerRegistered(ResourceID
resourceID) {
Review comment:
I think this method does not need to return anything. Then we could
get rid of `PendingWorkerNums.ZERO`, which does not seem to be a correct return
value.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
##########
@@ -404,13 +404,17 @@ private void stopResourceManagerServices() throws
Exception {
final WorkerRegistration<WorkerType>
workerTypeWorkerRegistration = taskExecutors.get(taskManagerResourceId);
if
(workerTypeWorkerRegistration.getInstanceID().equals(taskManagerRegistrationId))
{
-
slotManager.registerTaskManager(workerTypeWorkerRegistration, slotReport);
+
registerTaskManagerAtSlotManager(workerTypeWorkerRegistration, slotReport);
Review comment:
I think it would be slightly nicer to do it the following way:
```suggestion
if
(slotManager.registerTaskManager(workerTypeWorkerRegistration, slotReport)) {
onTaskManagerRegistration(workerTypeWorkerRegistration);
}
```
That way, subclasses won't be able to obstruct the registration of TMs at
the `SlotManager` but can still listen to registration events. Of course, this
change would require that `SlotManager.registerTaskManager` returns `true` if
it is a new registration and `false` if it is not a new registration.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManager.java
##########
@@ -110,39 +118,118 @@ public ActiveResourceManager(
protected abstract Configuration loadClientConfiguration();
- protected int getNumPendingWorkers() {
- return pendingWorkerCounter.getTotalNum();
+ @Override
+ protected void registerTaskManagerAtSlotManager(
+ WorkerRegistration<WorkerType>
workerTypeWorkerRegistration, SlotReport slotReport) {
+
super.registerTaskManagerAtSlotManager(workerTypeWorkerRegistration,
slotReport);
+
notifyAllocatedWorkerRegistered(workerTypeWorkerRegistration.getResourceID());
+ }
+
+ protected int getNumRequestedNotAllocatedWorkers() {
+ return requestedNotAllocatedWorkerCounter.getTotalNum();
+ }
+
+ protected int getNumRequestedNotAllocatedWorkersFor(WorkerResourceSpec
workerResourceSpec) {
+ return
requestedNotAllocatedWorkerCounter.getNum(workerResourceSpec);
+ }
+
+ protected int getNumRequestedNotRegisteredWorkers() {
+ return requestedNotRegisteredWorkerCounter.getTotalNum();
}
- protected int getNumPendingWorkersFor(WorkerResourceSpec
workerResourceSpec) {
- return pendingWorkerCounter.getNum(workerResourceSpec);
+ protected int getNumRequestedNotRegisteredWorkersFor(WorkerResourceSpec
workerResourceSpec) {
+ return
requestedNotRegisteredWorkerCounter.getNum(workerResourceSpec);
}
/**
* Notify that a worker with the given resource spec has been requested.
* @param workerResourceSpec resource spec of the requested worker
* @return updated number of pending workers for the given resource spec
*/
- protected int notifyNewWorkerRequested(WorkerResourceSpec
workerResourceSpec) {
- return pendingWorkerCounter.increaseAndGet(workerResourceSpec);
+ protected PendingWorkerNums notifyNewWorkerRequested(WorkerResourceSpec
workerResourceSpec) {
+ return new PendingWorkerNums(
+
requestedNotAllocatedWorkerCounter.increaseAndGet(workerResourceSpec),
+
requestedNotRegisteredWorkerCounter.increaseAndGet(workerResourceSpec));
}
/**
* Notify that a worker with the given resource spec has been allocated.
* @param workerResourceSpec resource spec of the requested worker
+ * @param resourceID id of the allocated resource
* @return updated number of pending workers for the given resource spec
*/
- protected int notifyNewWorkerAllocated(WorkerResourceSpec
workerResourceSpec) {
- return pendingWorkerCounter.decreaseAndGet(workerResourceSpec);
+ protected PendingWorkerNums notifyNewWorkerAllocated(WorkerResourceSpec
workerResourceSpec, ResourceID resourceID) {
+ allocatedNotRegisteredWorkerResourceSpecs.put(resourceID,
workerResourceSpec);
+ return new PendingWorkerNums(
+
requestedNotAllocatedWorkerCounter.decreaseAndGet(workerResourceSpec),
+
requestedNotRegisteredWorkerCounter.getNum(workerResourceSpec));
}
/**
* Notify that allocation of a worker with the given resource spec has
failed.
* @param workerResourceSpec resource spec of the requested worker
* @return updated number of pending workers for the given resource spec
*/
- protected int notifyNewWorkerAllocationFailed(WorkerResourceSpec
workerResourceSpec) {
- return pendingWorkerCounter.decreaseAndGet(workerResourceSpec);
+ protected PendingWorkerNums
notifyNewWorkerAllocationFailed(WorkerResourceSpec workerResourceSpec) {
+ return new PendingWorkerNums(
+
requestedNotAllocatedWorkerCounter.decreaseAndGet(workerResourceSpec),
+
requestedNotRegisteredWorkerCounter.decreaseAndGet(workerResourceSpec));
+ }
+
+ /**
+ * Notify that a worker with the given resource spec has been
registered.
+ * @param resourceID id of the registered worker resource
+ * @return updated number of pending workers for the corresponding
resource spec
+ */
+ private PendingWorkerNums notifyAllocatedWorkerRegistered(ResourceID
resourceID) {
+ WorkerResourceSpec workerResourceSpec =
allocatedNotRegisteredWorkerResourceSpecs.remove(resourceID);
+ if (workerResourceSpec == null) {
+ // ignore workers from previous attempt
+ return PendingWorkerNums.ZERO;
+ }
+ return new PendingWorkerNums(
+
requestedNotAllocatedWorkerCounter.getNum(workerResourceSpec),
+
requestedNotRegisteredWorkerCounter.decreaseAndGet(workerResourceSpec));
+ }
+
+ /**
+ * Notify that a worker with the given resource spec has been stopped.
+ * @param resourceID id of the stopped worker resource
+ * @return updated number of pending workers for the corresponding
resource spec
+ */
+ protected PendingWorkerNums notifyAllocatedWorkerStopped(ResourceID
resourceID) {
Review comment:
Same here. This method does not need to return `PendingWorkerNums`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]