tillrohrmann commented on a change in pull request #13508:
URL: https://github.com/apache/flink/pull/13508#discussion_r499586350



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
##########
@@ -343,6 +344,7 @@ public void testRunningJobsRegistryCleanup() throws 
Exception {
         * before a new job with the same {@link JobID} is started.
         */
        @Test
+       @Ignore

Review comment:
       Do we know why this test is untable?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTracker.java
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+/**
+ * Default SlotTracker implementation.
+ */
+class DefaultSlotTracker implements SlotTracker {
+       private static final Logger LOG = 
LoggerFactory.getLogger(DefaultSlotTracker.class);
+
+       /**
+        * Map for all registered slots.
+        */
+       private final Map<SlotID, DeclarativeTaskManagerSlot> slots = new 
HashMap<>();
+
+       /**
+        * Index of all currently free slots.
+        */
+       private final Map<SlotID, DeclarativeTaskManagerSlot> freeSlots = new 
LinkedHashMap<>();
+
+       private final SlotStatusUpdateListener slotStatusUpdateListener;
+
+       private final SlotStatusStateReconciler slotStatusStateReconciler = new 
SlotStatusStateReconciler(this::transitionSlotToFree, 
this::transitionSlotToPending, this::transitionSlotToAllocated);
+
+       public DefaultSlotTracker(SlotStatusUpdateListener 
slotStatusUpdateListener) {

Review comment:
       Is it because now we need to introduce a factory to introduce a custom 
tracker into the `SlotManager`?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTracker.java
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+/**
+ * Default SlotTracker implementation.
+ */
+class DefaultSlotTracker implements SlotTracker {
+       private static final Logger LOG = 
LoggerFactory.getLogger(DefaultSlotTracker.class);
+
+       /**
+        * Map for all registered slots.
+        */
+       private final Map<SlotID, DeclarativeTaskManagerSlot> slots = new 
HashMap<>();
+
+       /**
+        * Index of all currently free slots.
+        */
+       private final Map<SlotID, DeclarativeTaskManagerSlot> freeSlots = new 
LinkedHashMap<>();
+
+       private final SlotStatusUpdateListener slotStatusUpdateListener;
+
+       private final SlotStatusStateReconciler slotStatusStateReconciler = new 
SlotStatusStateReconciler(this::transitionSlotToFree, 
this::transitionSlotToPending, this::transitionSlotToAllocated);
+
+       public DefaultSlotTracker(SlotStatusUpdateListener 
slotStatusUpdateListener) {
+               this.slotStatusUpdateListener = 
Preconditions.checkNotNull(slotStatusUpdateListener);
+       }
+
+       @Override
+       public void addSlot(
+               SlotID slotId,
+               ResourceProfile resourceProfile,
+               TaskExecutorConnection taskManagerConnection,
+               @Nullable JobID assignedJob) {
+               Preconditions.checkNotNull(slotId);
+               Preconditions.checkNotNull(resourceProfile);
+               Preconditions.checkNotNull(taskManagerConnection);
+
+               if (slots.containsKey(slotId)) {
+                       // remove the old slot first
+                       LOG.debug("A slot was added with an already tracked 
slot ID {}. Removing previous entry.", slotId);
+                       removeSlot(slotId);
+               }
+
+               DeclarativeTaskManagerSlot slot = new 
DeclarativeTaskManagerSlot(slotId, resourceProfile, taskManagerConnection);
+               slots.put(slotId, slot);
+               freeSlots.put(slotId, slot);
+               slotStatusStateReconciler.executeStateTransition(slot, 
assignedJob);
+       }
+
+       @Override
+       public void removeSlots(Iterable<SlotID> slotsToRemove) {
+               Preconditions.checkNotNull(slotsToRemove);
+
+               for (SlotID slotId : slotsToRemove) {
+                       removeSlot(slotId);
+               }
+       }
+
+       private void removeSlot(SlotID slotId) {
+               DeclarativeTaskManagerSlot slot = slots.remove(slotId);
+
+               if (slot != null) {
+                       if (slot.getState() != SlotState.FREE) {
+                               transitionSlotToFree(slot);
+                       }
+                       freeSlots.remove(slotId);
+               } else {
+                       LOG.debug("There was no slot registered with slot id 
{}.", slotId);
+               }
+       }
+
+       // 
---------------------------------------------------------------------------------------------
+       // ResourceManager slot status API - optimistically trigger 
transitions, but they may not represent true state on task executors
+       // 
---------------------------------------------------------------------------------------------
+
+       @Override
+       public void notifyFree(SlotID slotId) {
+               Preconditions.checkNotNull(slotId);
+               transitionSlotToFree(slots.get(slotId));
+       }
+
+       @Override
+       public void notifyAllocationStart(SlotID slotId, JobID jobId) {
+               Preconditions.checkNotNull(slotId);
+               Preconditions.checkNotNull(jobId);
+               transitionSlotToPending(slots.get(slotId), jobId);
+       }
+
+       @Override
+       public void notifyAllocationComplete(SlotID slotId, JobID jobId) {
+               Preconditions.checkNotNull(slotId);
+               Preconditions.checkNotNull(jobId);
+               transitionSlotToAllocated(slots.get(slotId), jobId);
+       }
+
+       // 
---------------------------------------------------------------------------------------------
+       // TaskExecutor slot status API - acts as source of truth
+       // 
---------------------------------------------------------------------------------------------
+
+       @Override
+       public void notifySlotStatus(Iterable<SlotStatus> slotStatuses) {
+               Preconditions.checkNotNull(slotStatuses);
+               for (SlotStatus slotStatus : slotStatuses) {
+                       
slotStatusStateReconciler.executeStateTransition(slots.get(slotStatus.getSlotID()),
 slotStatus.getJobID());
+               }
+       }
+
+       // 
---------------------------------------------------------------------------------------------
+       // Core state transitions
+       // 
---------------------------------------------------------------------------------------------
+
+       private void transitionSlotToFree(DeclarativeTaskManagerSlot slot) {
+               Preconditions.checkNotNull(slot);
+               Preconditions.checkState(slot.getState() != SlotState.FREE);
+
+               // remember the slots current job and state for the 
notification, as this information will be cleared from
+               // the slot upon freeing
+               final JobID jobId = slot.getJobId();
+               final SlotState state = slot.getState();
+
+               slot.freeSlot();
+               freeSlots.put(slot.getSlotId(), slot);
+               slotStatusUpdateListener.notifySlotStatusChange(slot, state, 
SlotState.FREE, jobId);
+       }
+
+       private void transitionSlotToPending(DeclarativeTaskManagerSlot slot, 
JobID jobId) {
+               Preconditions.checkNotNull(slot);
+               Preconditions.checkState(slot.getState() == SlotState.FREE);
+
+               slot.startAllocation(jobId);
+               freeSlots.remove(slot.getSlotId());
+               slotStatusUpdateListener.notifySlotStatusChange(slot, 
SlotState.FREE, SlotState.PENDING, jobId);
+       }
+
+       private void transitionSlotToAllocated(DeclarativeTaskManagerSlot slot, 
JobID jobId) {
+               Preconditions.checkNotNull(slot);
+               Preconditions.checkState(slot.getState() == SlotState.PENDING);
+               Preconditions.checkState(jobId.equals(slot.getJobId()));
+
+               slotStatusUpdateListener.notifySlotStatusChange(slot, 
SlotState.PENDING, SlotState.ALLOCATED, jobId);
+               slot.completeAllocation();

Review comment:
       Does it make a difference whether 
`slotStatusUpdateListener.notifySlotStatusChange` is called before effectuating 
the state change on `slot`? I am asking because `transitionSlotToPending` and 
`transitionSlotToFree` do it in the different order.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTrackerTest.java
##########
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.util.TestLogger;
+
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.Queue;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.collection.IsEmptyCollection.empty;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link DefaultSlotTracker}.
+ */
+public class DefaultSlotTrackerTest extends TestLogger {
+
+       private static final TaskExecutorConnection TASK_EXECUTOR_CONNECTION = 
new TaskExecutorConnection(
+               ResourceID.generate(),
+               new 
TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway());
+
+       private static final JobID jobId = new JobID();
+
+       @Test
+       public void testInitialBehavior() {
+               SlotTracker tracker = new DefaultSlotTracker((slot, previous, 
current, jobId) -> {});
+
+               assertThat(tracker.getFreeSlots(), empty());
+       }
+
+       @Test
+       public void testSlotAddition() {
+               SlotTracker tracker = new DefaultSlotTracker((slot, previous, 
current, jobId) -> {});
+
+               SlotID slotId1 = new 
SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 0);
+               SlotID slotId2 = new 
SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 1);
+
+               tracker.addSlot(slotId1, ResourceProfile.ANY, 
TASK_EXECUTOR_CONNECTION, null);
+               tracker.addSlot(slotId2, ResourceProfile.ANY, 
TASK_EXECUTOR_CONNECTION, null);
+
+               assertThat(tracker.getFreeSlots(), 
containsInAnyOrder(Arrays.asList(infoWithSlotId(slotId1), 
infoWithSlotId(slotId2))));
+       }
+
+       @Test
+       public void testSlotRemoval() {
+               Queue<SlotStateTransition> stateTransitions = new 
ArrayDeque<>();
+               DefaultSlotTracker tracker = new DefaultSlotTracker((slot, 
previous, current, jobId) ->
+                       stateTransitions.add(new 
SlotStateTransition(slot.getSlotId(), previous, current, jobId)));
+
+               SlotID slotId1 = new 
SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 0);
+               SlotID slotId2 = new 
SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 1);
+               SlotID slotId3 = new 
SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 2);
+
+               tracker.addSlot(slotId1, ResourceProfile.ANY, 
TASK_EXECUTOR_CONNECTION, null);
+               tracker.addSlot(slotId2, ResourceProfile.ANY, 
TASK_EXECUTOR_CONNECTION, null);
+               tracker.addSlot(slotId3, ResourceProfile.ANY, 
TASK_EXECUTOR_CONNECTION, null);
+
+               tracker.notifyAllocationStart(slotId2, jobId);
+               tracker.notifyAllocationStart(slotId3, jobId);
+               tracker.notifyAllocationComplete(slotId3, jobId);
+
+               // the transitions to this point are not relevant for this test
+               stateTransitions.clear();
+               // we now have 1 slot in each slot state (free, pending, 
allocated)
+               // it should be possible to remove slots regardless of their 
state
+               tracker.removeSlots(Arrays.asList(slotId1, slotId2, slotId3));
+
+               assertThat(tracker.getFreeSlots(), empty());
+               assertThat(tracker.areMapsEmpty(), is(true));
+
+               assertThat(stateTransitions, containsInAnyOrder(
+                       new SlotStateTransition(slotId2, SlotState.PENDING, 
SlotState.FREE, jobId),
+                       new SlotStateTransition(slotId3, SlotState.ALLOCATED, 
SlotState.FREE, jobId)
+               ));
+       }
+
+       @Test
+       public void testAllocationCompletion() {
+               Queue<SlotStateTransition> stateTransitions = new 
ArrayDeque<>();
+               SlotTracker tracker = new DefaultSlotTracker((slot, previous, 
current, jobId) ->
+                       stateTransitions.add(new 
SlotStateTransition(slot.getSlotId(), previous, current, jobId)));
+
+               SlotID slotId = new 
SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 0);
+
+               tracker.addSlot(slotId, ResourceProfile.ANY, 
TASK_EXECUTOR_CONNECTION, null);
+
+               tracker.notifyAllocationStart(slotId, jobId);
+               assertThat(tracker.getFreeSlots(), empty());
+               assertThat(stateTransitions.remove(), is(new 
SlotStateTransition(slotId, SlotState.FREE, SlotState.PENDING, jobId)));
+
+               tracker.notifyAllocationComplete(slotId, jobId);
+               assertThat(tracker.getFreeSlots(), empty());
+               assertThat(stateTransitions.remove(), is(new 
SlotStateTransition(slotId, SlotState.PENDING, SlotState.ALLOCATED, jobId)));
+
+               tracker.notifyFree(slotId);
+
+               assertThat(tracker.getFreeSlots(), 
contains(infoWithSlotId(slotId)));
+               assertThat(stateTransitions.remove(), is(new 
SlotStateTransition(slotId, SlotState.ALLOCATED, SlotState.FREE, jobId)));
+       }
+
+       @Test
+       public void 
testAllocationCompletionForDifferentJobThrowsIllegalStateException() {
+               SlotTracker tracker = new DefaultSlotTracker((slot, previous, 
current, jobId) -> {});
+
+               SlotID slotId = new 
SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 0);
+
+               tracker.addSlot(slotId, ResourceProfile.ANY, 
TASK_EXECUTOR_CONNECTION, null);
+
+               tracker.notifyAllocationStart(slotId, new JobID());
+               try {
+                       tracker.notifyAllocationComplete(slotId, new JobID());
+                       fail("Allocations must not be completed for a different 
job ID.");
+               } catch (IllegalStateException expected) {
+               }
+       }
+
+       @Test
+       public void testAllocationCancellation() {
+               Queue<SlotStateTransition> stateTransitions = new 
ArrayDeque<>();
+               SlotTracker tracker = new DefaultSlotTracker((slot, previous, 
current, jobId) ->
+                       stateTransitions.add(new 
SlotStateTransition(slot.getSlotId(), previous, current, jobId)));
+
+               SlotID slotId = new 
SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 0);
+
+               tracker.addSlot(slotId, ResourceProfile.ANY, 
TASK_EXECUTOR_CONNECTION, null);
+
+               tracker.notifyAllocationStart(slotId, jobId);
+               assertThat(tracker.getFreeSlots(), empty());
+               assertThat(stateTransitions.remove(), is(new 
SlotStateTransition(slotId, SlotState.FREE, SlotState.PENDING, jobId)));
+
+               tracker.notifyFree(slotId);
+               assertThat(tracker.getFreeSlots(), 
contains(infoWithSlotId(slotId)));
+               assertThat(stateTransitions.remove(), is(new 
SlotStateTransition(slotId, SlotState.PENDING, SlotState.FREE, jobId)));
+       }
+
+       @Test
+       public void testSlotStatusProcessing() {
+               SlotTracker tracker = new DefaultSlotTracker((slot, previous, 
current, jobId) -> {});
+               SlotID slotId1 = new 
SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 0);
+               SlotID slotId2 = new 
SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 1);
+               SlotID slotId3 = new 
SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 2);
+               tracker.addSlot(slotId1, ResourceProfile.ANY, 
TASK_EXECUTOR_CONNECTION, null);
+               tracker.addSlot(slotId2, ResourceProfile.ANY, 
TASK_EXECUTOR_CONNECTION, null);
+               tracker.addSlot(slotId3, ResourceProfile.ANY, 
TASK_EXECUTOR_CONNECTION, jobId);
+
+               assertThat(tracker.getFreeSlots(), 
containsInAnyOrder(Arrays.asList(infoWithSlotId(slotId1), 
infoWithSlotId(slotId2))));
+
+               // move slot2 to PENDING
+               tracker.notifyAllocationStart(slotId2, jobId);
+
+               tracker.notifySlotStatus(Arrays.asList(
+                       new SlotStatus(slotId1, ResourceProfile.ANY, jobId, new 
AllocationID()),
+                       new SlotStatus(slotId2, ResourceProfile.ANY, null, new 
AllocationID()),
+                       new SlotStatus(slotId3, ResourceProfile.ANY, null, new 
AllocationID())));
+
+               // slot1 should now be allocated; slot2 should continue to be 
in a pending state; slot3 should be freed
+               assertThat(tracker.getFreeSlots(), 
contains(infoWithSlotId(slotId3)));
+
+               // if slot2 is not in a pending state, this will fail with an 
exception
+               tracker.notifyAllocationComplete(slotId2, jobId);
+       }
+
+       /**
+        * Tests all state transitions that could (or should not) occur due to 
a slot status update. This test only checks
+        * the target state and job ID for state transitions, because the slot 
ID is not interesting and the slot state
+        * is not *actually* being updated. We assume the reconciler locks in a 
set of transitions given a source and target
+        * state, without worrying about the correctness of intermediate steps 
(because it shouldn't; and it would be a bit
+        * annoying to setup).
+        */
+       @Test
+       public void testSlotStatusReconciliation() {
+               JobID jobId1 = new JobID();
+               JobID jobId2 = new JobID();
+
+               Queue<SlotStateTransition> stateTransitions = new 
ArrayDeque<>();
+
+               DefaultSlotTracker.SlotStatusStateReconciler reconciler = new 
DefaultSlotTracker.SlotStatusStateReconciler(
+                       slot -> stateTransitions.add(new 
SlotStateTransition(slot.getSlotId(), slot.getState(), SlotState.FREE, 
slot.getJobId())),
+                       (slot, jobID) -> stateTransitions.add(new 
SlotStateTransition(slot.getSlotId(), slot.getState(), SlotState.PENDING, 
jobID)),
+                       (slot, jobID) -> stateTransitions.add(new 
SlotStateTransition(slot.getSlotId(), slot.getState(), SlotState.ALLOCATED, 
jobID)));
+
+               {// free slot
+                       DeclarativeTaskManagerSlot slot = new 
DeclarativeTaskManagerSlot(new SlotID(ResourceID.generate(), 0), 
ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION);
+
+                       reconciler.executeStateTransition(slot, null);
+                       assertThat(stateTransitions, empty());
+
+                       reconciler.executeStateTransition(slot, jobId1);
+                       assertThat(stateTransitions.remove(), 
is(transitionWithTargetStateForJob(SlotState.PENDING, jobId1)));
+                       assertThat(stateTransitions.remove(), 
is(transitionWithTargetStateForJob(SlotState.ALLOCATED, jobId1)));
+               }

Review comment:
       I think I would split these blocks into separate test methods.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTrackerTest.java
##########
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.util.TestLogger;
+
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.Queue;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.collection.IsEmptyCollection.empty;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link DefaultSlotTracker}.
+ */
+public class DefaultSlotTrackerTest extends TestLogger {
+
+       private static final TaskExecutorConnection TASK_EXECUTOR_CONNECTION = 
new TaskExecutorConnection(
+               ResourceID.generate(),
+               new 
TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway());
+
+       private static final JobID jobId = new JobID();
+
+       @Test
+       public void testInitialBehavior() {

Review comment:
       maybe spell out what the initial behaviour is. For someone not having 
written this code, this might not be obvious.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to