xintongsong commented on a change in pull request #14647:
URL: https://github.com/apache/flink/pull/14647#discussion_r560763453



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorInfo.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+
+/** Basic resource information of a TaskExecutor. */
+public interface TaskExecutorInfo {

Review comment:
       I think the class name is a bit too general.
   Maybe `TaskManagerResourceView` or `TaskManagerResourceStatus` to indicate 
1) it's about resource and 2) it provides a view of the a changeable status.

##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
##########
@@ -111,6 +111,19 @@
                     .withDescription(
                             "Defines whether the cluster uses declarative 
resource management.");
 
+    @Documentation.ExcludeFromDocumentation
+    public static final ConfigOption<Boolean> 
ENABLE_FINE_GRAINED_RESOURCE_MANAGEMENT =
+            
ConfigOptions.key("cluster.fine-grained-resource-management.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Defines whether the cluster uses fine-grained 
resource management.");
+
+    public static boolean isFineGrainedResourceManagementEnabled(Configuration 
configuration) {

Review comment:
       nit: call me captious, but maybe keep the same order between 
fine-grained and declarative for the config options and methods. 

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorTracker.java
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/** Tracks TaskExecutor's resource and slot status. */
+interface TaskExecutorTracker {
+
+    /**
+     * Registers the given listener with this tracker.
+     *
+     * @param slotStatusUpdateListener listener to register
+     */
+    void registerSlotStatusUpdateListener(SlotStatusUpdateListener 
slotStatusUpdateListener);
+
+    // 
---------------------------------------------------------------------------------------------
+    // Add / Remove (pending) Resource
+    // 
---------------------------------------------------------------------------------------------
+
+    /**
+     * Register a new task executor with its initial slot report.
+     *
+     * @param taskExecutorConnection of the new task executor
+     * @param initialSlotReport of the new task executor
+     * @param totalResourceProfile of the new task executor
+     * @param defaultSlotResourceProfile of the new task executor
+     */
+    void addTaskExecutor(
+            TaskExecutorConnection taskExecutorConnection,
+            SlotReport initialSlotReport,
+            ResourceProfile totalResourceProfile,
+            ResourceProfile defaultSlotResourceProfile);
+
+    /**
+     * Add a new pending task executor with its resource profile.
+     *
+     * @param totalResourceProfile of the pending task executor
+     * @param defaultSlotResourceProfile of the pending task executor
+     */
+    void addPendingTaskExecutor(
+            ResourceProfile totalResourceProfile, ResourceProfile 
defaultSlotResourceProfile);
+
+    /**
+     * Unregister a task executor with the given instance id.
+     *
+     * @param instanceId of the task executor
+     */
+    void removeTaskExecutor(InstanceID instanceId);
+

Review comment:
       Should there be a method for removing a pending task executor? Say the 
worker is not registered and is no longer needed.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorTracker.java
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/** Tracks TaskExecutor's resource and slot status. */
+interface TaskExecutorTracker {
+
+    /**
+     * Registers the given listener with this tracker.
+     *
+     * @param slotStatusUpdateListener listener to register
+     */
+    void registerSlotStatusUpdateListener(SlotStatusUpdateListener 
slotStatusUpdateListener);
+
+    // 
---------------------------------------------------------------------------------------------
+    // Add / Remove (pending) Resource
+    // 
---------------------------------------------------------------------------------------------
+
+    /**
+     * Register a new task executor with its initial slot report.
+     *
+     * @param taskExecutorConnection of the new task executor
+     * @param initialSlotReport of the new task executor
+     * @param totalResourceProfile of the new task executor
+     * @param defaultSlotResourceProfile of the new task executor
+     */
+    void addTaskExecutor(
+            TaskExecutorConnection taskExecutorConnection,
+            SlotReport initialSlotReport,
+            ResourceProfile totalResourceProfile,
+            ResourceProfile defaultSlotResourceProfile);
+
+    /**
+     * Add a new pending task executor with its resource profile.
+     *
+     * @param totalResourceProfile of the pending task executor
+     * @param defaultSlotResourceProfile of the pending task executor
+     */
+    void addPendingTaskExecutor(
+            ResourceProfile totalResourceProfile, ResourceProfile 
defaultSlotResourceProfile);
+
+    /**
+     * Unregister a task executor with the given instance id.
+     *
+     * @param instanceId of the task executor
+     */
+    void removeTaskExecutor(InstanceID instanceId);
+
+    // 
---------------------------------------------------------------------------------------------
+    // Slot status updates
+    // 
---------------------------------------------------------------------------------------------
+
+    /**
+     * Find a task executor to fulfill the given resource requirement.
+     *
+     * @param resourceProfile of the given requirement
+     * @param taskExecutorMatchingStrategy to use
+     * @return An Optional of {@link TaskExecutorConnection}, if find, of the 
matching task
+     *     executor.
+     */
+    Optional<TaskExecutorConnection> findTaskExecutorToFulfill(
+            ResourceProfile resourceProfile,
+            TaskExecutorMatchingStrategy taskExecutorMatchingStrategy);
+
+    /**
+     * Notify the allocation of the given slot is started.
+     *
+     * @param allocationId of the allocated slot
+     * @param jobId of the allocated slot
+     * @param instanceId of the located task executor
+     * @param requiredResource of the allocated slot
+     */
+    void notifyAllocationStart(
+            AllocationID allocationId,
+            JobID jobId,
+            InstanceID instanceId,
+            ResourceProfile requiredResource);
+
+    /**
+     * Notify the allocation of the given slot is completed.
+     *
+     * @param instanceId of the located task executor
+     * @param allocationId of the allocated slot
+     */
+    void notifyAllocationComplete(InstanceID instanceId, AllocationID 
allocationId);
+
+    /**
+     * Notify the given allocated slot is free.
+     *
+     * @param allocationId of the allocated slot
+     */
+    void notifyFree(AllocationID allocationId);
+
+    /**
+     * Notifies the tracker about the slot statuses.
+     *
+     * @param slotReport slot report
+     * @param instanceId of the task executor
+     */
+    void notifySlotStatus(SlotReport slotReport, InstanceID instanceId);
+
+    // 
---------------------------------------------------------------------------------------------
+    // Utility method
+    // 
---------------------------------------------------------------------------------------------
+
+    /**
+     * Get the instance ids of all registered task executors.
+     *
+     * @return a set of instance ids of all registered task executors.
+     */
+    Set<InstanceID> getTaskExecutors();
+
+    /**
+     * Check if there is a registered task executor with the given instance id.
+     *
+     * @param instanceId of the task executor
+     * @return whether there is a registered task executor with the given 
instance id
+     */
+    boolean isTaskExecutorRegistered(InstanceID instanceId);
+
+    /**
+     * Find an exactly matching pending task executor with the given resource 
profile.
+     *
+     * @param initialSlotReport of the task executor
+     * @param totalResourceProfile of the task executor
+     * @param defaultResourceProfile of the task executor
+     * @return An Optional of {@link PendingTaskManager}, if find, of the 
matching pending task
+     *     executor.
+     */
+    Optional<PendingTaskManager> findMatchingPendingTaskManager(
+            SlotReport initialSlotReport,
+            ResourceProfile totalResourceProfile,
+            ResourceProfile defaultResourceProfile);
+
+    // 
---------------------------------------------------------------------------------------------
+    // TaskExecutor idleness / redundancy
+    // 
---------------------------------------------------------------------------------------------
+
+    /**
+     * Get all task executors which idle exceed the given timeout period
+     *
+     * @param taskManagerTimeout timeout period
+     * @return a map of timeout task executors' connection index by the 
instance id
+     */
+    Map<InstanceID, TaskExecutorConnection> getTimeOutTaskExecutors(Time 
taskManagerTimeout);

Review comment:
       This method should not belong to this interface.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorAllocationStrategy.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.slots.ResourceCounter;
+
+import java.util.Collections;
+import java.util.Map;
+
+/** Strategy how to allocate new task executors to fulfill the unfulfilled 
requirements. */
+public interface TaskExecutorAllocationStrategy {
+    TaskExecutorAllocationStrategy NO_OP_STRATEGY =
+            (requirements, existingPendingResources) -> Collections.emptyMap();
+
+    /**
+     * Calculate {@link PendingTaskManager}s needed to fulfill the given 
requirements.
+     *
+     * @param requirements requirements indexed by jobId
+     * @param existingPendingResources existing pending resources can be used 
to fulfill requirement
+     * @return {@link PendingTaskManager}s needed and whether all the 
requirements can be fulfilled
+     *     after allocation of pending task executors, indexed by jobId
+     */
+    Map<JobID, Tuple2<Map<PendingTaskManager, Integer>, Boolean>> 
getTaskExecutorsToFulfill(

Review comment:
       How do we guarantee `TaskExecutorMatchingStrategy` matches slot to 
pending TMs in the same way `TaskExecutorAllocationStrategy` calculates needed 
TMs?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerUtils.java
##########
@@ -0,0 +1,36 @@
+/*

Review comment:
       The commit message is a bit misleading.
   It sounds like a new util class is implemented, which should not be a 
hotfix, while actually the commit only moves some existing codes for 
deduplication.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/AnyMatchingTaskExecutorMatchingStrategy.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** {@link TaskExecutorMatchingStrategy} which picks the first matching task 
executor. */
+public enum AnyMatchingTaskExecutorMatchingStrategy implements 
TaskExecutorMatchingStrategy {
+    INSTANCE;
+
+    @Override
+    public Optional<InstanceID> findMatchingTaskExecutor(
+            ResourceProfile requirement,
+            Map<InstanceID, ? extends TaskExecutorInfo> taskExecutors) {
+        return taskExecutors.entrySet().stream()
+                .filter(taskExecutor -> canFulfillRequirement(requirement, 
taskExecutor.getValue()))
+                .findFirst()
+                .map(Map.Entry::getKey);
+    }
+
+    private boolean canFulfillRequirement(

Review comment:
       can be static

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorTracker.java
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/** Tracks TaskExecutor's resource and slot status. */
+interface TaskExecutorTracker {

Review comment:
       My gut feeling is that, this interface contains too many methods, and 
not all of them are closely related. They could be further grouped and 
structured.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerSlotInformation.java
##########
@@ -28,6 +30,10 @@
 
     SlotID getSlotId();
 
+    AllocationID getAllocationId();
+
+    JobID getJobId();

Review comment:
       These two should be marked `@Nullable`

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AnyMatchingTaskExecutorMatchingStrategyTest.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for the {@link AnyMatchingTaskExecutorMatchingStrategy}. */
+public class AnyMatchingTaskExecutorMatchingStrategyTest extends TestLogger {
+    private final InstanceID instanceIdOfLargeTaskExecutor = new InstanceID();
+    private final InstanceID instanceIdOfSmallTaskExecutor = new InstanceID();
+    private final ResourceProfile largeResourceProfile = 
ResourceProfile.fromResources(10.2, 42);
+    private final ResourceProfile smallResourceProfile = 
ResourceProfile.fromResources(1, 1);

Review comment:
       static

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorTracker.java
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/** Tracks TaskExecutor's resource and slot status. */
+interface TaskExecutorTracker {
+
+    /**
+     * Registers the given listener with this tracker.
+     *
+     * @param slotStatusUpdateListener listener to register
+     */
+    void registerSlotStatusUpdateListener(SlotStatusUpdateListener 
slotStatusUpdateListener);
+
+    // 
---------------------------------------------------------------------------------------------
+    // Add / Remove (pending) Resource
+    // 
---------------------------------------------------------------------------------------------
+
+    /**
+     * Register a new task executor with its initial slot report.
+     *
+     * @param taskExecutorConnection of the new task executor
+     * @param initialSlotReport of the new task executor
+     * @param totalResourceProfile of the new task executor
+     * @param defaultSlotResourceProfile of the new task executor
+     */
+    void addTaskExecutor(
+            TaskExecutorConnection taskExecutorConnection,
+            SlotReport initialSlotReport,
+            ResourceProfile totalResourceProfile,
+            ResourceProfile defaultSlotResourceProfile);
+
+    /**
+     * Add a new pending task executor with its resource profile.
+     *
+     * @param totalResourceProfile of the pending task executor
+     * @param defaultSlotResourceProfile of the pending task executor
+     */
+    void addPendingTaskExecutor(
+            ResourceProfile totalResourceProfile, ResourceProfile 
defaultSlotResourceProfile);
+
+    /**
+     * Unregister a task executor with the given instance id.
+     *
+     * @param instanceId of the task executor
+     */
+    void removeTaskExecutor(InstanceID instanceId);
+
+    // 
---------------------------------------------------------------------------------------------
+    // Slot status updates
+    // 
---------------------------------------------------------------------------------------------
+
+    /**
+     * Find a task executor to fulfill the given resource requirement.
+     *
+     * @param resourceProfile of the given requirement
+     * @param taskExecutorMatchingStrategy to use
+     * @return An Optional of {@link TaskExecutorConnection}, if find, of the 
matching task
+     *     executor.
+     */
+    Optional<TaskExecutorConnection> findTaskExecutorToFulfill(

Review comment:
       This method should not belong to this interface.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorResourceUtils.java
##########
@@ -134,11 +134,13 @@ public static TaskExecutorResourceSpec 
resourceSpecFromConfigForLocalExecution(
     public static Configuration adjustForLocalExecution(Configuration config) {
         UNUSED_CONFIG_OPTIONS.forEach(option -> 
warnOptionHasNoEffectIfSet(config, option));
 
-        setConfigOptionToPassedMaxIfNotSet(config, 
TaskManagerOptions.CPU_CORES, Double.MAX_VALUE);
+        setConfigOptionToPassedMaxIfNotSet(config, 
TaskManagerOptions.CPU_CORES, 1000000.0);
         setConfigOptionToPassedMaxIfNotSet(
-                config, TaskManagerOptions.TASK_HEAP_MEMORY, 
MemorySize.MAX_VALUE);
+                config, TaskManagerOptions.TASK_HEAP_MEMORY, 
MemorySize.ofMebiBytes(1024 * 1024));
         setConfigOptionToPassedMaxIfNotSet(
-                config, TaskManagerOptions.TASK_OFF_HEAP_MEMORY, 
MemorySize.MAX_VALUE);
+                config,
+                TaskManagerOptions.TASK_OFF_HEAP_MEMORY,
+                MemorySize.ofMebiBytes(1024 * 1024));

Review comment:
       Let's replace the magic numbers with static constants.

##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
##########
@@ -111,6 +111,19 @@
                     .withDescription(
                             "Defines whether the cluster uses declarative 
resource management.");
 
+    @Documentation.ExcludeFromDocumentation
+    public static final ConfigOption<Boolean> 
ENABLE_FINE_GRAINED_RESOURCE_MANAGEMENT =
+            
ConfigOptions.key("cluster.fine-grained-resource-management.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Defines whether the cluster uses fine-grained 
resource management.");
+
+    public static boolean isFineGrainedResourceManagementEnabled(Configuration 
configuration) {
+        return isDeclarativeResourceManagementEnabled(configuration)

Review comment:
       IIUC, we need to bind fine-grained with declarative because in the first 
step we implement the feature base on the declarative protocol, which also 
requires declarative slot pool being used. And once FLINK-20838 is finished, we 
would be able to support both protocols and no longer need this binding.
   
   If this is the case, I would suggest to add a `TODO` here explaining why 
this is temporary needed and when should be removed.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorTracker.java
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/** Tracks TaskExecutor's resource and slot status. */
+interface TaskExecutorTracker {
+
+    /**
+     * Registers the given listener with this tracker.
+     *
+     * @param slotStatusUpdateListener listener to register
+     */
+    void registerSlotStatusUpdateListener(SlotStatusUpdateListener 
slotStatusUpdateListener);
+
+    // 
---------------------------------------------------------------------------------------------
+    // Add / Remove (pending) Resource
+    // 
---------------------------------------------------------------------------------------------
+
+    /**
+     * Register a new task executor with its initial slot report.
+     *
+     * @param taskExecutorConnection of the new task executor
+     * @param initialSlotReport of the new task executor
+     * @param totalResourceProfile of the new task executor
+     * @param defaultSlotResourceProfile of the new task executor
+     */
+    void addTaskExecutor(
+            TaskExecutorConnection taskExecutorConnection,
+            SlotReport initialSlotReport,
+            ResourceProfile totalResourceProfile,
+            ResourceProfile defaultSlotResourceProfile);
+
+    /**
+     * Add a new pending task executor with its resource profile.
+     *
+     * @param totalResourceProfile of the pending task executor
+     * @param defaultSlotResourceProfile of the pending task executor
+     */
+    void addPendingTaskExecutor(
+            ResourceProfile totalResourceProfile, ResourceProfile 
defaultSlotResourceProfile);
+
+    /**
+     * Unregister a task executor with the given instance id.
+     *
+     * @param instanceId of the task executor
+     */
+    void removeTaskExecutor(InstanceID instanceId);
+
+    // 
---------------------------------------------------------------------------------------------
+    // Slot status updates
+    // 
---------------------------------------------------------------------------------------------
+
+    /**
+     * Find a task executor to fulfill the given resource requirement.
+     *
+     * @param resourceProfile of the given requirement
+     * @param taskExecutorMatchingStrategy to use
+     * @return An Optional of {@link TaskExecutorConnection}, if find, of the 
matching task
+     *     executor.
+     */
+    Optional<TaskExecutorConnection> findTaskExecutorToFulfill(
+            ResourceProfile resourceProfile,
+            TaskExecutorMatchingStrategy taskExecutorMatchingStrategy);
+
+    /**
+     * Notify the allocation of the given slot is started.
+     *
+     * @param allocationId of the allocated slot
+     * @param jobId of the allocated slot
+     * @param instanceId of the located task executor
+     * @param requiredResource of the allocated slot
+     */
+    void notifyAllocationStart(
+            AllocationID allocationId,
+            JobID jobId,
+            InstanceID instanceId,
+            ResourceProfile requiredResource);
+
+    /**
+     * Notify the allocation of the given slot is completed.
+     *
+     * @param instanceId of the located task executor
+     * @param allocationId of the allocated slot
+     */
+    void notifyAllocationComplete(InstanceID instanceId, AllocationID 
allocationId);
+
+    /**
+     * Notify the given allocated slot is free.
+     *
+     * @param allocationId of the allocated slot
+     */
+    void notifyFree(AllocationID allocationId);
+
+    /**
+     * Notifies the tracker about the slot statuses.
+     *
+     * @param slotReport slot report
+     * @param instanceId of the task executor
+     */
+    void notifySlotStatus(SlotReport slotReport, InstanceID instanceId);
+
+    // 
---------------------------------------------------------------------------------------------
+    // Utility method
+    // 
---------------------------------------------------------------------------------------------
+
+    /**
+     * Get the instance ids of all registered task executors.
+     *
+     * @return a set of instance ids of all registered task executors.
+     */
+    Set<InstanceID> getTaskExecutors();
+
+    /**
+     * Check if there is a registered task executor with the given instance id.
+     *
+     * @param instanceId of the task executor
+     * @return whether there is a registered task executor with the given 
instance id
+     */
+    boolean isTaskExecutorRegistered(InstanceID instanceId);
+
+    /**
+     * Find an exactly matching pending task executor with the given resource 
profile.
+     *
+     * @param initialSlotReport of the task executor
+     * @param totalResourceProfile of the task executor
+     * @param defaultResourceProfile of the task executor
+     * @return An Optional of {@link PendingTaskManager}, if find, of the 
matching pending task
+     *     executor.
+     */
+    Optional<PendingTaskManager> findMatchingPendingTaskManager(
+            SlotReport initialSlotReport,
+            ResourceProfile totalResourceProfile,
+            ResourceProfile defaultResourceProfile);
+
+    // 
---------------------------------------------------------------------------------------------
+    // TaskExecutor idleness / redundancy
+    // 
---------------------------------------------------------------------------------------------
+
+    /**
+     * Get all task executors which idle exceed the given timeout period
+     *
+     * @param taskManagerTimeout timeout period
+     * @return a map of timeout task executors' connection index by the 
instance id
+     */
+    Map<InstanceID, TaskExecutorConnection> getTimeOutTaskExecutors(Time 
taskManagerTimeout);
+
+    /**
+     * Get the start time of idleness of the task executor with the given 
instance id.
+     *
+     * @param instanceId of the task executor
+     * @return the start time of idleness
+     */
+    long getTaskExecutorIdleSince(InstanceID instanceId);
+
+    // 
---------------------------------------------------------------------------------------------
+    // slot / resource counts
+    // 
---------------------------------------------------------------------------------------------
+
+    /**
+     * Get the total registered resources.
+     *
+     * @return the total registered resources
+     */
+    ResourceProfile getTotalRegisteredResources();
+
+    /**
+     * Get the total registered resources of the given task executor
+     *
+     * @param instanceId of the task executor
+     * @return the total registered resources of the given task executor
+     */
+    ResourceProfile getTotalRegisteredResourcesOf(InstanceID instanceId);

Review comment:
       The following methods can be aggregated to something like `TMStatus 
getTMStatus(InstanceID instanceId)`, where `TMStatus` (maybe another name) is a 
data structure containing all information needed.
   - ResourceProfile getTotalRegisteredResourcesOf(InstanceID instanceId)
   - ResourceProfile getTotalFreeResourcesOf(InstanceID instanceId)
   - int getNumberRegisteredSlotsOf(InstanceID instanceId)
   - int getNumberFreeSlotsOf(InstanceID instanceId)
   - long getTaskExecutorIdleSince(InstanceID instanceId)

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingTaskExecutorInfo.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+
+public class TestingTaskExecutorInfo implements TaskExecutorInfo {
+    private final ResourceProfile totalResource;
+    private final ResourceProfile availableResource;
+    private final ResourceProfile defaultSlotResourceProfile;
+    private final int numberAllocatedSlots;
+
+    private TestingTaskExecutorInfo(
+            ResourceProfile totalResource,
+            ResourceProfile availableResource,
+            ResourceProfile defaultSlotResourceProfile,
+            int numberAllocatedSlots) {
+        this.totalResource = totalResource;
+        this.availableResource = availableResource;
+        this.defaultSlotResourceProfile = defaultSlotResourceProfile;
+        this.numberAllocatedSlots = numberAllocatedSlots;
+    }
+
+    @Override
+    public ResourceProfile getTotalResource() {
+        return totalResource;
+    }
+
+    @Override
+    public ResourceProfile getDefaultSlotResourceProfile() {
+        return defaultSlotResourceProfile;
+    }
+
+    @Override
+    public ResourceProfile getAvailableResource() {
+        return availableResource;
+    }
+
+    @Override
+    public int getNumberAllocatedSlots() {
+        return numberAllocatedSlots;
+    }
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    public static class Builder {
+        private ResourceProfile totalResource = ResourceProfile.ANY;
+        private ResourceProfile availableResource = ResourceProfile.ANY;
+        private ResourceProfile defaultSlotResourceProfile = 
ResourceProfile.ANY;
+        private int numberAllocatedSlots = 0;
+
+        private Builder() {};
+
+        public Builder withTotalResource(ResourceProfile totalResource) {
+            this.totalResource = totalResource;
+            return this;
+        }
+
+        public Builder withAvailableResource(ResourceProfile 
availableResource) {
+            this.availableResource = availableResource;
+            return this;
+        }
+
+        public Builder withDefaultSlotResourceProfile(ResourceProfile 
defaultSlotResourceProfile) {
+            this.defaultSlotResourceProfile = defaultSlotResourceProfile;
+            return this;
+        }
+
+        public Builder withNumberAllocatedSlots(int numberAllocatedSlots) {
+            this.numberAllocatedSlots = numberAllocatedSlots;
+            return this;
+        }

Review comment:
       Better to add sanity checks for the setters:
   - non-null
   - available <= total
   - default <= total

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotStatusUpdateListener.java
##########
@@ -42,4 +45,24 @@
      */
     void notifySlotStatusChange(
             TaskManagerSlotInformation slot, SlotState previous, SlotState 
current, JobID jobId);
+
+    class MultiSlotStatusUpdateListener implements SlotStatusUpdateListener {

Review comment:
       IIUC, this change is to allow `MultiSlotStatusUpdateListener` to be 
reused by other classes than `DefaultSlotTracker`.
   Then I would suggest to make `MultiSlotStatusUpdateListener` a separate 
file. An interface does not normally have an inner class that implements 
itself. 

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorMatchingStrategy.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** Strategy how to find a matching task executor. */
+public interface TaskExecutorMatchingStrategy {
+
+    /**
+     * Finds a matching task executor for the requested {@link 
ResourceProfile} given the collection
+     * of task executors.
+     *
+     * @param requirement to find a matching task executor for
+     * @param taskExecutors candidates
+     * @return Returns the instance id of matching task executor or {@link 
Optional#empty()} if
+     *     there is none
+     */
+    Optional<InstanceID> findMatchingTaskExecutor(
+            ResourceProfile requirement, Map<InstanceID, ? extends 
TaskExecutorInfo> taskExecutors);

Review comment:
       Why do we need a map for this interface? We should not need to lookup a 
`TaskExecutorInfo` by its `InstanceID` in the strategy.
   It might be better to include `InstanceID` in `TaskExecutorInfo` and pass in 
a list or collection.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AnyMatchingTaskExecutorMatchingStrategyTest.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for the {@link AnyMatchingTaskExecutorMatchingStrategy}. */
+public class AnyMatchingTaskExecutorMatchingStrategyTest extends TestLogger {
+    private final InstanceID instanceIdOfLargeTaskExecutor = new InstanceID();
+    private final InstanceID instanceIdOfSmallTaskExecutor = new InstanceID();
+    private final ResourceProfile largeResourceProfile = 
ResourceProfile.fromResources(10.2, 42);
+    private final ResourceProfile smallResourceProfile = 
ResourceProfile.fromResources(1, 1);
+    private Map<InstanceID, TaskExecutorInfo> taskExecutors = null;
+    private Set<InstanceID> candidates = null;
+
+    @Before
+    public void setup() {
+        taskExecutors = new HashMap<>();
+        candidates = new HashSet<>();
+        taskExecutors.put(
+                instanceIdOfSmallTaskExecutor,
+                TestingTaskExecutorInfo.newBuilder()
+                        .withAvailableResource(smallResourceProfile)
+                        .build());
+        taskExecutors.put(
+                instanceIdOfLargeTaskExecutor,
+                TestingTaskExecutorInfo.newBuilder()
+                        .withAvailableResource(largeResourceProfile)
+                        .build());

Review comment:
       Why do we need this `setup()`? We can also make `taskExecutors` a 
constant.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTracker.java
##########
@@ -278,30 +277,4 @@ public void 
executeStateTransition(DeclarativeTaskManagerSlot slot, JobID jobId)
             }
         }
     }
-
-    private static class MultiSlotStatusUpdateListener implements 
SlotStatusUpdateListener {

Review comment:
       This should also be a hotfix commit.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/AnyMatchingTaskExecutorMatchingStrategy.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** {@link TaskExecutorMatchingStrategy} which picks the first matching task 
executor. */
+public enum AnyMatchingTaskExecutorMatchingStrategy implements 
TaskExecutorMatchingStrategy {
+    INSTANCE;
+
+    @Override
+    public Optional<InstanceID> findMatchingTaskExecutor(
+            ResourceProfile requirement,
+            Map<InstanceID, ? extends TaskExecutorInfo> taskExecutors) {
+        return taskExecutors.entrySet().stream()
+                .filter(taskExecutor -> canFulfillRequirement(requirement, 
taskExecutor.getValue()))
+                .findFirst()

Review comment:
       `findFirst` -> `findAny`

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/LeastAllocateSlotsTaskExecutorMatchingStrategy.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * {@link TaskExecutorMatchingStrategy} which picks a matching TaskExecutor 
with the least number of
+ * allocated slots.
+ */
+public enum LeastAllocateSlotsTaskExecutorMatchingStrategy implements 
TaskExecutorMatchingStrategy {

Review comment:
       I believe this is for the configuration option 
`cluster.evenly-spread-out-slots`.
   
   However, I'm not sure how we want this feature for the fine-grained resource 
management, where amount of resource plays a more important role than number of 
slots.
   
   This may need to revisit how this feature should be exposed, to be 
compatible for both coarse/fine-grained resource management. Given that this PR 
is already quite big, I would suggest to not introduce this strategy in this 
PR, but create a follow-up issue instead.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AnyMatchingTaskExecutorMatchingStrategyTest.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for the {@link AnyMatchingTaskExecutorMatchingStrategy}. */
+public class AnyMatchingTaskExecutorMatchingStrategyTest extends TestLogger {
+    private final InstanceID instanceIdOfLargeTaskExecutor = new InstanceID();
+    private final InstanceID instanceIdOfSmallTaskExecutor = new InstanceID();
+    private final ResourceProfile largeResourceProfile = 
ResourceProfile.fromResources(10.2, 42);
+    private final ResourceProfile smallResourceProfile = 
ResourceProfile.fromResources(1, 1);
+    private Map<InstanceID, TaskExecutorInfo> taskExecutors = null;
+    private Set<InstanceID> candidates = null;

Review comment:
       Never used.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorResourceUtils.java
##########
@@ -134,11 +134,13 @@ public static TaskExecutorResourceSpec 
resourceSpecFromConfigForLocalExecution(
     public static Configuration adjustForLocalExecution(Configuration config) {
         UNUSED_CONFIG_OPTIONS.forEach(option -> 
warnOptionHasNoEffectIfSet(config, option));
 
-        setConfigOptionToPassedMaxIfNotSet(config, 
TaskManagerOptions.CPU_CORES, Double.MAX_VALUE);
+        setConfigOptionToPassedMaxIfNotSet(config, 
TaskManagerOptions.CPU_CORES, 1000000.0);
         setConfigOptionToPassedMaxIfNotSet(
-                config, TaskManagerOptions.TASK_HEAP_MEMORY, 
MemorySize.MAX_VALUE);
+                config, TaskManagerOptions.TASK_HEAP_MEMORY, 
MemorySize.ofMebiBytes(1024 * 1024));
         setConfigOptionToPassedMaxIfNotSet(
-                config, TaskManagerOptions.TASK_OFF_HEAP_MEMORY, 
MemorySize.MAX_VALUE);
+                config,
+                TaskManagerOptions.TASK_OFF_HEAP_MEMORY,
+                MemorySize.ofMebiBytes(1024 * 1024));

Review comment:
       And it would be helpful to explain the purpose of this change in the 
commit message.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorTracker.java
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/** Tracks TaskExecutor's resource and slot status. */
+interface TaskExecutorTracker {
+
+    /**
+     * Registers the given listener with this tracker.
+     *
+     * @param slotStatusUpdateListener listener to register
+     */
+    void registerSlotStatusUpdateListener(SlotStatusUpdateListener 
slotStatusUpdateListener);
+
+    // 
---------------------------------------------------------------------------------------------
+    // Add / Remove (pending) Resource
+    // 
---------------------------------------------------------------------------------------------
+
+    /**
+     * Register a new task executor with its initial slot report.
+     *
+     * @param taskExecutorConnection of the new task executor
+     * @param initialSlotReport of the new task executor
+     * @param totalResourceProfile of the new task executor
+     * @param defaultSlotResourceProfile of the new task executor
+     */
+    void addTaskExecutor(
+            TaskExecutorConnection taskExecutorConnection,
+            SlotReport initialSlotReport,
+            ResourceProfile totalResourceProfile,
+            ResourceProfile defaultSlotResourceProfile);
+
+    /**
+     * Add a new pending task executor with its resource profile.
+     *
+     * @param totalResourceProfile of the pending task executor
+     * @param defaultSlotResourceProfile of the pending task executor
+     */
+    void addPendingTaskExecutor(
+            ResourceProfile totalResourceProfile, ResourceProfile 
defaultSlotResourceProfile);
+
+    /**
+     * Unregister a task executor with the given instance id.
+     *
+     * @param instanceId of the task executor
+     */
+    void removeTaskExecutor(InstanceID instanceId);
+
+    // 
---------------------------------------------------------------------------------------------
+    // Slot status updates
+    // 
---------------------------------------------------------------------------------------------
+
+    /**
+     * Find a task executor to fulfill the given resource requirement.
+     *
+     * @param resourceProfile of the given requirement
+     * @param taskExecutorMatchingStrategy to use
+     * @return An Optional of {@link TaskExecutorConnection}, if find, of the 
matching task
+     *     executor.
+     */
+    Optional<TaskExecutorConnection> findTaskExecutorToFulfill(
+            ResourceProfile resourceProfile,
+            TaskExecutorMatchingStrategy taskExecutorMatchingStrategy);
+
+    /**
+     * Notify the allocation of the given slot is started.
+     *
+     * @param allocationId of the allocated slot
+     * @param jobId of the allocated slot
+     * @param instanceId of the located task executor
+     * @param requiredResource of the allocated slot
+     */
+    void notifyAllocationStart(
+            AllocationID allocationId,
+            JobID jobId,
+            InstanceID instanceId,
+            ResourceProfile requiredResource);
+
+    /**
+     * Notify the allocation of the given slot is completed.
+     *
+     * @param instanceId of the located task executor
+     * @param allocationId of the allocated slot
+     */
+    void notifyAllocationComplete(InstanceID instanceId, AllocationID 
allocationId);
+
+    /**
+     * Notify the given allocated slot is free.
+     *
+     * @param allocationId of the allocated slot
+     */
+    void notifyFree(AllocationID allocationId);
+
+    /**
+     * Notifies the tracker about the slot statuses.
+     *
+     * @param slotReport slot report
+     * @param instanceId of the task executor
+     */
+    void notifySlotStatus(SlotReport slotReport, InstanceID instanceId);
+
+    // 
---------------------------------------------------------------------------------------------
+    // Utility method
+    // 
---------------------------------------------------------------------------------------------
+
+    /**
+     * Get the instance ids of all registered task executors.
+     *
+     * @return a set of instance ids of all registered task executors.
+     */
+    Set<InstanceID> getTaskExecutors();
+
+    /**
+     * Check if there is a registered task executor with the given instance id.
+     *
+     * @param instanceId of the task executor
+     * @return whether there is a registered task executor with the given 
instance id
+     */
+    boolean isTaskExecutorRegistered(InstanceID instanceId);
+
+    /**
+     * Find an exactly matching pending task executor with the given resource 
profile.
+     *
+     * @param initialSlotReport of the task executor
+     * @param totalResourceProfile of the task executor
+     * @param defaultResourceProfile of the task executor
+     * @return An Optional of {@link PendingTaskManager}, if find, of the 
matching pending task
+     *     executor.
+     */
+    Optional<PendingTaskManager> findMatchingPendingTaskManager(

Review comment:
       This method should not belong to this interface.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpec.java
##########
@@ -70,6 +71,16 @@ public static WorkerResourceSpec fromTaskExecutorProcessSpec(
                 taskExecutorProcessSpec.getManagedMemorySize());
     }
 
+    public static WorkerResourceSpec fromResourceProfile(final ResourceProfile 
resourceProfile) {

Review comment:
       Again, should explain the purpose of this change in the commit message?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorMatchingStrategy.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** Strategy how to find a matching task executor. */
+public interface TaskExecutorMatchingStrategy {

Review comment:
       I would suggest `SlotTaskExecutorMatchingStrategy` to suggest that the 
strategy is for matching between a slot and a TM rather than two TMs.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/AnyMatchingTaskExecutorMatchingStrategy.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** {@link TaskExecutorMatchingStrategy} which picks the first matching task 
executor. */
+public enum AnyMatchingTaskExecutorMatchingStrategy implements 
TaskExecutorMatchingStrategy {

Review comment:
       JavaDoc: first -> any

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorTracker.java
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/** Tracks TaskExecutor's resource and slot status. */
+interface TaskExecutorTracker {
+
+    /**
+     * Registers the given listener with this tracker.
+     *
+     * @param slotStatusUpdateListener listener to register
+     */
+    void registerSlotStatusUpdateListener(SlotStatusUpdateListener 
slotStatusUpdateListener);
+
+    // 
---------------------------------------------------------------------------------------------
+    // Add / Remove (pending) Resource
+    // 
---------------------------------------------------------------------------------------------
+
+    /**
+     * Register a new task executor with its initial slot report.
+     *
+     * @param taskExecutorConnection of the new task executor
+     * @param initialSlotReport of the new task executor
+     * @param totalResourceProfile of the new task executor
+     * @param defaultSlotResourceProfile of the new task executor
+     */
+    void addTaskExecutor(
+            TaskExecutorConnection taskExecutorConnection,
+            SlotReport initialSlotReport,
+            ResourceProfile totalResourceProfile,
+            ResourceProfile defaultSlotResourceProfile);
+
+    /**
+     * Add a new pending task executor with its resource profile.
+     *
+     * @param totalResourceProfile of the pending task executor
+     * @param defaultSlotResourceProfile of the pending task executor
+     */
+    void addPendingTaskExecutor(
+            ResourceProfile totalResourceProfile, ResourceProfile 
defaultSlotResourceProfile);
+
+    /**
+     * Unregister a task executor with the given instance id.
+     *
+     * @param instanceId of the task executor
+     */
+    void removeTaskExecutor(InstanceID instanceId);
+
+    // 
---------------------------------------------------------------------------------------------
+    // Slot status updates
+    // 
---------------------------------------------------------------------------------------------
+
+    /**
+     * Find a task executor to fulfill the given resource requirement.
+     *
+     * @param resourceProfile of the given requirement
+     * @param taskExecutorMatchingStrategy to use
+     * @return An Optional of {@link TaskExecutorConnection}, if find, of the 
matching task
+     *     executor.
+     */
+    Optional<TaskExecutorConnection> findTaskExecutorToFulfill(
+            ResourceProfile resourceProfile,
+            TaskExecutorMatchingStrategy taskExecutorMatchingStrategy);
+
+    /**
+     * Notify the allocation of the given slot is started.
+     *
+     * @param allocationId of the allocated slot
+     * @param jobId of the allocated slot
+     * @param instanceId of the located task executor
+     * @param requiredResource of the allocated slot
+     */
+    void notifyAllocationStart(
+            AllocationID allocationId,
+            JobID jobId,
+            InstanceID instanceId,
+            ResourceProfile requiredResource);
+
+    /**
+     * Notify the allocation of the given slot is completed.
+     *
+     * @param instanceId of the located task executor
+     * @param allocationId of the allocated slot
+     */
+    void notifyAllocationComplete(InstanceID instanceId, AllocationID 
allocationId);
+
+    /**
+     * Notify the given allocated slot is free.
+     *
+     * @param allocationId of the allocated slot
+     */
+    void notifyFree(AllocationID allocationId);
+
+    /**
+     * Notifies the tracker about the slot statuses.
+     *
+     * @param slotReport slot report
+     * @param instanceId of the task executor
+     */
+    void notifySlotStatus(SlotReport slotReport, InstanceID instanceId);

Review comment:
       The slot status synchronization logics are a bit too complicate to be 
included in this interface. Might be better to have a dedicated slot status 
synchronizer. The synchronizer can take slot manager actions (allocate, free), 
received slot reports, and the current status as inputs, and notify the 
`TaskExecutorTracker` and `SlotStatusUpdateListener` as output. 

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpec.java
##########
@@ -70,6 +71,16 @@ public static WorkerResourceSpec fromTaskExecutorProcessSpec(
                 taskExecutorProcessSpec.getManagedMemorySize());
     }
 
+    public static WorkerResourceSpec fromResourceProfile(final ResourceProfile 
resourceProfile) {

Review comment:
       To avoid confusion against slot resource profile.
   ```suggestion
       public static WorkerResourceSpec fromTotalResourceProfile(final 
ResourceProfile totalResourceProfile) {
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorTracker.java
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/** Tracks TaskExecutor's resource and slot status. */
+interface TaskExecutorTracker {
+
+    /**
+     * Registers the given listener with this tracker.
+     *
+     * @param slotStatusUpdateListener listener to register
+     */
+    void registerSlotStatusUpdateListener(SlotStatusUpdateListener 
slotStatusUpdateListener);
+
+    // 
---------------------------------------------------------------------------------------------
+    // Add / Remove (pending) Resource
+    // 
---------------------------------------------------------------------------------------------
+
+    /**
+     * Register a new task executor with its initial slot report.
+     *
+     * @param taskExecutorConnection of the new task executor
+     * @param initialSlotReport of the new task executor
+     * @param totalResourceProfile of the new task executor
+     * @param defaultSlotResourceProfile of the new task executor
+     */
+    void addTaskExecutor(
+            TaskExecutorConnection taskExecutorConnection,
+            SlotReport initialSlotReport,
+            ResourceProfile totalResourceProfile,
+            ResourceProfile defaultSlotResourceProfile);
+
+    /**
+     * Add a new pending task executor with its resource profile.
+     *
+     * @param totalResourceProfile of the pending task executor
+     * @param defaultSlotResourceProfile of the pending task executor
+     */
+    void addPendingTaskExecutor(
+            ResourceProfile totalResourceProfile, ResourceProfile 
defaultSlotResourceProfile);
+
+    /**
+     * Unregister a task executor with the given instance id.
+     *
+     * @param instanceId of the task executor
+     */
+    void removeTaskExecutor(InstanceID instanceId);
+
+    // 
---------------------------------------------------------------------------------------------
+    // Slot status updates
+    // 
---------------------------------------------------------------------------------------------
+
+    /**
+     * Find a task executor to fulfill the given resource requirement.
+     *
+     * @param resourceProfile of the given requirement
+     * @param taskExecutorMatchingStrategy to use
+     * @return An Optional of {@link TaskExecutorConnection}, if find, of the 
matching task
+     *     executor.
+     */
+    Optional<TaskExecutorConnection> findTaskExecutorToFulfill(
+            ResourceProfile resourceProfile,
+            TaskExecutorMatchingStrategy taskExecutorMatchingStrategy);
+
+    /**
+     * Notify the allocation of the given slot is started.
+     *
+     * @param allocationId of the allocated slot
+     * @param jobId of the allocated slot
+     * @param instanceId of the located task executor
+     * @param requiredResource of the allocated slot
+     */
+    void notifyAllocationStart(
+            AllocationID allocationId,
+            JobID jobId,
+            InstanceID instanceId,
+            ResourceProfile requiredResource);
+
+    /**
+     * Notify the allocation of the given slot is completed.
+     *
+     * @param instanceId of the located task executor
+     * @param allocationId of the allocated slot
+     */
+    void notifyAllocationComplete(InstanceID instanceId, AllocationID 
allocationId);
+
+    /**
+     * Notify the given allocated slot is free.
+     *
+     * @param allocationId of the allocated slot
+     */
+    void notifyFree(AllocationID allocationId);
+
+    /**
+     * Notifies the tracker about the slot statuses.
+     *
+     * @param slotReport slot report
+     * @param instanceId of the task executor
+     */
+    void notifySlotStatus(SlotReport slotReport, InstanceID instanceId);
+
+    // 
---------------------------------------------------------------------------------------------
+    // Utility method
+    // 
---------------------------------------------------------------------------------------------
+
+    /**
+     * Get the instance ids of all registered task executors.
+     *
+     * @return a set of instance ids of all registered task executors.
+     */
+    Set<InstanceID> getTaskExecutors();

Review comment:
       Maybe return a collection of `TMInfo` rather than only `InstanceID`s.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerSlotInformation.java
##########
@@ -28,6 +30,10 @@
 
     SlotID getSlotId();
 
+    AllocationID getAllocationId();
+
+    JobID getJobId();

Review comment:
       And this looks like a pure minor refactor. Shouldn't it be a hotfix 
commit?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorTracker.java
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/** Tracks TaskExecutor's resource and slot status. */
+interface TaskExecutorTracker {
+
+    /**
+     * Registers the given listener with this tracker.
+     *
+     * @param slotStatusUpdateListener listener to register
+     */
+    void registerSlotStatusUpdateListener(SlotStatusUpdateListener 
slotStatusUpdateListener);
+
+    // 
---------------------------------------------------------------------------------------------
+    // Add / Remove (pending) Resource
+    // 
---------------------------------------------------------------------------------------------
+
+    /**
+     * Register a new task executor with its initial slot report.
+     *
+     * @param taskExecutorConnection of the new task executor
+     * @param initialSlotReport of the new task executor
+     * @param totalResourceProfile of the new task executor
+     * @param defaultSlotResourceProfile of the new task executor
+     */
+    void addTaskExecutor(
+            TaskExecutorConnection taskExecutorConnection,
+            SlotReport initialSlotReport,
+            ResourceProfile totalResourceProfile,
+            ResourceProfile defaultSlotResourceProfile);
+
+    /**
+     * Add a new pending task executor with its resource profile.
+     *
+     * @param totalResourceProfile of the pending task executor
+     * @param defaultSlotResourceProfile of the pending task executor
+     */
+    void addPendingTaskExecutor(
+            ResourceProfile totalResourceProfile, ResourceProfile 
defaultSlotResourceProfile);
+
+    /**
+     * Unregister a task executor with the given instance id.
+     *
+     * @param instanceId of the task executor
+     */
+    void removeTaskExecutor(InstanceID instanceId);
+
+    // 
---------------------------------------------------------------------------------------------
+    // Slot status updates
+    // 
---------------------------------------------------------------------------------------------
+
+    /**
+     * Find a task executor to fulfill the given resource requirement.
+     *
+     * @param resourceProfile of the given requirement
+     * @param taskExecutorMatchingStrategy to use
+     * @return An Optional of {@link TaskExecutorConnection}, if find, of the 
matching task
+     *     executor.
+     */
+    Optional<TaskExecutorConnection> findTaskExecutorToFulfill(
+            ResourceProfile resourceProfile,
+            TaskExecutorMatchingStrategy taskExecutorMatchingStrategy);
+
+    /**
+     * Notify the allocation of the given slot is started.
+     *
+     * @param allocationId of the allocated slot
+     * @param jobId of the allocated slot
+     * @param instanceId of the located task executor
+     * @param requiredResource of the allocated slot
+     */
+    void notifyAllocationStart(
+            AllocationID allocationId,
+            JobID jobId,
+            InstanceID instanceId,
+            ResourceProfile requiredResource);
+
+    /**
+     * Notify the allocation of the given slot is completed.
+     *
+     * @param instanceId of the located task executor
+     * @param allocationId of the allocated slot
+     */
+    void notifyAllocationComplete(InstanceID instanceId, AllocationID 
allocationId);
+
+    /**
+     * Notify the given allocated slot is free.
+     *
+     * @param allocationId of the allocated slot
+     */
+    void notifyFree(AllocationID allocationId);
+
+    /**
+     * Notifies the tracker about the slot statuses.
+     *
+     * @param slotReport slot report
+     * @param instanceId of the task executor
+     */
+    void notifySlotStatus(SlotReport slotReport, InstanceID instanceId);
+
+    // 
---------------------------------------------------------------------------------------------
+    // Utility method
+    // 
---------------------------------------------------------------------------------------------
+
+    /**
+     * Get the instance ids of all registered task executors.
+     *
+     * @return a set of instance ids of all registered task executors.
+     */
+    Set<InstanceID> getTaskExecutors();
+
+    /**
+     * Check if there is a registered task executor with the given instance id.
+     *
+     * @param instanceId of the task executor
+     * @return whether there is a registered task executor with the given 
instance id
+     */
+    boolean isTaskExecutorRegistered(InstanceID instanceId);
+
+    /**
+     * Find an exactly matching pending task executor with the given resource 
profile.
+     *
+     * @param initialSlotReport of the task executor
+     * @param totalResourceProfile of the task executor
+     * @param defaultResourceProfile of the task executor
+     * @return An Optional of {@link PendingTaskManager}, if find, of the 
matching pending task
+     *     executor.
+     */
+    Optional<PendingTaskManager> findMatchingPendingTaskManager(
+            SlotReport initialSlotReport,
+            ResourceProfile totalResourceProfile,
+            ResourceProfile defaultResourceProfile);
+
+    // 
---------------------------------------------------------------------------------------------
+    // TaskExecutor idleness / redundancy
+    // 
---------------------------------------------------------------------------------------------
+
+    /**
+     * Get all task executors which idle exceed the given timeout period
+     *
+     * @param taskManagerTimeout timeout period
+     * @return a map of timeout task executors' connection index by the 
instance id
+     */
+    Map<InstanceID, TaskExecutorConnection> getTimeOutTaskExecutors(Time 
taskManagerTimeout);
+
+    /**
+     * Get the start time of idleness of the task executor with the given 
instance id.
+     *
+     * @param instanceId of the task executor
+     * @return the start time of idleness
+     */
+    long getTaskExecutorIdleSince(InstanceID instanceId);
+
+    // 
---------------------------------------------------------------------------------------------
+    // slot / resource counts
+    // 
---------------------------------------------------------------------------------------------
+
+    /**
+     * Get the total registered resources.
+     *
+     * @return the total registered resources
+     */
+    ResourceProfile getTotalRegisteredResources();

Review comment:
       The following methods can be aggregated to something like 
`StatusOverview getStatusOverview()`, where `StatusOverview` (maybe another 
name) is a data structure containing all information needed.
   - ResourceProfile getTotalRegisteredResources()
   - ResourceProfile getTotalFreeResources()
   - int getNumberRegisteredSlots()
   - int getNumberFreeSlots()
   - int getNumberFreeTaskExecutors()
   - int getNumberRegisteredTaskExecutors()
   - int getNumberPendingTaskExecutors();

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorMatchingStrategy.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** Strategy how to find a matching task executor. */
+public interface TaskExecutorMatchingStrategy {

Review comment:
       Moreover, the latest glossary no longer use the term `TaskExecutor`.
   It's not necessary to change the existing codes, but introducing new codes I 
would suggest to replace `TaskExecutor` with `TaskManager`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to