This is an automated email from the ASF dual-hosted git repository.
fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a24f7717847 [FLINK-34249][runtime] Remove DefaultSlotTracker related logic.
a24f7717847 is described below
commit a24f7717847ce4e4c511070257e99d7c3f948d2a
Author: Roc Marshal <[email protected]>
AuthorDate: Wed Jan 24 20:25:55 2024 +0800
[FLINK-34249][runtime] Remove DefaultSlotTracker related logic.
---
.../slotmanager/DeclarativeTaskManagerSlot.java | 146 ---------
.../slotmanager/DefaultSlotTracker.java | 337 ---------------------
.../resourcemanager/slotmanager/SlotTracker.java | 112 -------
.../slotmanager/DefaultSlotTrackerTest.java | 332 --------------------
.../slotmanager/SlotStatusReconcilerTest.java | 243 ---------------
5 files changed, 1170 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeTaskManagerSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeTaskManagerSlot.java
deleted file mode 100644
index 6f730d6c780..00000000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeTaskManagerSlot.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager.slotmanager;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.instance.InstanceID;
-import
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nullable;
-
-/**
- * A DeclarativeTaskManagerSlot represents a slot located in a TaskExecutor.
It contains the
- * necessary information for initiating the allocation of the slot, and keeps
track of the state of
- * the slot.
- */
-class DeclarativeTaskManagerSlot implements TaskManagerSlotInformation {
-
- /** The unique identification of this slot. */
- private final SlotID slotId;
-
- /** The resource profile of this slot. */
- private final ResourceProfile resourceProfile;
-
- /** Gateway to the TaskExecutor which owns the slot. */
- private final TaskExecutorConnection taskManagerConnection;
-
- /** Job id for which this slot has been allocated. */
- @Nullable private JobID jobId;
-
- private SlotState state = SlotState.FREE;
-
- private long allocationStartTimeStamp;
-
- public DeclarativeTaskManagerSlot(
- SlotID slotId,
- ResourceProfile resourceProfile,
- TaskExecutorConnection taskManagerConnection) {
- this.slotId = slotId;
- this.resourceProfile = resourceProfile;
- this.taskManagerConnection = taskManagerConnection;
- }
-
- @Override
- public SlotState getState() {
- return state;
- }
-
- @Override
- public SlotID getSlotId() {
- return slotId;
- }
-
- @Override
- public AllocationID getAllocationId() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public ResourceProfile getResourceProfile() {
- return resourceProfile;
- }
-
- @Override
- public TaskExecutorConnection getTaskManagerConnection() {
- return taskManagerConnection;
- }
-
- @Nullable
- @Override
- public JobID getJobId() {
- return jobId;
- }
-
- @Override
- public InstanceID getInstanceId() {
- return taskManagerConnection.getInstanceID();
- }
-
- public long getAllocationStartTimestamp() {
- return allocationStartTimeStamp;
- }
-
- public void startAllocation(JobID jobId) {
- Preconditions.checkState(
- state == SlotState.FREE, "Slot must be free to be assigned a
slot request.");
-
- this.jobId = jobId;
- this.state = SlotState.PENDING;
- this.allocationStartTimeStamp = System.currentTimeMillis();
- }
-
- public void completeAllocation() {
- Preconditions.checkState(
- state == SlotState.PENDING,
- "In order to complete an allocation, the slot has to be
allocated.");
-
- this.state = SlotState.ALLOCATED;
- }
-
- public void freeSlot() {
- Preconditions.checkState(
- state == SlotState.PENDING || state == SlotState.ALLOCATED,
- "Slot must be allocated or pending before freeing it.");
-
- this.jobId = null;
- this.state = SlotState.FREE;
- this.allocationStartTimeStamp = 0;
- }
-
- @Override
- public String toString() {
- return "DeclarativeTaskManagerSlot{"
- + "slotId="
- + slotId
- + ", resourceProfile="
- + resourceProfile
- + ", taskManagerConnection="
- + taskManagerConnection
- + ", jobId="
- + jobId
- + ", state="
- + state
- + ", allocationStartTimeStamp="
- + allocationStartTimeStamp
- + '}';
- }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTracker.java
deleted file mode 100644
index 38403fdd661..00000000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTracker.java
+++ /dev/null
@@ -1,337 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager.slotmanager;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.instance.InstanceID;
-import
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
-import org.apache.flink.runtime.taskexecutor.SlotStatus;
-import org.apache.flink.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.function.BiConsumer;
-import java.util.function.Consumer;
-
-/** Default SlotTracker implementation. */
-public class DefaultSlotTracker implements SlotTracker {
- private static final Logger LOG =
LoggerFactory.getLogger(DefaultSlotTracker.class);
-
- /** Map for all registered slots. */
- private final Map<SlotID, DeclarativeTaskManagerSlot> slots = new
HashMap<>();
-
- /** Index of all currently free slots. */
- private final Map<SlotID, DeclarativeTaskManagerSlot> freeSlots = new
LinkedHashMap<>();
-
- private final MultiSlotStatusUpdateListener slotStatusUpdateListeners =
- new MultiSlotStatusUpdateListener();
-
- private final SlotStatusStateReconciler slotStatusStateReconciler =
- new SlotStatusStateReconciler(
- this::transitionSlotToFree,
- this::transitionSlotToPending,
- this::transitionSlotToAllocated);
-
- @Override
- public void registerSlotStatusUpdateListener(
- SlotStatusUpdateListener slotStatusUpdateListener) {
-
this.slotStatusUpdateListeners.registerSlotStatusUpdateListener(slotStatusUpdateListener);
- }
-
- @Override
- public void addSlot(
- SlotID slotId,
- ResourceProfile resourceProfile,
- TaskExecutorConnection taskManagerConnection,
- @Nullable JobID assignedJob) {
- Preconditions.checkNotNull(slotId);
- Preconditions.checkNotNull(resourceProfile);
- Preconditions.checkNotNull(taskManagerConnection);
-
- if (slots.containsKey(slotId)) {
- // remove the old slot first
- LOG.debug(
- "A slot was added with an already tracked slot ID {}.
Removing previous entry.",
- slotId);
- removeSlot(slotId);
- }
-
- DeclarativeTaskManagerSlot slot =
- new DeclarativeTaskManagerSlot(slotId, resourceProfile,
taskManagerConnection);
- slots.put(slotId, slot);
- freeSlots.put(slotId, slot);
- slotStatusStateReconciler.executeStateTransition(slot, assignedJob);
- }
-
- @Override
- public void removeSlots(Iterable<SlotID> slotsToRemove) {
- Preconditions.checkNotNull(slotsToRemove);
-
- for (SlotID slotId : slotsToRemove) {
- removeSlot(slotId);
- }
- }
-
- private void removeSlot(SlotID slotId) {
- DeclarativeTaskManagerSlot slot = slots.remove(slotId);
-
- if (slot != null) {
- if (slot.getState() != SlotState.FREE) {
- transitionSlotToFree(slot);
- }
- freeSlots.remove(slotId);
- } else {
- LOG.debug("There was no slot registered with slot id {}.", slotId);
- }
- }
-
- //
---------------------------------------------------------------------------------------------
- // ResourceManager slot status API - optimistically trigger transitions,
but they may not
- // represent true state on task executors
- //
---------------------------------------------------------------------------------------------
-
- @Override
- public void notifyFree(SlotID slotId) {
- Preconditions.checkNotNull(slotId);
- transitionSlotToFree(slots.get(slotId));
- }
-
- @Override
- public void notifyAllocationStart(SlotID slotId, JobID jobId) {
- Preconditions.checkNotNull(slotId);
- Preconditions.checkNotNull(jobId);
- transitionSlotToPending(slots.get(slotId), jobId);
- }
-
- @Override
- public void notifyAllocationComplete(SlotID slotId, JobID jobId) {
- Preconditions.checkNotNull(slotId);
- Preconditions.checkNotNull(jobId);
- transitionSlotToAllocated(slots.get(slotId), jobId);
- }
-
- //
---------------------------------------------------------------------------------------------
- // TaskExecutor slot status API - acts as source of truth
- //
---------------------------------------------------------------------------------------------
-
- @Override
- public boolean notifySlotStatus(Iterable<SlotStatus> slotStatuses) {
- Preconditions.checkNotNull(slotStatuses);
- boolean anyStatusChanged = false;
- for (SlotStatus slotStatus : slotStatuses) {
- anyStatusChanged |=
- slotStatusStateReconciler.executeStateTransition(
- slots.get(slotStatus.getSlotID()),
slotStatus.getJobID());
- }
- return anyStatusChanged;
- }
-
- //
---------------------------------------------------------------------------------------------
- // Core state transitions
- //
---------------------------------------------------------------------------------------------
-
- private void transitionSlotToFree(DeclarativeTaskManagerSlot slot) {
- Preconditions.checkNotNull(slot);
- Preconditions.checkState(slot.getState() != SlotState.FREE);
-
- // remember the slots current job and state for the notification, as
this information will
- // be cleared from
- // the slot upon freeing
- final JobID jobId = slot.getJobId();
- final SlotState state = slot.getState();
-
- slot.freeSlot();
- freeSlots.put(slot.getSlotId(), slot);
- slotStatusUpdateListeners.notifySlotStatusChange(slot, state,
SlotState.FREE, jobId);
- }
-
- private void transitionSlotToPending(DeclarativeTaskManagerSlot slot,
JobID jobId) {
- Preconditions.checkNotNull(slot);
- Preconditions.checkState(slot.getState() == SlotState.FREE);
-
- slot.startAllocation(jobId);
- freeSlots.remove(slot.getSlotId());
- slotStatusUpdateListeners.notifySlotStatusChange(
- slot, SlotState.FREE, SlotState.PENDING, jobId);
- }
-
- private void transitionSlotToAllocated(DeclarativeTaskManagerSlot slot,
JobID jobId) {
- Preconditions.checkNotNull(slot);
- Preconditions.checkState(
- jobId.equals(slot.getJobId()),
- "Job ID from slot status update (%s) does not match currently
assigned job ID (%s) for slot %s.",
- jobId,
- slot.getJobId(),
- slot.getSlotId());
- Preconditions.checkState(
- slot.getState() == SlotState.PENDING,
- "State of slot %s must be %s, but was %s.",
- slot.getSlotId(),
- SlotState.PENDING,
- slot.getState());
-
- slot.completeAllocation();
- slotStatusUpdateListeners.notifySlotStatusChange(
- slot, SlotState.PENDING, SlotState.ALLOCATED, jobId);
- }
-
- //
---------------------------------------------------------------------------------------------
- // Misc
- //
---------------------------------------------------------------------------------------------
-
- @Override
- public Collection<TaskManagerSlotInformation> getFreeSlots() {
- return Collections.unmodifiableCollection(freeSlots.values());
- }
-
- @Override
- public Collection<TaskExecutorConnection>
getTaskExecutorsWithAllocatedSlotsForJob(
- JobID jobId) {
- final Map<InstanceID, TaskExecutorConnection> taskExecutorConnections
= new HashMap<>();
- for (DeclarativeTaskManagerSlot value : slots.values()) {
- if (jobId.equals(value.getJobId())) {
- taskExecutorConnections.put(
- value.getInstanceId(),
value.getTaskManagerConnection());
- }
- }
- return taskExecutorConnections.values();
- }
-
- @VisibleForTesting
- boolean areMapsEmpty() {
- return slots.isEmpty() && freeSlots.isEmpty();
- }
-
- @VisibleForTesting
- @Nullable
- DeclarativeTaskManagerSlot getSlot(SlotID slotId) {
- return slots.get(slotId);
- }
-
- /**
- * Slot reports from task executor are the source of truth regarding the
state of slots. The
- * reported state may not match what is currently being tracked, and if so
can contain illegal
- * transitions (e.g., from free to allocated). The tracked and reported
states are reconciled by
- * simulating state transitions that lead us from our currently tracked
state to the actual
- * reported state.
- *
- * <p>One exception to the reported state being the source of truth are
slots reported as being
- * free, but tracked as being pending. This mismatch is assumed to be due
to a slot allocation
- * RPC not yet having been process by the task executor. This mismatch is
hence ignored; it will
- * be resolved eventually with the allocation either being completed or
timing out.
- */
- @VisibleForTesting
- static class SlotStatusStateReconciler {
- private final Consumer<DeclarativeTaskManagerSlot> toFreeSlot;
- private final BiConsumer<DeclarativeTaskManagerSlot, JobID>
toPendingSlot;
- private final BiConsumer<DeclarativeTaskManagerSlot, JobID>
toAllocatedSlot;
-
- @VisibleForTesting
- SlotStatusStateReconciler(
- Consumer<DeclarativeTaskManagerSlot> toFreeSlot,
- BiConsumer<DeclarativeTaskManagerSlot, JobID> toPendingSlot,
- BiConsumer<DeclarativeTaskManagerSlot, JobID> toAllocatedSlot)
{
- this.toFreeSlot = toFreeSlot;
- this.toPendingSlot = toPendingSlot;
- this.toAllocatedSlot = toAllocatedSlot;
- }
-
- public boolean executeStateTransition(DeclarativeTaskManagerSlot slot,
JobID jobId) {
- final SlotState reportedSlotState =
- jobId == null ? SlotState.FREE : SlotState.ALLOCATED;
- final SlotState trackedSlotState = slot.getState();
-
- if (reportedSlotState == SlotState.FREE) {
- switch (trackedSlotState) {
- case FREE:
- // matching state
- return false;
- case PENDING:
- // don't do anything because we expect the slot to be
allocated soon
- return false;
- case ALLOCATED:
- toFreeSlot.accept(slot);
- return true;
- }
- } else {
- switch (trackedSlotState) {
- case FREE:
- toPendingSlot.accept(slot, jobId);
- toAllocatedSlot.accept(slot, jobId);
- return true;
- case PENDING:
- if (!jobId.equals(slot.getJobId())) {
- toFreeSlot.accept(slot);
- toPendingSlot.accept(slot, jobId);
- }
- toAllocatedSlot.accept(slot, jobId);
- return true;
- case ALLOCATED:
- if (!jobId.equals(slot.getJobId())) {
- toFreeSlot.accept(slot);
- toPendingSlot.accept(slot, jobId);
- toAllocatedSlot.accept(slot, jobId);
- return true;
- } else {
- // matching state
- return false;
- }
- }
- }
- return false;
- }
- }
-
- private static class MultiSlotStatusUpdateListener implements
SlotStatusUpdateListener {
-
- private final Collection<SlotStatusUpdateListener> listeners = new
ArrayList<>();
-
- public void registerSlotStatusUpdateListener(
- SlotStatusUpdateListener slotStatusUpdateListener) {
- listeners.add(slotStatusUpdateListener);
- }
-
- @Override
- public void notifySlotStatusChange(
- TaskManagerSlotInformation slot,
- SlotState previous,
- SlotState current,
- JobID jobId) {
- LOG.trace(
- "Slot {} transitioned from {} to {} for job {}.",
- slot.getSlotId(),
- previous,
- current,
- jobId);
- listeners.forEach(
- listeners -> listeners.notifySlotStatusChange(slot,
previous, current, jobId));
- }
- }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotTracker.java
deleted file mode 100644
index 67abe357ea9..00000000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotTracker.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager.slotmanager;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.clusterframework.types.SlotID;
-import
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
-import org.apache.flink.runtime.taskexecutor.SlotStatus;
-
-import javax.annotation.Nullable;
-
-import java.util.Collection;
-
-/** Tracks slots and their {@link SlotState}. */
-interface SlotTracker {
-
- /**
- * Registers the given listener with this tracker.
- *
- * @param slotStatusUpdateListener listener to register
- */
- void registerSlotStatusUpdateListener(SlotStatusUpdateListener
slotStatusUpdateListener);
-
- /**
- * Adds the given slot to this tracker. The given slot may already be
allocated for a job. This
- * method must be called before the tracker is notified of any state
transition or slot status
- * notification.
- *
- * @param slotId ID of the slot
- * @param resourceProfile resource of the slot
- * @param taskManagerConnection connection to the hosting task executor
- * @param initialJob job that the slot is allocated for, or null if it is
free
- */
- void addSlot(
- SlotID slotId,
- ResourceProfile resourceProfile,
- TaskExecutorConnection taskManagerConnection,
- @Nullable JobID initialJob);
-
- /**
- * Removes the given set of slots from the slot manager. If a removed slot
was not free at the
- * time of removal, then this method will automatically transition the
slot to a free state.
- *
- * @param slotsToRemove identifying the slots to remove from the slot
manager
- */
- void removeSlots(Iterable<SlotID> slotsToRemove);
-
- /**
- * Notifies the tracker that the allocation for the given slot, for the
given job, has started.
- *
- * @param slotId slot being allocated
- * @param jobId job for which the slot is being allocated
- */
- void notifyAllocationStart(SlotID slotId, JobID jobId);
-
- /**
- * Notifies the tracker that the allocation for the given slot, for the
given job, has completed
- * successfully.
- *
- * @param slotId slot being allocated
- * @param jobId job for which the slot is being allocated
- */
- void notifyAllocationComplete(SlotID slotId, JobID jobId);
-
- /**
- * Notifies the tracker that the given slot was freed.
- *
- * @param slotId slot being freed
- */
- void notifyFree(SlotID slotId);
-
- /**
- * Notifies the tracker about the slot statuses.
- *
- * @param slotStatuses slot statues
- * @return whether any slot status has changed
- */
- boolean notifySlotStatus(Iterable<SlotStatus> slotStatuses);
-
- /**
- * Returns a view over free slots. The returned collection cannot be
modified directly, but
- * reflects changes to the set of free slots.
- *
- * @return free slots
- */
- Collection<TaskManagerSlotInformation> getFreeSlots();
-
- /**
- * Returns all task executors that have at least 1 pending/completed
allocation for the given
- * job.
- *
- * @param jobId the job for which the task executors must have a slot
- * @return task executors with at least 1 slot for the job
- */
- Collection<TaskExecutorConnection>
getTaskExecutorsWithAllocatedSlotsForJob(JobID jobId);
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTrackerTest.java
deleted file mode 100644
index c72adf4e87a..00000000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTrackerTest.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager.slotmanager;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.clusterframework.types.SlotID;
-import
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
-import org.apache.flink.runtime.taskexecutor.SlotStatus;
-import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
-
-import org.junit.jupiter.api.Test;
-
-import javax.annotation.Nullable;
-
-import java.util.ArrayDeque;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-import java.util.Queue;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-/** Tests for the {@link DefaultSlotTracker}. */
-class DefaultSlotTrackerTest {
-
- private static final TaskExecutorConnection TASK_EXECUTOR_CONNECTION =
- new TaskExecutorConnection(
- ResourceID.generate(),
- new
TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway());
-
- private static final JobID jobId = new JobID();
-
- @Test
- public void testFreeSlotsIsEmptyOnInitially() {
- SlotTracker tracker = new DefaultSlotTracker();
-
- assertThat(tracker.getFreeSlots()).isEmpty();
- }
-
- @Test
- public void testSlotAddition() {
- SlotTracker tracker = new DefaultSlotTracker();
-
- SlotID slotId1 = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(),
0);
- SlotID slotId2 = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(),
1);
-
- tracker.addSlot(slotId1, ResourceProfile.ANY,
TASK_EXECUTOR_CONNECTION, null);
- tracker.addSlot(slotId2, ResourceProfile.ANY,
TASK_EXECUTOR_CONNECTION, null);
-
-
assertThat(tracker.getFreeSlots().stream().map(TaskManagerSlotInformation::getSlotId))
- .containsExactlyInAnyOrder(slotId1, slotId2);
- }
-
- @Test
- public void testSlotRemoval() {
- Queue<SlotStateTransition> stateTransitions = new ArrayDeque<>();
- DefaultSlotTracker tracker = new DefaultSlotTracker();
- tracker.registerSlotStatusUpdateListener(
- (slot, previous, current, jobId) ->
- stateTransitions.add(
- new SlotStateTransition(
- slot.getSlotId(), previous, current,
jobId)));
-
- SlotID slotId1 = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(),
0);
- SlotID slotId2 = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(),
1);
- SlotID slotId3 = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(),
2);
-
- tracker.addSlot(slotId1, ResourceProfile.ANY,
TASK_EXECUTOR_CONNECTION, null);
- tracker.addSlot(slotId2, ResourceProfile.ANY,
TASK_EXECUTOR_CONNECTION, null);
- tracker.addSlot(slotId3, ResourceProfile.ANY,
TASK_EXECUTOR_CONNECTION, null);
-
- tracker.notifyAllocationStart(slotId2, jobId);
- tracker.notifyAllocationStart(slotId3, jobId);
- tracker.notifyAllocationComplete(slotId3, jobId);
-
- // the transitions to this point are not relevant for this test
- stateTransitions.clear();
- // we now have 1 slot in each slot state (free, pending, allocated)
- // it should be possible to remove slots regardless of their state
- tracker.removeSlots(Arrays.asList(slotId1, slotId2, slotId3));
-
- assertThat(tracker.getFreeSlots()).isEmpty();
- assertThat(tracker.areMapsEmpty()).isTrue();
-
- assertThat(stateTransitions)
- .containsExactlyInAnyOrder(
- new SlotStateTransition(slotId2, SlotState.PENDING,
SlotState.FREE, jobId),
- new SlotStateTransition(
- slotId3, SlotState.ALLOCATED, SlotState.FREE,
jobId));
- }
-
- @Test
- public void testAllocationCompletion() {
- Queue<SlotStateTransition> stateTransitions = new ArrayDeque<>();
- SlotTracker tracker = new DefaultSlotTracker();
- tracker.registerSlotStatusUpdateListener(
- (slot, previous, current, jobId) ->
- stateTransitions.add(
- new SlotStateTransition(
- slot.getSlotId(), previous, current,
jobId)));
-
- SlotID slotId = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(),
0);
-
- tracker.addSlot(slotId, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION,
null);
-
- tracker.notifyAllocationStart(slotId, jobId);
- assertThat(tracker.getFreeSlots()).isEmpty();
- assertThat(stateTransitions.remove())
- .isEqualTo(
- new SlotStateTransition(slotId, SlotState.FREE,
SlotState.PENDING, jobId));
-
- tracker.notifyAllocationComplete(slotId, jobId);
- assertThat(tracker.getFreeSlots()).isEmpty();
- assertThat(stateTransitions.remove())
- .isEqualTo(
- new SlotStateTransition(
- slotId, SlotState.PENDING,
SlotState.ALLOCATED, jobId));
-
- tracker.notifyFree(slotId);
-
-
assertThat(tracker.getFreeSlots().stream().map(TaskManagerSlotInformation::getSlotId))
- .contains(slotId);
- assertThat(stateTransitions.remove())
- .isEqualTo(
- new SlotStateTransition(
- slotId, SlotState.ALLOCATED, SlotState.FREE,
jobId));
- }
-
- @Test
- public void
testAllocationCompletionForDifferentJobThrowsIllegalStateException() {
- SlotTracker tracker = new DefaultSlotTracker();
-
- SlotID slotId = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(),
0);
-
- tracker.addSlot(slotId, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION,
null);
-
- tracker.notifyAllocationStart(slotId, new JobID());
- assertThatThrownBy(() -> tracker.notifyAllocationComplete(slotId, new
JobID()))
- .withFailMessage("Allocations must not be completed for a
different job ID.")
- .isInstanceOf(IllegalStateException.class);
- }
-
- @Test
- public void testAllocationCancellation() {
- Queue<SlotStateTransition> stateTransitions = new ArrayDeque<>();
- SlotTracker tracker = new DefaultSlotTracker();
- tracker.registerSlotStatusUpdateListener(
- (slot, previous, current, jobId) ->
- stateTransitions.add(
- new SlotStateTransition(
- slot.getSlotId(), previous, current,
jobId)));
-
- SlotID slotId = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(),
0);
-
- tracker.addSlot(slotId, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION,
null);
-
- tracker.notifyAllocationStart(slotId, jobId);
- assertThat(tracker.getFreeSlots()).isEmpty();
- assertThat(stateTransitions.remove())
- .isEqualTo(
- new SlotStateTransition(slotId, SlotState.FREE,
SlotState.PENDING, jobId));
-
- tracker.notifyFree(slotId);
-
assertThat(tracker.getFreeSlots().stream().map(TaskManagerSlotInformation::getSlotId))
- .contains(slotId);
- assertThat(stateTransitions.remove())
- .isEqualTo(
- new SlotStateTransition(slotId, SlotState.PENDING,
SlotState.FREE, jobId));
- }
-
- /**
- * Tests that notifications are fired before the internal state transition
has been executed, to
- * ensure that components reacting to the status update are in a
consistent state with the
- * tracker. Note that this test is not conclusive for transitions from
PENDING to ALLOCATED, but
- * that's okay for now because this distinction isn't exposed anywhere in
the API.
- */
- @Test
- public void testNotificationsFiredAfterStateTransition() {
- SlotID slotId = new SlotID(ResourceID.generate(), 0);
-
- DefaultSlotTracker tracker = new DefaultSlotTracker();
- tracker.addSlot(slotId, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION,
null);
-
- tracker.registerSlotStatusUpdateListener(
- (slot, previous, current, jobId) -> {
- if (current == SlotState.FREE) {
- assertThat(
- tracker.getFreeSlots().stream()
-
.map(TaskManagerSlotInformation::getSlotId))
- .contains(slotId);
- } else {
- assertThat(
- tracker.getFreeSlots().stream()
-
.map(TaskManagerSlotInformation::getSlotId))
- .doesNotContain(slotId);
- }
- });
-
- tracker.notifyAllocationStart(slotId, jobId);
- tracker.notifyAllocationComplete(slotId, jobId);
- tracker.notifyFree(slotId);
- }
-
- @Test
- public void testSlotStatusProcessing() {
- SlotTracker tracker = new DefaultSlotTracker();
- SlotID slotId1 = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(),
0);
- SlotID slotId2 = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(),
1);
- SlotID slotId3 = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(),
2);
- tracker.addSlot(slotId1, ResourceProfile.ANY,
TASK_EXECUTOR_CONNECTION, null);
- tracker.addSlot(slotId2, ResourceProfile.ANY,
TASK_EXECUTOR_CONNECTION, null);
- tracker.addSlot(slotId3, ResourceProfile.ANY,
TASK_EXECUTOR_CONNECTION, jobId);
-
-
assertThat(tracker.getFreeSlots().stream().map(TaskManagerSlotInformation::getSlotId))
- .containsExactlyInAnyOrder(slotId1, slotId2);
-
- // move slot2 to PENDING
- tracker.notifyAllocationStart(slotId2, jobId);
-
- final List<SlotStatus> slotReport =
- Arrays.asList(
- new SlotStatus(slotId1, ResourceProfile.ANY, jobId,
new AllocationID()),
- new SlotStatus(slotId2, ResourceProfile.ANY, null, new
AllocationID()),
- new SlotStatus(slotId3, ResourceProfile.ANY, null, new
AllocationID()));
-
- assertThat(tracker.notifySlotStatus(slotReport)).isTrue();
-
- // slot1 should now be allocated; slot2 should continue to be in a
pending state; slot3
- // should be freed
-
assertThat(tracker.getFreeSlots().stream().map(TaskManagerSlotInformation::getSlotId))
- .contains(slotId3);
-
- // if slot2 is not in a pending state, this will fail with an exception
- tracker.notifyAllocationComplete(slotId2, jobId);
-
- final List<SlotStatus> idempotentSlotReport =
- Arrays.asList(
- new SlotStatus(slotId1, ResourceProfile.ANY, jobId,
new AllocationID()),
- new SlotStatus(slotId2, ResourceProfile.ANY, jobId,
new AllocationID()),
- new SlotStatus(slotId3, ResourceProfile.ANY, null, new
AllocationID()));
-
- assertThat(tracker.notifySlotStatus(idempotentSlotReport)).isFalse();
- }
-
- @Test
- public void testGetTaskExecutorsWithAllocatedSlotsForJob() {
- final SlotTracker tracker = new DefaultSlotTracker();
-
- final JobID jobId = new JobID();
- final SlotID slotId = new
SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 0);
-
- assertThat(tracker.getTaskExecutorsWithAllocatedSlotsForJob(new
JobID())).isEmpty();
-
- tracker.addSlot(slotId, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION,
null);
- assertThat(tracker.getTaskExecutorsWithAllocatedSlotsForJob(new
JobID())).isEmpty();
-
- tracker.notifyAllocationStart(slotId, jobId);
- assertThat(tracker.getTaskExecutorsWithAllocatedSlotsForJob(jobId))
- .contains(TASK_EXECUTOR_CONNECTION);
-
- tracker.notifyAllocationComplete(slotId, jobId);
- assertThat(tracker.getTaskExecutorsWithAllocatedSlotsForJob(jobId))
- .contains(TASK_EXECUTOR_CONNECTION);
-
- tracker.notifyFree(slotId);
- assertThat(tracker.getTaskExecutorsWithAllocatedSlotsForJob(new
JobID())).isEmpty();
- }
-
- private static class SlotStateTransition {
-
- private final SlotID slotId;
- private final SlotState oldState;
- private final SlotState newState;
- @Nullable private final JobID jobId;
-
- private SlotStateTransition(
- SlotID slotId, SlotState oldState, SlotState newState,
@Nullable JobID jobId) {
- this.slotId = slotId;
- this.jobId = jobId;
- this.oldState = oldState;
- this.newState = newState;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- SlotStateTransition that = (SlotStateTransition) o;
- return Objects.equals(slotId, that.slotId)
- && oldState == that.oldState
- && newState == that.newState
- && Objects.equals(jobId, that.jobId);
- }
-
- @Override
- public String toString() {
- return "SlotStateTransition{"
- + "slotId="
- + slotId
- + ", oldState="
- + oldState
- + ", newState="
- + newState
- + ", jobId="
- + jobId
- + '}';
- }
- }
-}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotStatusReconcilerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotStatusReconcilerTest.java
deleted file mode 100644
index e697cb1e348..00000000000
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotStatusReconcilerTest.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager.slotmanager;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.clusterframework.types.SlotID;
-import
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
-import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
-
-import org.assertj.core.api.Condition;
-import org.junit.jupiter.api.Test;
-
-import javax.annotation.Nullable;
-
-import java.util.ArrayDeque;
-import java.util.Queue;
-
-import static
org.apache.flink.runtime.resourcemanager.slotmanager.SlotStatusReconcilerTest.SlotStateTransitionMatcher.ofMatcher;
-import static org.assertj.core.api.Assertions.assertThat;
-
-/**
- * Tests for the {@link DefaultSlotTracker.SlotStatusStateReconciler}. Tests
all state transitions
- * that could (or should not) occur due to a slot status update. This test
only checks the target
- * state and job ID for state transitions, because the slot ID is not
interesting and the slot state
- * is not *actually* being updated. We assume the reconciler locks in a set of
transitions given a
- * source and target state, without worrying about the correctness of
intermediate steps (because it
- * shouldn't; and it would be a bit annoying to setup).
- */
-class SlotStatusReconcilerTest {
-
- private static final TaskExecutorConnection TASK_EXECUTOR_CONNECTION =
- new TaskExecutorConnection(
- ResourceID.generate(),
- new
TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway());
-
- @Test
- void testSlotStatusReconciliationForFreeSlot() {
- JobID jobId1 = new JobID();
- StateTransitionTracker stateTransitionTracker = new
StateTransitionTracker();
-
- DefaultSlotTracker.SlotStatusStateReconciler reconciler =
- createSlotStatusReconciler(stateTransitionTracker);
-
- DeclarativeTaskManagerSlot slot =
- new DeclarativeTaskManagerSlot(
- new SlotID(ResourceID.generate(), 0),
- ResourceProfile.ANY,
- TASK_EXECUTOR_CONNECTION);
-
- // free -> free
- assertThat(reconciler.executeStateTransition(slot, null)).isFalse();
- assertThat(stateTransitionTracker.stateTransitions).isEmpty();
-
- // free -> allocated
- assertThat(reconciler.executeStateTransition(slot, jobId1)).isTrue();
- assertThat(stateTransitionTracker.stateTransitions.remove())
- .satisfies(ofMatcher(SlotState.PENDING, jobId1));
- assertThat(stateTransitionTracker.stateTransitions.remove())
- .satisfies(ofMatcher(SlotState.ALLOCATED, jobId1));
- }
-
- @Test
- void testSlotStatusReconciliationForPendingSlot() {
- JobID jobId1 = new JobID();
- StateTransitionTracker stateTransitionTracker = new
StateTransitionTracker();
-
- DefaultSlotTracker.SlotStatusStateReconciler reconciler =
- createSlotStatusReconciler(stateTransitionTracker);
-
- DeclarativeTaskManagerSlot slot =
- new DeclarativeTaskManagerSlot(
- new SlotID(ResourceID.generate(), 0),
- ResourceProfile.ANY,
- TASK_EXECUTOR_CONNECTION);
- slot.startAllocation(jobId1);
-
- // pending vs. free; should not trigger any transition because we are
expecting a slot
- // allocation in the future
- assertThat(reconciler.executeStateTransition(slot, null)).isFalse();
- assertThat(stateTransitionTracker.stateTransitions).isEmpty();
-
- // pending -> allocated
- assertThat(reconciler.executeStateTransition(slot, jobId1)).isTrue();
- assertThat(stateTransitionTracker.stateTransitions.remove())
- .satisfies(ofMatcher(SlotState.ALLOCATED, jobId1));
- }
-
- @Test
- void testSlotStatusReconciliationForPendingSlotWithDifferentJobID() {
- JobID jobId1 = new JobID();
- JobID jobId2 = new JobID();
- StateTransitionTracker stateTransitionTracker = new
StateTransitionTracker();
-
- DefaultSlotTracker.SlotStatusStateReconciler reconciler =
- createSlotStatusReconciler(stateTransitionTracker);
-
- DeclarativeTaskManagerSlot slot =
- new DeclarativeTaskManagerSlot(
- new SlotID(ResourceID.generate(), 0),
- ResourceProfile.ANY,
- TASK_EXECUTOR_CONNECTION);
- slot.startAllocation(jobId1);
-
- // pending(job1) -> free -> pending(job2) -> allocated(job2)
- assertThat(reconciler.executeStateTransition(slot, jobId2)).isTrue();
- assertThat(stateTransitionTracker.stateTransitions.remove())
- .satisfies(ofMatcher(SlotState.FREE, jobId1));
- assertThat(stateTransitionTracker.stateTransitions.remove())
- .satisfies(ofMatcher(SlotState.PENDING, jobId2));
- assertThat(stateTransitionTracker.stateTransitions.remove())
- .satisfies(ofMatcher(SlotState.ALLOCATED, jobId2));
- }
-
- @Test
- void testSlotStatusReconciliationForAllocatedSlot() {
- JobID jobId1 = new JobID();
- StateTransitionTracker stateTransitionTracker = new
StateTransitionTracker();
-
- DefaultSlotTracker.SlotStatusStateReconciler reconciler =
- createSlotStatusReconciler(stateTransitionTracker);
-
- DeclarativeTaskManagerSlot slot =
- new DeclarativeTaskManagerSlot(
- new SlotID(ResourceID.generate(), 0),
- ResourceProfile.ANY,
- TASK_EXECUTOR_CONNECTION);
- slot.startAllocation(jobId1);
- slot.completeAllocation();
-
- // allocated -> allocated
- assertThat(reconciler.executeStateTransition(slot, jobId1)).isFalse();
- assertThat(stateTransitionTracker.stateTransitions).isEmpty();
-
- // allocated -> free
- assertThat(reconciler.executeStateTransition(slot, null)).isTrue();
- assertThat(stateTransitionTracker.stateTransitions.remove())
- .satisfies(ofMatcher(SlotState.FREE, jobId1));
- }
-
- @Test
- void testSlotStatusReconciliationForAllocatedSlotWithDifferentJobID() {
- JobID jobId1 = new JobID();
- JobID jobId2 = new JobID();
- StateTransitionTracker stateTransitionTracker = new
StateTransitionTracker();
-
- DefaultSlotTracker.SlotStatusStateReconciler reconciler =
- createSlotStatusReconciler(stateTransitionTracker);
-
- DeclarativeTaskManagerSlot slot =
- new DeclarativeTaskManagerSlot(
- new SlotID(ResourceID.generate(), 0),
- ResourceProfile.ANY,
- TASK_EXECUTOR_CONNECTION);
- slot.startAllocation(jobId1);
- slot.completeAllocation();
-
- // allocated(job1) -> free -> pending(job2) -> allocated(job2)
- assertThat(reconciler.executeStateTransition(slot, jobId2)).isTrue();
- assertThat(stateTransitionTracker.stateTransitions.remove())
- .satisfies(ofMatcher(SlotState.FREE, jobId1));
- assertThat(stateTransitionTracker.stateTransitions.remove())
- .satisfies(ofMatcher(SlotState.PENDING, jobId2));
- assertThat(stateTransitionTracker.stateTransitions.remove())
- .satisfies(ofMatcher(SlotState.ALLOCATED, jobId2));
- }
-
- private static class StateTransitionTracker {
- Queue<SlotStateTransition> stateTransitions = new ArrayDeque<>();
-
- void notifyFree(DeclarativeTaskManagerSlot slot) {
- stateTransitions.add(new SlotStateTransition(SlotState.FREE,
slot.getJobId()));
- }
-
- void notifyPending(JobID jobId) {
- stateTransitions.add(new SlotStateTransition(SlotState.PENDING,
jobId));
- }
-
- void notifyAllocated(JobID jobId) {
- stateTransitions.add(new SlotStateTransition(SlotState.ALLOCATED,
jobId));
- }
- }
-
- private static DefaultSlotTracker.SlotStatusStateReconciler
createSlotStatusReconciler(
- StateTransitionTracker stateTransitionTracker) {
- return new DefaultSlotTracker.SlotStatusStateReconciler(
- stateTransitionTracker::notifyFree,
- (jobId, jobId2) ->
stateTransitionTracker.notifyPending(jobId2),
- (jobId1, jobId12) ->
stateTransitionTracker.notifyAllocated(jobId12));
- }
-
- static class SlotStateTransition {
-
- private final SlotState newState;
- @Nullable private final JobID jobId;
-
- private SlotStateTransition(SlotState newState, @Nullable JobID jobId)
{
- this.jobId = jobId;
- this.newState = newState;
- }
-
- @Override
- public String toString() {
- return "SlotStateTransition{" + ", newState=" + newState + ",
jobId=" + jobId + '}';
- }
- }
-
- static class SlotStateTransitionMatcher extends
Condition<SlotStateTransition> {
-
- private final SlotState targetState;
- private final JobID jobId;
-
- private SlotStateTransitionMatcher(SlotState targetState, JobID jobId)
{
- this.targetState = targetState;
- this.jobId = jobId;
- }
-
- @Override
- public boolean matches(SlotStateTransition value) {
- return value.newState == targetState && jobId.equals(value.jobId);
- }
-
- static SlotStateTransitionMatcher ofMatcher(SlotState targetState,
JobID jobId) {
- return new SlotStateTransitionMatcher(targetState, jobId);
- }
- }
-}