tillrohrmann commented on a change in pull request #13508:
URL: https://github.com/apache/flink/pull/13508#discussion_r499586350
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
##########
@@ -343,6 +344,7 @@ public void testRunningJobsRegistryCleanup() throws
Exception {
* before a new job with the same {@link JobID} is started.
*/
@Test
+ @Ignore
Review comment:
Do we know why this test is unstable?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTracker.java
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+/**
+ * Default SlotTracker implementation.
+ */
+class DefaultSlotTracker implements SlotTracker {
+ private static final Logger LOG =
LoggerFactory.getLogger(DefaultSlotTracker.class);
+
+ /**
+ * Map for all registered slots.
+ */
+ private final Map<SlotID, DeclarativeTaskManagerSlot> slots = new
HashMap<>();
+
+ /**
+ * Index of all currently free slots.
+ */
+ private final Map<SlotID, DeclarativeTaskManagerSlot> freeSlots = new
LinkedHashMap<>();
+
+ private final SlotStatusUpdateListener slotStatusUpdateListener;
+
+ private final SlotStatusStateReconciler slotStatusStateReconciler = new
SlotStatusStateReconciler(this::transitionSlotToFree,
this::transitionSlotToPending, this::transitionSlotToAllocated);
+
+ public DefaultSlotTracker(SlotStatusUpdateListener
slotStatusUpdateListener) {
Review comment:
Is it because we now need to introduce a factory in order to inject a custom
tracker into the `SlotManager`?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTracker.java
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+/**
+ * Default SlotTracker implementation.
+ */
+class DefaultSlotTracker implements SlotTracker {
+ private static final Logger LOG =
LoggerFactory.getLogger(DefaultSlotTracker.class);
+
+ /**
+ * Map for all registered slots.
+ */
+ private final Map<SlotID, DeclarativeTaskManagerSlot> slots = new
HashMap<>();
+
+ /**
+ * Index of all currently free slots.
+ */
+ private final Map<SlotID, DeclarativeTaskManagerSlot> freeSlots = new
LinkedHashMap<>();
+
+ private final SlotStatusUpdateListener slotStatusUpdateListener;
+
+ private final SlotStatusStateReconciler slotStatusStateReconciler = new
SlotStatusStateReconciler(this::transitionSlotToFree,
this::transitionSlotToPending, this::transitionSlotToAllocated);
+
+ public DefaultSlotTracker(SlotStatusUpdateListener
slotStatusUpdateListener) {
+ this.slotStatusUpdateListener =
Preconditions.checkNotNull(slotStatusUpdateListener);
+ }
+
+ @Override
+ public void addSlot(
+ SlotID slotId,
+ ResourceProfile resourceProfile,
+ TaskExecutorConnection taskManagerConnection,
+ @Nullable JobID assignedJob) {
+ Preconditions.checkNotNull(slotId);
+ Preconditions.checkNotNull(resourceProfile);
+ Preconditions.checkNotNull(taskManagerConnection);
+
+ if (slots.containsKey(slotId)) {
+ // remove the old slot first
+ LOG.debug("A slot was added with an already tracked
slot ID {}. Removing previous entry.", slotId);
+ removeSlot(slotId);
+ }
+
+ DeclarativeTaskManagerSlot slot = new
DeclarativeTaskManagerSlot(slotId, resourceProfile, taskManagerConnection);
+ slots.put(slotId, slot);
+ freeSlots.put(slotId, slot);
+ slotStatusStateReconciler.executeStateTransition(slot,
assignedJob);
+ }
+
+ @Override
+ public void removeSlots(Iterable<SlotID> slotsToRemove) {
+ Preconditions.checkNotNull(slotsToRemove);
+
+ for (SlotID slotId : slotsToRemove) {
+ removeSlot(slotId);
+ }
+ }
+
+ private void removeSlot(SlotID slotId) {
+ DeclarativeTaskManagerSlot slot = slots.remove(slotId);
+
+ if (slot != null) {
+ if (slot.getState() != SlotState.FREE) {
+ transitionSlotToFree(slot);
+ }
+ freeSlots.remove(slotId);
+ } else {
+ LOG.debug("There was no slot registered with slot id
{}.", slotId);
+ }
+ }
+
+ //
---------------------------------------------------------------------------------------------
+ // ResourceManager slot status API - optimistically trigger
transitions, but they may not represent true state on task executors
+ //
---------------------------------------------------------------------------------------------
+
+ @Override
+ public void notifyFree(SlotID slotId) {
+ Preconditions.checkNotNull(slotId);
+ transitionSlotToFree(slots.get(slotId));
+ }
+
+ @Override
+ public void notifyAllocationStart(SlotID slotId, JobID jobId) {
+ Preconditions.checkNotNull(slotId);
+ Preconditions.checkNotNull(jobId);
+ transitionSlotToPending(slots.get(slotId), jobId);
+ }
+
+ @Override
+ public void notifyAllocationComplete(SlotID slotId, JobID jobId) {
+ Preconditions.checkNotNull(slotId);
+ Preconditions.checkNotNull(jobId);
+ transitionSlotToAllocated(slots.get(slotId), jobId);
+ }
+
+ //
---------------------------------------------------------------------------------------------
+ // TaskExecutor slot status API - acts as source of truth
+ //
---------------------------------------------------------------------------------------------
+
+ @Override
+ public void notifySlotStatus(Iterable<SlotStatus> slotStatuses) {
+ Preconditions.checkNotNull(slotStatuses);
+ for (SlotStatus slotStatus : slotStatuses) {
+
slotStatusStateReconciler.executeStateTransition(slots.get(slotStatus.getSlotID()),
slotStatus.getJobID());
+ }
+ }
+
+ //
---------------------------------------------------------------------------------------------
+ // Core state transitions
+ //
---------------------------------------------------------------------------------------------
+
+ private void transitionSlotToFree(DeclarativeTaskManagerSlot slot) {
+ Preconditions.checkNotNull(slot);
+ Preconditions.checkState(slot.getState() != SlotState.FREE);
+
+ // remember the slots current job and state for the
notification, as this information will be cleared from
+ // the slot upon freeing
+ final JobID jobId = slot.getJobId();
+ final SlotState state = slot.getState();
+
+ slot.freeSlot();
+ freeSlots.put(slot.getSlotId(), slot);
+ slotStatusUpdateListener.notifySlotStatusChange(slot, state,
SlotState.FREE, jobId);
+ }
+
+ private void transitionSlotToPending(DeclarativeTaskManagerSlot slot,
JobID jobId) {
+ Preconditions.checkNotNull(slot);
+ Preconditions.checkState(slot.getState() == SlotState.FREE);
+
+ slot.startAllocation(jobId);
+ freeSlots.remove(slot.getSlotId());
+ slotStatusUpdateListener.notifySlotStatusChange(slot,
SlotState.FREE, SlotState.PENDING, jobId);
+ }
+
+ private void transitionSlotToAllocated(DeclarativeTaskManagerSlot slot,
JobID jobId) {
+ Preconditions.checkNotNull(slot);
+ Preconditions.checkState(slot.getState() == SlotState.PENDING);
+ Preconditions.checkState(jobId.equals(slot.getJobId()));
+
+ slotStatusUpdateListener.notifySlotStatusChange(slot,
SlotState.PENDING, SlotState.ALLOCATED, jobId);
+ slot.completeAllocation();
Review comment:
Does it make a difference whether
`slotStatusUpdateListener.notifySlotStatusChange` is called before effectuating
the state change on `slot`? I am asking because `transitionSlotToPending` and
`transitionSlotToFree` do it in the opposite order.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTrackerTest.java
##########
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.util.TestLogger;
+
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.Queue;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.collection.IsEmptyCollection.empty;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link DefaultSlotTracker}.
+ */
+public class DefaultSlotTrackerTest extends TestLogger {
+
+ private static final TaskExecutorConnection TASK_EXECUTOR_CONNECTION =
new TaskExecutorConnection(
+ ResourceID.generate(),
+ new
TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway());
+
+ private static final JobID jobId = new JobID();
+
+ @Test
+ public void testInitialBehavior() {
+ SlotTracker tracker = new DefaultSlotTracker((slot, previous,
current, jobId) -> {});
+
+ assertThat(tracker.getFreeSlots(), empty());
+ }
+
+ @Test
+ public void testSlotAddition() {
+ SlotTracker tracker = new DefaultSlotTracker((slot, previous,
current, jobId) -> {});
+
+ SlotID slotId1 = new
SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 0);
+ SlotID slotId2 = new
SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 1);
+
+ tracker.addSlot(slotId1, ResourceProfile.ANY,
TASK_EXECUTOR_CONNECTION, null);
+ tracker.addSlot(slotId2, ResourceProfile.ANY,
TASK_EXECUTOR_CONNECTION, null);
+
+ assertThat(tracker.getFreeSlots(),
containsInAnyOrder(Arrays.asList(infoWithSlotId(slotId1),
infoWithSlotId(slotId2))));
+ }
+
+ @Test
+ public void testSlotRemoval() {
+ Queue<SlotStateTransition> stateTransitions = new
ArrayDeque<>();
+ DefaultSlotTracker tracker = new DefaultSlotTracker((slot,
previous, current, jobId) ->
+ stateTransitions.add(new
SlotStateTransition(slot.getSlotId(), previous, current, jobId)));
+
+ SlotID slotId1 = new
SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 0);
+ SlotID slotId2 = new
SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 1);
+ SlotID slotId3 = new
SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 2);
+
+ tracker.addSlot(slotId1, ResourceProfile.ANY,
TASK_EXECUTOR_CONNECTION, null);
+ tracker.addSlot(slotId2, ResourceProfile.ANY,
TASK_EXECUTOR_CONNECTION, null);
+ tracker.addSlot(slotId3, ResourceProfile.ANY,
TASK_EXECUTOR_CONNECTION, null);
+
+ tracker.notifyAllocationStart(slotId2, jobId);
+ tracker.notifyAllocationStart(slotId3, jobId);
+ tracker.notifyAllocationComplete(slotId3, jobId);
+
+ // the transitions to this point are not relevant for this test
+ stateTransitions.clear();
+ // we now have 1 slot in each slot state (free, pending,
allocated)
+ // it should be possible to remove slots regardless of their
state
+ tracker.removeSlots(Arrays.asList(slotId1, slotId2, slotId3));
+
+ assertThat(tracker.getFreeSlots(), empty());
+ assertThat(tracker.areMapsEmpty(), is(true));
+
+ assertThat(stateTransitions, containsInAnyOrder(
+ new SlotStateTransition(slotId2, SlotState.PENDING,
SlotState.FREE, jobId),
+ new SlotStateTransition(slotId3, SlotState.ALLOCATED,
SlotState.FREE, jobId)
+ ));
+ }
+
+ @Test
+ public void testAllocationCompletion() {
+ Queue<SlotStateTransition> stateTransitions = new
ArrayDeque<>();
+ SlotTracker tracker = new DefaultSlotTracker((slot, previous,
current, jobId) ->
+ stateTransitions.add(new
SlotStateTransition(slot.getSlotId(), previous, current, jobId)));
+
+ SlotID slotId = new
SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 0);
+
+ tracker.addSlot(slotId, ResourceProfile.ANY,
TASK_EXECUTOR_CONNECTION, null);
+
+ tracker.notifyAllocationStart(slotId, jobId);
+ assertThat(tracker.getFreeSlots(), empty());
+ assertThat(stateTransitions.remove(), is(new
SlotStateTransition(slotId, SlotState.FREE, SlotState.PENDING, jobId)));
+
+ tracker.notifyAllocationComplete(slotId, jobId);
+ assertThat(tracker.getFreeSlots(), empty());
+ assertThat(stateTransitions.remove(), is(new
SlotStateTransition(slotId, SlotState.PENDING, SlotState.ALLOCATED, jobId)));
+
+ tracker.notifyFree(slotId);
+
+ assertThat(tracker.getFreeSlots(),
contains(infoWithSlotId(slotId)));
+ assertThat(stateTransitions.remove(), is(new
SlotStateTransition(slotId, SlotState.ALLOCATED, SlotState.FREE, jobId)));
+ }
+
+ @Test
+ public void
testAllocationCompletionForDifferentJobThrowsIllegalStateException() {
+ SlotTracker tracker = new DefaultSlotTracker((slot, previous,
current, jobId) -> {});
+
+ SlotID slotId = new
SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 0);
+
+ tracker.addSlot(slotId, ResourceProfile.ANY,
TASK_EXECUTOR_CONNECTION, null);
+
+ tracker.notifyAllocationStart(slotId, new JobID());
+ try {
+ tracker.notifyAllocationComplete(slotId, new JobID());
+ fail("Allocations must not be completed for a different
job ID.");
+ } catch (IllegalStateException expected) {
+ }
+ }
+
+ @Test
+ public void testAllocationCancellation() {
+ Queue<SlotStateTransition> stateTransitions = new
ArrayDeque<>();
+ SlotTracker tracker = new DefaultSlotTracker((slot, previous,
current, jobId) ->
+ stateTransitions.add(new
SlotStateTransition(slot.getSlotId(), previous, current, jobId)));
+
+ SlotID slotId = new
SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 0);
+
+ tracker.addSlot(slotId, ResourceProfile.ANY,
TASK_EXECUTOR_CONNECTION, null);
+
+ tracker.notifyAllocationStart(slotId, jobId);
+ assertThat(tracker.getFreeSlots(), empty());
+ assertThat(stateTransitions.remove(), is(new
SlotStateTransition(slotId, SlotState.FREE, SlotState.PENDING, jobId)));
+
+ tracker.notifyFree(slotId);
+ assertThat(tracker.getFreeSlots(),
contains(infoWithSlotId(slotId)));
+ assertThat(stateTransitions.remove(), is(new
SlotStateTransition(slotId, SlotState.PENDING, SlotState.FREE, jobId)));
+ }
+
+ @Test
+ public void testSlotStatusProcessing() {
+ SlotTracker tracker = new DefaultSlotTracker((slot, previous,
current, jobId) -> {});
+ SlotID slotId1 = new
SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 0);
+ SlotID slotId2 = new
SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 1);
+ SlotID slotId3 = new
SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 2);
+ tracker.addSlot(slotId1, ResourceProfile.ANY,
TASK_EXECUTOR_CONNECTION, null);
+ tracker.addSlot(slotId2, ResourceProfile.ANY,
TASK_EXECUTOR_CONNECTION, null);
+ tracker.addSlot(slotId3, ResourceProfile.ANY,
TASK_EXECUTOR_CONNECTION, jobId);
+
+ assertThat(tracker.getFreeSlots(),
containsInAnyOrder(Arrays.asList(infoWithSlotId(slotId1),
infoWithSlotId(slotId2))));
+
+ // move slot2 to PENDING
+ tracker.notifyAllocationStart(slotId2, jobId);
+
+ tracker.notifySlotStatus(Arrays.asList(
+ new SlotStatus(slotId1, ResourceProfile.ANY, jobId, new
AllocationID()),
+ new SlotStatus(slotId2, ResourceProfile.ANY, null, new
AllocationID()),
+ new SlotStatus(slotId3, ResourceProfile.ANY, null, new
AllocationID())));
+
+ // slot1 should now be allocated; slot2 should continue to be
in a pending state; slot3 should be freed
+ assertThat(tracker.getFreeSlots(),
contains(infoWithSlotId(slotId3)));
+
+ // if slot2 is not in a pending state, this will fail with an
exception
+ tracker.notifyAllocationComplete(slotId2, jobId);
+ }
+
+ /**
+ * Tests all state transitions that could (or should not) occur due to
a slot status update. This test only checks
+ * the target state and job ID for state transitions, because the slot
ID is not interesting and the slot state
+ * is not *actually* being updated. We assume the reconciler locks in a
set of transitions given a source and target
+ * state, without worrying about the correctness of intermediate steps
(because it shouldn't; and it would be a bit
+ * annoying to setup).
+ */
+ @Test
+ public void testSlotStatusReconciliation() {
+ JobID jobId1 = new JobID();
+ JobID jobId2 = new JobID();
+
+ Queue<SlotStateTransition> stateTransitions = new
ArrayDeque<>();
+
+ DefaultSlotTracker.SlotStatusStateReconciler reconciler = new
DefaultSlotTracker.SlotStatusStateReconciler(
+ slot -> stateTransitions.add(new
SlotStateTransition(slot.getSlotId(), slot.getState(), SlotState.FREE,
slot.getJobId())),
+ (slot, jobID) -> stateTransitions.add(new
SlotStateTransition(slot.getSlotId(), slot.getState(), SlotState.PENDING,
jobID)),
+ (slot, jobID) -> stateTransitions.add(new
SlotStateTransition(slot.getSlotId(), slot.getState(), SlotState.ALLOCATED,
jobID)));
+
+ {// free slot
+ DeclarativeTaskManagerSlot slot = new
DeclarativeTaskManagerSlot(new SlotID(ResourceID.generate(), 0),
ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION);
+
+ reconciler.executeStateTransition(slot, null);
+ assertThat(stateTransitions, empty());
+
+ reconciler.executeStateTransition(slot, jobId1);
+ assertThat(stateTransitions.remove(),
is(transitionWithTargetStateForJob(SlotState.PENDING, jobId1)));
+ assertThat(stateTransitions.remove(),
is(transitionWithTargetStateForJob(SlotState.ALLOCATED, jobId1)));
+ }
Review comment:
I think I would split these blocks into separate test methods.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTrackerTest.java
##########
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.util.TestLogger;
+
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.Queue;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.collection.IsEmptyCollection.empty;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link DefaultSlotTracker}.
+ */
+public class DefaultSlotTrackerTest extends TestLogger {
+
+ private static final TaskExecutorConnection TASK_EXECUTOR_CONNECTION =
new TaskExecutorConnection(
+ ResourceID.generate(),
+ new
TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway());
+
+ private static final JobID jobId = new JobID();
+
+ @Test
+ public void testInitialBehavior() {
Review comment:
Maybe spell out what the initial behaviour is. For someone who has not
written this code, this might not be obvious.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]