tillrohrmann commented on a change in pull request #13553:
URL: https://github.com/apache/flink/pull/13553#discussion_r502527695



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
##########
@@ -0,0 +1,609 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.metrics.groups.SlotManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
+import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.slots.ResourceCounter;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.slots.ResourceRequirements;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+/**
+ * Implementation of {@link SlotManager} supporting declarative slot 
management.
+ */
+public class DeclarativeSlotManager implements SlotManager {
+       private static final Logger LOG = 
LoggerFactory.getLogger(DeclarativeSlotManager.class);
+
+       private final SlotTracker slotTracker;
+       private final ResourceTracker resourceTracker;
+       private final BiFunction<Executor, ResourceActions, 
TaskExecutorManager> taskExecutorManagerFactory;
+       private TaskExecutorManager taskExecutorManager;
+
+       /** Timeout for slot requests to the task manager. */
+       private final Time taskManagerRequestTimeout;
+       private ScheduledFuture<?> slotRequestTimeoutCheck;
+
+       private final SlotMatchingStrategy slotMatchingStrategy;
+
+       private final SlotManagerMetricGroup slotManagerMetricGroup;
+
+       private final Map<JobID, String> jobMasterTargetAddresses = new 
HashMap<>();
+       private final HashMap<SlotID, CompletableFuture<Acknowledge>> 
pendingSlotAllocationFutures;
+
+       /** ResourceManager's id. */
+       private ResourceManagerId resourceManagerId;
+
+       /** Executor for future callbacks which have to be "synchronized". */
+       private Executor mainThreadExecutor;
+
+       /** Callbacks for resource (de-)allocations. */
+       private ResourceActions resourceActions;
+
+       /** True iff the component has been started. */
+       private boolean started;
+
+       public DeclarativeSlotManager(
+                       ScheduledExecutor scheduledExecutor,
+                       SlotManagerConfiguration slotManagerConfiguration,
+                       SlotManagerMetricGroup slotManagerMetricGroup,
+                       ResourceTracker resourceTracker,
+                       SlotTracker slotTracker) {
+
+               Preconditions.checkNotNull(slotManagerConfiguration);
+               this.taskManagerRequestTimeout = 
slotManagerConfiguration.getTaskManagerRequestTimeout();
+               this.slotManagerMetricGroup = 
Preconditions.checkNotNull(slotManagerMetricGroup);
+               this.resourceTracker = 
Preconditions.checkNotNull(resourceTracker);
+
+               pendingSlotAllocationFutures = new HashMap<>(16);
+
+               this.slotTracker = Preconditions.checkNotNull(slotTracker);
+               
slotTracker.registerSlotStatusUpdateListener(createSlotStatusUpdateListener());
+
+               slotMatchingStrategy = 
slotManagerConfiguration.getSlotMatchingStrategy();
+
+               taskExecutorManagerFactory = (executor, resourceActions) -> new 
TaskExecutorManager(
+                       slotManagerConfiguration.getDefaultWorkerResourceSpec(),
+                       slotManagerConfiguration.getNumSlotsPerWorker(),
+                       slotManagerConfiguration.getMaxSlotNum(),
+                       
slotManagerConfiguration.isWaitResultConsumedBeforeRelease(),
+                       slotManagerConfiguration.getRedundantTaskManagerNum(),
+                       slotManagerConfiguration.getTaskManagerTimeout(),
+                       scheduledExecutor,
+                       executor,
+                       resourceActions);
+
+               resourceManagerId = null;
+               resourceActions = null;
+               mainThreadExecutor = null;
+               slotRequestTimeoutCheck = null;
+               taskExecutorManager = null;
+
+               started = false;
+       }
+
+       private SlotStatusUpdateListener createSlotStatusUpdateListener() {
+               return (taskManagerSlot, previous, current, jobId) -> {
+                       if (previous == SlotState.PENDING) {
+                               
cancelAllocationFuture(taskManagerSlot.getSlotId());
+                       }
+
+                       if (current == SlotState.PENDING) {
+                               resourceTracker.notifyAcquiredResource(jobId, 
taskManagerSlot.getResourceProfile());
+                       }
+                       if (current == SlotState.FREE) {
+                               resourceTracker.notifyLostResource(jobId, 
taskManagerSlot.getResourceProfile());
+                       }
+
+                       if (current == SlotState.ALLOCATED) {
+                               
taskExecutorManager.occupySlot(taskManagerSlot.getInstanceId());
+                       }
+                       if (previous == SlotState.ALLOCATED && current == 
SlotState.FREE) {
+                               
taskExecutorManager.freeSlot(taskManagerSlot.getInstanceId());
+                       }
+               };
+       }
+
+       private void cancelAllocationFuture(SlotID slotId) {
+               final CompletableFuture<Acknowledge> 
acknowledgeCompletableFuture = pendingSlotAllocationFutures.remove(slotId);
+               // the future may be null if we are just re-playing the state 
transitions due to a slot report
+               if (acknowledgeCompletableFuture != null) {
+                       acknowledgeCompletableFuture.cancel(false);
+               }
+       }
+
+       // 
---------------------------------------------------------------------------------------------
+       // Component lifecycle methods
+       // 
---------------------------------------------------------------------------------------------
+
+       /**
+        * Starts the slot manager with the given leader id and resource 
manager actions.
+        *
+        * @param newResourceManagerId to use for communication with the task 
managers
+        * @param newMainThreadExecutor to use to run code in the 
ResourceManager's main thread
+        * @param newResourceActions to use for resource (de-)allocations
+        */
+       @Override
+       public void start(ResourceManagerId newResourceManagerId, Executor 
newMainThreadExecutor, ResourceActions newResourceActions) {
+               LOG.info("Starting the slot manager.");
+
+               this.resourceManagerId = 
Preconditions.checkNotNull(newResourceManagerId);
+               mainThreadExecutor = 
Preconditions.checkNotNull(newMainThreadExecutor);
+               resourceActions = 
Preconditions.checkNotNull(newResourceActions);
+               taskExecutorManager = 
taskExecutorManagerFactory.apply(newMainThreadExecutor, newResourceActions);
+
+               started = true;
+
+               registerSlotManagerMetrics();
+       }
+
+       private void registerSlotManagerMetrics() {
+               slotManagerMetricGroup.gauge(
+                       MetricNames.TASK_SLOTS_AVAILABLE,
+                       () -> (long) getNumberFreeSlots());
+               slotManagerMetricGroup.gauge(
+                       MetricNames.TASK_SLOTS_TOTAL,
+                       () -> (long) getNumberRegisteredSlots());
+       }

Review comment:
       I guess the same problem already exists in the `SlotManagerImpl`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
##########
@@ -0,0 +1,609 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.metrics.groups.SlotManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
+import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.slots.ResourceCounter;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.runtime.slots.ResourceRequirements;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+/**
+ * Implementation of {@link SlotManager} supporting declarative slot 
management.
+ */
+public class DeclarativeSlotManager implements SlotManager {
+       private static final Logger LOG = 
LoggerFactory.getLogger(DeclarativeSlotManager.class);
+
+       private final SlotTracker slotTracker;
+       private final ResourceTracker resourceTracker;
+       private final BiFunction<Executor, ResourceActions, 
TaskExecutorManager> taskExecutorManagerFactory;
+       private TaskExecutorManager taskExecutorManager;
+
+       /** Timeout for slot requests to the task manager. */
+       private final Time taskManagerRequestTimeout;
+       private ScheduledFuture<?> slotRequestTimeoutCheck;
+
+       private final SlotMatchingStrategy slotMatchingStrategy;
+
+       private final SlotManagerMetricGroup slotManagerMetricGroup;
+
+       private final Map<JobID, String> jobMasterTargetAddresses = new 
HashMap<>();
+       private final HashMap<SlotID, CompletableFuture<Acknowledge>> 
pendingSlotAllocationFutures;
+
+       /** ResourceManager's id. */
+       private ResourceManagerId resourceManagerId;
+
+       /** Executor for future callbacks which have to be "synchronized". */
+       private Executor mainThreadExecutor;
+
+       /** Callbacks for resource (de-)allocations. */
+       private ResourceActions resourceActions;
+
+       /** True iff the component has been started. */
+       private boolean started;
+
+       public DeclarativeSlotManager(
+                       ScheduledExecutor scheduledExecutor,
+                       SlotManagerConfiguration slotManagerConfiguration,
+                       SlotManagerMetricGroup slotManagerMetricGroup,
+                       ResourceTracker resourceTracker,
+                       SlotTracker slotTracker) {
+
+               Preconditions.checkNotNull(slotManagerConfiguration);
+               this.taskManagerRequestTimeout = 
slotManagerConfiguration.getTaskManagerRequestTimeout();
+               this.slotManagerMetricGroup = 
Preconditions.checkNotNull(slotManagerMetricGroup);
+               this.resourceTracker = 
Preconditions.checkNotNull(resourceTracker);
+
+               pendingSlotAllocationFutures = new HashMap<>(16);
+
+               this.slotTracker = Preconditions.checkNotNull(slotTracker);
+               
slotTracker.registerSlotStatusUpdateListener(createSlotStatusUpdateListener());
+
+               slotMatchingStrategy = 
slotManagerConfiguration.getSlotMatchingStrategy();
+
+               taskExecutorManagerFactory = (executor, resourceActions) -> new 
TaskExecutorManager(
+                       slotManagerConfiguration.getDefaultWorkerResourceSpec(),
+                       slotManagerConfiguration.getNumSlotsPerWorker(),
+                       slotManagerConfiguration.getMaxSlotNum(),
+                       
slotManagerConfiguration.isWaitResultConsumedBeforeRelease(),
+                       slotManagerConfiguration.getRedundantTaskManagerNum(),
+                       slotManagerConfiguration.getTaskManagerTimeout(),
+                       scheduledExecutor,
+                       executor,
+                       resourceActions);
+
+               resourceManagerId = null;
+               resourceActions = null;
+               mainThreadExecutor = null;
+               slotRequestTimeoutCheck = null;
+               taskExecutorManager = null;
+
+               started = false;
+       }
+
+       private SlotStatusUpdateListener createSlotStatusUpdateListener() {
+               return (taskManagerSlot, previous, current, jobId) -> {
+                       if (previous == SlotState.PENDING) {
+                               
cancelAllocationFuture(taskManagerSlot.getSlotId());
+                       }
+
+                       if (current == SlotState.PENDING) {
+                               resourceTracker.notifyAcquiredResource(jobId, 
taskManagerSlot.getResourceProfile());
+                       }
+                       if (current == SlotState.FREE) {
+                               resourceTracker.notifyLostResource(jobId, 
taskManagerSlot.getResourceProfile());
+                       }
+
+                       if (current == SlotState.ALLOCATED) {
+                               
taskExecutorManager.occupySlot(taskManagerSlot.getInstanceId());
+                       }
+                       if (previous == SlotState.ALLOCATED && current == 
SlotState.FREE) {
+                               
taskExecutorManager.freeSlot(taskManagerSlot.getInstanceId());
+                       }
+               };
+       }
+
+       private void cancelAllocationFuture(SlotID slotId) {
+               final CompletableFuture<Acknowledge> 
acknowledgeCompletableFuture = pendingSlotAllocationFutures.remove(slotId);
+               // the future may be null if we are just re-playing the state 
transitions due to a slot report
+               if (acknowledgeCompletableFuture != null) {
+                       acknowledgeCompletableFuture.cancel(false);
+               }
+       }
+
+       // 
---------------------------------------------------------------------------------------------
+       // Component lifecycle methods
+       // 
---------------------------------------------------------------------------------------------
+
+       /**
+        * Starts the slot manager with the given leader id and resource 
manager actions.
+        *
+        * @param newResourceManagerId to use for communication with the task 
managers
+        * @param newMainThreadExecutor to use to run code in the 
ResourceManager's main thread
+        * @param newResourceActions to use for resource (de-)allocations
+        */
+       @Override
+       public void start(ResourceManagerId newResourceManagerId, Executor 
newMainThreadExecutor, ResourceActions newResourceActions) {
+               LOG.info("Starting the slot manager.");
+
+               this.resourceManagerId = 
Preconditions.checkNotNull(newResourceManagerId);
+               mainThreadExecutor = 
Preconditions.checkNotNull(newMainThreadExecutor);
+               resourceActions = 
Preconditions.checkNotNull(newResourceActions);
+               taskExecutorManager = 
taskExecutorManagerFactory.apply(newMainThreadExecutor, newResourceActions);
+
+               started = true;
+
+               registerSlotManagerMetrics();
+       }
+
+       private void registerSlotManagerMetrics() {
+               slotManagerMetricGroup.gauge(
+                       MetricNames.TASK_SLOTS_AVAILABLE,
+                       () -> (long) getNumberFreeSlots());
+               slotManagerMetricGroup.gauge(
+                       MetricNames.TASK_SLOTS_TOTAL,
+                       () -> (long) getNumberRegisteredSlots());
+       }

Review comment:
       I guess the same problem already exists in the `SlotManagerImpl`. Hence, 
we can tackle it as a follow-up.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to