This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b1d78ed749bdd5f2e7325b2e375f00ef262dfb4c
Author: Roc Marshal <[email protected]>
AuthorDate: Wed Dec 13 12:09:32 2023 +0800

    [hotfix][runtime] Migrate the Time to Duration for DefaultScheduler in the minimum impact range
---
 .../DefaultSlotPoolServiceSchedulerFactory.java    | 10 ++++----
 .../slotpool/AbstractSlotPoolServiceFactory.java   | 15 ++++++------
 .../slotpool/BlocklistDeclarativeSlotPool.java     |  6 ++---
 .../BlocklistDeclarativeSlotPoolFactory.java       |  6 ++---
 .../slotpool/DeclarativeSlotPoolBridge.java        | 27 +++++++++-------------
 .../DeclarativeSlotPoolBridgeServiceFactory.java   |  9 ++++----
 .../slotpool/DeclarativeSlotPoolFactory.java       |  6 ++---
 .../slotpool/DeclarativeSlotPoolService.java       |  7 +++---
 .../DeclarativeSlotPoolServiceFactory.java         | 10 ++++----
 .../slotpool/DefaultDeclarativeSlotPool.java       | 16 ++++++++-----
 .../DefaultDeclarativeSlotPoolFactory.java         |  6 ++---
 .../runtime/resourcemanager/ResourceManager.java   |  2 +-
 .../resourcemanager/ResourceManagerGateway.java    |  2 +-
 .../slotpool/BlocklistDeclarativeSlotPoolTest.java |  6 ++---
 .../slotpool/DeclarativeSlotPoolBridgeBuilder.java | 13 +++++------
 ...tiveSlotPoolBridgePreferredAllocationsTest.java |  6 ++---
 .../slotpool/DeclarativeSlotPoolBridgeTest.java    | 12 ++++++----
 .../slotpool/DeclarativeSlotPoolServiceTest.java   |  6 ++---
 .../DefaultDeclarativeSlotPoolBuilder.java         |  8 +++----
 .../slotpool/DefaultDeclarativeSlotPoolTest.java   | 10 ++++----
 .../DefaultDeclarativeSlotPoolTestBase.java        |  0
 .../slotpool/SlotPoolBatchSlotRequestTest.java     |  4 ++--
 .../TestingDeclarativeSlotPoolFactory.java         |  6 ++---
 .../resourcemanager/ResourceManagerTest.java       |  4 ++--
 .../utils/TestingResourceManagerGateway.java       |  2 +-
 .../DefaultSchedulerBatchSchedulingTest.java       |  2 +-
 .../runtime/scheduler/DefaultSchedulerTest.java    |  5 ++--
 .../adaptive/AdaptiveSchedulerBuilder.java         |  9 ++++----
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  |  9 ++++----
 29 files changed, 114 insertions(+), 110 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
index f33846284e7..6474933f962 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
@@ -57,6 +57,7 @@ import org.apache.flink.util.clock.SystemClock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
@@ -151,12 +152,9 @@ public final class DefaultSlotPoolServiceSchedulerFactory
     public static DefaultSlotPoolServiceSchedulerFactory fromConfiguration(
             Configuration configuration, JobType jobType, boolean 
isDynamicGraph) {
 
-        final Time rpcTimeout =
-                
Time.fromDuration(configuration.get(RpcOptions.ASK_TIMEOUT_DURATION));
-        final Time slotIdleTimeout =
-                
Time.fromDuration(configuration.get(JobManagerOptions.SLOT_IDLE_TIMEOUT));
-        final Time batchSlotTimeout =
-                
Time.fromDuration(configuration.get(JobManagerOptions.SLOT_REQUEST_TIMEOUT));
+        final Duration rpcTimeout = 
configuration.get(RpcOptions.ASK_TIMEOUT_DURATION);
+        final Duration slotIdleTimeout = 
configuration.get(JobManagerOptions.SLOT_IDLE_TIMEOUT);
+        final Duration batchSlotTimeout = 
configuration.get(JobManagerOptions.SLOT_REQUEST_TIMEOUT);
 
         final SlotPoolServiceFactory slotPoolServiceFactory;
         final SchedulerNGFactory schedulerNGFactory;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractSlotPoolServiceFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractSlotPoolServiceFactory.java
index 3f4f96242cb..ba281d1cd39 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractSlotPoolServiceFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractSlotPoolServiceFactory.java
@@ -18,27 +18,28 @@
 
 package org.apache.flink.runtime.jobmaster.slotpool;
 
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.util.clock.Clock;
 
 import javax.annotation.Nonnull;
 
+import java.time.Duration;
+
 /** Abstract SlotPoolServiceFactory. */
 public abstract class AbstractSlotPoolServiceFactory implements 
SlotPoolServiceFactory {
 
     @Nonnull protected final Clock clock;
 
-    @Nonnull protected final Time rpcTimeout;
+    @Nonnull protected final Duration rpcTimeout;
 
-    @Nonnull protected final Time slotIdleTimeout;
+    @Nonnull protected final Duration slotIdleTimeout;
 
-    @Nonnull protected final Time batchSlotTimeout;
+    @Nonnull protected final Duration batchSlotTimeout;
 
     protected AbstractSlotPoolServiceFactory(
             @Nonnull Clock clock,
-            @Nonnull Time rpcTimeout,
-            @Nonnull Time slotIdleTimeout,
-            @Nonnull Time batchSlotTimeout) {
+            @Nonnull Duration rpcTimeout,
+            @Nonnull Duration slotIdleTimeout,
+            @Nonnull Duration batchSlotTimeout) {
         this.clock = clock;
         this.rpcTimeout = rpcTimeout;
         this.slotIdleTimeout = slotIdleTimeout;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPool.java
index afbdf2c2298..c8f9ada9fdc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPool.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.blocklist.BlockedTaskManagerChecker;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -33,6 +32,7 @@ import org.apache.flink.util.FlinkRuntimeException;
 
 import javax.annotation.Nullable;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Optional;
@@ -55,8 +55,8 @@ public class BlocklistDeclarativeSlotPool extends 
DefaultDeclarativeSlotPool {
             AllocatedSlotPool slotPool,
             Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements,
             BlockedTaskManagerChecker blockedTaskManagerChecker,
-            Time idleSlotTimeout,
-            Time rpcTimeout) {
+            Duration idleSlotTimeout,
+            Duration rpcTimeout) {
         super(jobId, slotPool, notifyNewResourceRequirements, idleSlotTimeout, 
rpcTimeout);
         this.blockedTaskManagerChecker = 
checkNotNull(blockedTaskManagerChecker);
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolFactory.java
index b3a4db27d83..7b85f9e4ebd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolFactory.java
@@ -19,10 +19,10 @@
 package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.blocklist.BlockedTaskManagerChecker;
 import org.apache.flink.runtime.slots.ResourceRequirement;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.function.Consumer;
 
@@ -42,8 +42,8 @@ public class BlocklistDeclarativeSlotPoolFactory implements 
DeclarativeSlotPoolF
     public DeclarativeSlotPool create(
             JobID jobId,
             Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements,
-            Time idleSlotTimeout,
-            Time rpcTimeout) {
+            Duration idleSlotTimeout,
+            Duration rpcTimeout) {
         return new BlocklistDeclarativeSlotPool(
                 jobId,
                 new DefaultAllocatedSlotPool(),
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
index 1a77528fa2d..444c2aa0759 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
@@ -40,6 +40,7 @@ import org.apache.flink.util.concurrent.FutureUtils;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -61,13 +62,13 @@ public class DeclarativeSlotPoolBridge extends 
DeclarativeSlotPoolService implem
 
     private final Map<SlotRequestId, PendingRequest> pendingRequests;
     private final Map<SlotRequestId, AllocationID> fulfilledRequests;
-    private final Time idleSlotTimeout;
+    private final Duration idleSlotTimeout;
 
     private final RequestSlotMatchingStrategy requestSlotMatchingStrategy;
 
     @Nullable private ComponentMainThreadExecutor componentMainThreadExecutor;
 
-    private final Time batchSlotTimeout;
+    private final Duration batchSlotTimeout;
     private boolean isBatchSlotRequestTimeoutCheckDisabled;
 
     private boolean isJobRestarting = false;
@@ -76,9 +77,9 @@ public class DeclarativeSlotPoolBridge extends 
DeclarativeSlotPoolService implem
             JobID jobId,
             DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
             Clock clock,
-            Time rpcTimeout,
-            Time idleSlotTimeout,
-            Time batchSlotTimeout,
+            Duration rpcTimeout,
+            Duration idleSlotTimeout,
+            Duration batchSlotTimeout,
             RequestSlotMatchingStrategy requestSlotMatchingStrategy) {
         super(jobId, declarativeSlotPoolFactory, clock, idleSlotTimeout, 
rpcTimeout);
 
@@ -112,13 +113,9 @@ public class DeclarativeSlotPoolBridge extends 
DeclarativeSlotPoolService implem
         
getDeclarativeSlotPool().registerNewSlotsListener(this::newSlotsAreAvailable);
 
         componentMainThreadExecutor.schedule(
-                this::checkIdleSlotTimeout,
-                idleSlotTimeout.toMilliseconds(),
-                TimeUnit.MILLISECONDS);
+                this::checkIdleSlotTimeout, idleSlotTimeout.toMillis(), 
TimeUnit.MILLISECONDS);
         componentMainThreadExecutor.schedule(
-                this::checkBatchSlotTimeout,
-                batchSlotTimeout.toMilliseconds(),
-                TimeUnit.MILLISECONDS);
+                this::checkBatchSlotTimeout, batchSlotTimeout.toMillis(), 
TimeUnit.MILLISECONDS);
     }
 
     @Override
@@ -454,9 +451,7 @@ public class DeclarativeSlotPoolBridge extends 
DeclarativeSlotPoolService implem
 
         if (componentMainThreadExecutor != null) {
             componentMainThreadExecutor.schedule(
-                    this::checkIdleSlotTimeout,
-                    idleSlotTimeout.toMilliseconds(),
-                    TimeUnit.MILLISECONDS);
+                    this::checkIdleSlotTimeout, idleSlotTimeout.toMillis(), 
TimeUnit.MILLISECONDS);
         }
     }
 
@@ -492,7 +487,7 @@ public class DeclarativeSlotPoolBridge extends 
DeclarativeSlotPoolService implem
             for (PendingRequest unfulfillableRequest : unfulfillableRequests) {
                 unfulfillableRequest.markUnfulfillable(currentTimestamp);
 
-                if (unfulfillableRequest.getUnfulfillableSince() + 
batchSlotTimeout.toMilliseconds()
+                if (unfulfillableRequest.getUnfulfillableSince() + 
batchSlotTimeout.toMillis()
                         <= currentTimestamp) {
                     
timeoutPendingSlotRequest(unfulfillableRequest.getSlotRequestId());
                 }
@@ -502,7 +497,7 @@ public class DeclarativeSlotPoolBridge extends 
DeclarativeSlotPoolService implem
         if (componentMainThreadExecutor != null) {
             componentMainThreadExecutor.schedule(
                     this::checkBatchSlotTimeout,
-                    batchSlotTimeout.toMilliseconds(),
+                    batchSlotTimeout.toMillis(),
                     TimeUnit.MILLISECONDS);
         }
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeServiceFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeServiceFactory.java
index 339f494272e..279a2d58df3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeServiceFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeServiceFactory.java
@@ -19,11 +19,12 @@
 package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.util.clock.Clock;
 
 import javax.annotation.Nonnull;
 
+import java.time.Duration;
+
 /** Factory for {@link DeclarativeSlotPoolBridge}. */
 public class DeclarativeSlotPoolBridgeServiceFactory extends 
AbstractSlotPoolServiceFactory {
 
@@ -31,9 +32,9 @@ public class DeclarativeSlotPoolBridgeServiceFactory extends 
AbstractSlotPoolSer
 
     public DeclarativeSlotPoolBridgeServiceFactory(
             @Nonnull Clock clock,
-            @Nonnull Time rpcTimeout,
-            @Nonnull Time slotIdleTimeout,
-            @Nonnull Time batchSlotTimeout,
+            @Nonnull Duration rpcTimeout,
+            @Nonnull Duration slotIdleTimeout,
+            @Nonnull Duration batchSlotTimeout,
             @Nonnull RequestSlotMatchingStrategy requestSlotMatchingStrategy) {
         super(clock, rpcTimeout, slotIdleTimeout, batchSlotTimeout);
         this.requestSlotMatchingStrategy = requestSlotMatchingStrategy;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolFactory.java
index 77cb9964221..6630254552c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolFactory.java
@@ -19,9 +19,9 @@
 package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.slots.ResourceRequirement;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.function.Consumer;
 
@@ -30,6 +30,6 @@ public interface DeclarativeSlotPoolFactory {
     DeclarativeSlotPool create(
             JobID jobId,
             Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements,
-            Time idleSlotTimeout,
-            Time rpcTimeout);
+            Duration idleSlotTimeout,
+            Duration rpcTimeout);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java
index 6088d8b45f7..9f00b8ff0bb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java
@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -56,7 +57,7 @@ public class DeclarativeSlotPoolService implements 
SlotPoolService {
 
     private final JobID jobId;
 
-    private final Time rpcTimeout;
+    private final Duration rpcTimeout;
 
     private final DeclarativeSlotPool declarativeSlotPool;
 
@@ -80,8 +81,8 @@ public class DeclarativeSlotPoolService implements 
SlotPoolService {
             JobID jobId,
             DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
             Clock clock,
-            Time idleSlotTimeout,
-            Time rpcTimeout) {
+            Duration idleSlotTimeout,
+            Duration rpcTimeout) {
         this.jobId = jobId;
         this.clock = clock;
         this.rpcTimeout = rpcTimeout;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceFactory.java
index af8e3d4d801..0680a9587c3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceFactory.java
@@ -19,19 +19,21 @@
 package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.util.clock.Clock;
 
 import javax.annotation.Nonnull;
 
+import java.time.Duration;
+
 /** Factory for the {@link DeclarativeSlotPoolService}. */
 public class DeclarativeSlotPoolServiceFactory implements 
SlotPoolServiceFactory {
 
     private final Clock clock;
-    private final Time idleSlotTimeout;
-    private final Time rpcTimeout;
+    private final Duration idleSlotTimeout;
+    private final Duration rpcTimeout;
 
-    public DeclarativeSlotPoolServiceFactory(Clock clock, Time 
idleSlotTimeout, Time rpcTimeout) {
+    public DeclarativeSlotPoolServiceFactory(
+            Clock clock, Duration idleSlotTimeout, Duration rpcTimeout) {
         this.clock = clock;
         this.idleSlotTimeout = idleSlotTimeout;
         this.rpcTimeout = rpcTimeout;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
index ebec6f34264..70061b67fa9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
@@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -87,8 +88,8 @@ public class DefaultDeclarativeSlotPool implements 
DeclarativeSlotPool {
 
     private final Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements;
 
-    private final Time idleSlotTimeout;
-    private final Time rpcTimeout;
+    private final Duration idleSlotTimeout;
+    private final Duration rpcTimeout;
 
     private final JobID jobId;
     protected final AllocatedSlotPool slotPool;
@@ -107,8 +108,8 @@ public class DefaultDeclarativeSlotPool implements 
DeclarativeSlotPool {
             JobID jobId,
             AllocatedSlotPool slotPool,
             Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements,
-            Time idleSlotTimeout,
-            Time rpcTimeout) {
+            Duration idleSlotTimeout,
+            Duration rpcTimeout) {
 
         this.jobId = jobId;
         this.slotPool = slotPool;
@@ -499,7 +500,7 @@ public class DefaultDeclarativeSlotPool implements 
DeclarativeSlotPool {
         while (!excessResources.isEmpty() && freeSlotIterator.hasNext()) {
             final AllocatedSlotPool.FreeSlotInfo idleSlot = 
freeSlotIterator.next();
 
-            if (currentTimeMillis >= idleSlot.getFreeSince() + 
idleSlotTimeout.toMilliseconds()) {
+            if (currentTimeMillis >= idleSlot.getFreeSince() + 
idleSlotTimeout.toMillis()) {
                 final ResourceProfile matchingProfile =
                         getMatchingResourceProfile(idleSlot.getAllocationId());
 
@@ -546,7 +547,10 @@ public class DefaultDeclarativeSlotPool implements 
DeclarativeSlotPool {
             final CompletableFuture<Acknowledge> freeSlotFuture =
                     slotToReturn
                             .getTaskManagerGateway()
-                            .freeSlot(slotToReturn.getAllocationId(), cause, 
rpcTimeout);
+                            .freeSlot(
+                                    slotToReturn.getAllocationId(),
+                                    cause,
+                                    Time.fromDuration(rpcTimeout));
 
             freeSlotFuture.whenComplete(
                     (Acknowledge ignored, Throwable throwable) -> {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolFactory.java
index 581aba4a346..95655b1fe3a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolFactory.java
@@ -19,9 +19,9 @@
 package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.slots.ResourceRequirement;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.function.Consumer;
 
@@ -32,8 +32,8 @@ public class DefaultDeclarativeSlotPoolFactory implements 
DeclarativeSlotPoolFac
     public DeclarativeSlotPool create(
             JobID jobId,
             Consumer<? super Collection<ResourceRequirement>> 
notifyNewResourceRequirements,
-            Time idleSlotTimeout,
-            Time rpcTimeout) {
+            Duration idleSlotTimeout,
+            Duration rpcTimeout) {
         return new DefaultDeclarativeSlotPool(
                 jobId,
                 new DefaultAllocatedSlotPool(),
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 177b045449e..c5ab1545fb7 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -572,7 +572,7 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
 
     @Override
     public CompletableFuture<Acknowledge> declareRequiredResources(
-            JobMasterId jobMasterId, ResourceRequirements 
resourceRequirements, Time timeout) {
+            JobMasterId jobMasterId, ResourceRequirements 
resourceRequirements, Duration timeout) {
         final JobID jobId = resourceRequirements.getJobId();
         try (MdcCloseable ignored = 
MdcUtils.withContext(MdcUtils.asContextData(jobId))) {
             final JobManagerRegistration jobManagerRegistration =
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 6adde0f9e23..275bc6299e8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -88,7 +88,7 @@ public interface ResourceManagerGateway
     CompletableFuture<Acknowledge> declareRequiredResources(
             JobMasterId jobMasterId,
             ResourceRequirements resourceRequirements,
-            @RpcTimeout Time timeout);
+            @RpcTimeout Duration timeout);
 
     /**
      * Register a {@link TaskExecutor} at the resource manager.
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolTest.java
index af5354ddbb7..f0f7e6b719f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolTest.java
@@ -18,7 +18,6 @@
 package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.blocklist.BlockedTaskManagerChecker;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -33,6 +32,7 @@ import org.apache.flink.runtime.util.ResourceCounter;
 
 import org.junit.jupiter.api.Test;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -305,8 +305,8 @@ class BlocklistDeclarativeSlotPoolTest {
                     new DefaultAllocatedSlotPool(),
                     ignored -> {},
                     blockedTaskManagerChecker,
-                    Time.seconds(20),
-                    Time.seconds(20));
+                    Duration.ofSeconds(20),
+                    Duration.ofSeconds(20));
         }
 
         public static BlocklistDeclarativeSlotPoolBuilder builder() {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java
index 31264fe448e..f3a31c4d552 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
@@ -31,15 +30,15 @@ import org.apache.flink.util.clock.SystemClock;
 
 import javax.annotation.Nullable;
 
+import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
 
 /** Builder for a {@link DeclarativeSlotPoolBridge}. */
 public class DeclarativeSlotPoolBridgeBuilder {
 
     private JobID jobId = new JobID();
-    private Time batchSlotTimeout =
-            
Time.fromDuration(JobManagerOptions.SLOT_IDLE_TIMEOUT.defaultValue());
-    private Time idleSlotTimeout = TestingUtils.infiniteTime();
+    private Duration batchSlotTimeout = 
JobManagerOptions.SLOT_IDLE_TIMEOUT.defaultValue();
+    private Duration idleSlotTimeout = 
TestingUtils.infiniteTime().toDuration();
     private Clock clock = SystemClock.getInstance();
 
     @Nullable
@@ -54,12 +53,12 @@ public class DeclarativeSlotPoolBridgeBuilder {
         return this;
     }
 
-    public DeclarativeSlotPoolBridgeBuilder setBatchSlotTimeout(Time 
batchSlotTimeout) {
+    public DeclarativeSlotPoolBridgeBuilder setBatchSlotTimeout(Duration 
batchSlotTimeout) {
         this.batchSlotTimeout = batchSlotTimeout;
         return this;
     }
 
-    public DeclarativeSlotPoolBridgeBuilder setIdleSlotTimeout(Time 
idleSlotTimeout) {
+    public DeclarativeSlotPoolBridgeBuilder setIdleSlotTimeout(Duration 
idleSlotTimeout) {
         this.idleSlotTimeout = idleSlotTimeout;
         return this;
     }
@@ -85,7 +84,7 @@ public class DeclarativeSlotPoolBridgeBuilder {
                 jobId,
                 new DefaultDeclarativeSlotPoolFactory(),
                 clock,
-                TestingUtils.infiniteTime(),
+                TestingUtils.infiniteTime().toDuration(),
                 idleSlotTimeout,
                 batchSlotTimeout,
                 requestSlotMatchingStrategy);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgePreferredAllocationsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgePreferredAllocationsTest.java
index 1d33f299c5c..90ee4b2fb8f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgePreferredAllocationsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgePreferredAllocationsTest.java
@@ -51,9 +51,9 @@ class DeclarativeSlotPoolBridgePreferredAllocationsTest {
                         new JobID(),
                         new DefaultDeclarativeSlotPoolFactory(),
                         SystemClock.getInstance(),
-                        TestingUtils.infiniteTime(),
-                        TestingUtils.infiniteTime(),
-                        TestingUtils.infiniteTime(),
+                        TestingUtils.infiniteTime().toDuration(),
+                        TestingUtils.infiniteTime().toDuration(),
+                        TestingUtils.infiniteTime().toDuration(),
                         
PreferredAllocationRequestSlotMatchingStrategy.INSTANCE);
 
         declarativeSlotPoolBridge.start(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java
index c28932edfc8..2d97dc3b58b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java
@@ -62,7 +62,7 @@ import static 
org.assertj.core.api.Assertions.assertThatThrownBy;
 @ExtendWith(ParameterizedTestExtension.class)
 class DeclarativeSlotPoolBridgeTest {
 
-    private static final Time rpcTimeout = Time.seconds(20);
+    private static final Duration rpcTimeout = Duration.ofSeconds(20);
     private static final JobID jobId = new JobID();
     private static final JobMasterId jobMasterId = JobMasterId.generate();
     private final ComponentMainThreadExecutor mainThreadExecutor =
@@ -186,7 +186,7 @@ class DeclarativeSlotPoolBridgeTest {
                                                 declarativeSlotPoolBridge.requestNewAllocatedSlot(
                                                         slotRequestId,
                                                         ResourceProfile.UNKNOWN,
-                                                        rpcTimeout);
+                                                        Time.fromDuration(rpcTimeout));
                                         slotFuture.whenComplete(
                                                 (physicalSlot, throwable) -> {
                                                     if (throwable != null) {
@@ -216,7 +216,9 @@ class DeclarativeSlotPoolBridgeTest {
 
             final CompletableFuture<PhysicalSlot> slotFuture =
                     declarativeSlotPoolBridge.requestNewAllocatedSlot(
-                            new SlotRequestId(), ResourceProfile.UNKNOWN, rpcTimeout);
+                            new SlotRequestId(),
+                            ResourceProfile.UNKNOWN,
+                            Time.fromDuration(rpcTimeout));
 
             final LocalTaskManagerLocation localTaskManagerLocation =
                     new LocalTaskManagerLocation();
@@ -278,8 +280,8 @@ class DeclarativeSlotPoolBridgeTest {
                 declarativeSlotPoolFactory,
                 SystemClock.getInstance(),
                 rpcTimeout,
-                Time.seconds(20),
-                Time.seconds(20),
+                Duration.ofSeconds(20),
+                Duration.ofSeconds(20),
                 requestSlotMatchingStrategy);
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java
index 91b16a24ddb..91d11eb5d04 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -47,6 +46,7 @@ import org.junit.jupiter.api.Test;
 
 import javax.annotation.Nonnull;
 
+import java.time.Duration;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -359,8 +359,8 @@ class DeclarativeSlotPoolServiceTest {
                         jobId,
                         declarativeSlotPoolFactory,
                         SystemClock.getInstance(),
-                        Time.seconds(20L),
-                        Time.seconds(20L));
+                        Duration.ofSeconds(20L),
+                        Duration.ofSeconds(20L));
 
         declarativeSlotPoolService.start(jobMasterId, address, mainThreadExecutor);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolBuilder.java
index f1a4d0933f3..ea177ff6f85 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolBuilder.java
@@ -18,9 +18,9 @@
 package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.slots.ResourceRequirement;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.function.Consumer;
 
@@ -30,8 +30,8 @@ final class DefaultDeclarativeSlotPoolBuilder {
     private AllocatedSlotPool allocatedSlotPool = new DefaultAllocatedSlotPool();
     private Consumer<? super Collection<ResourceRequirement>> notifyNewResourceRequirements =
             ignored -> {};
-    private Time idleSlotTimeout = Time.seconds(20);
-    private Time rpcTimeout = Time.seconds(20);
+    private Duration idleSlotTimeout = Duration.ofSeconds(20);
+    private Duration rpcTimeout = Duration.ofSeconds(20);
 
     public DefaultDeclarativeSlotPoolBuilder setAllocatedSlotPool(
             AllocatedSlotPool allocatedSlotPool) {
@@ -45,7 +45,7 @@ final class DefaultDeclarativeSlotPoolBuilder {
         return this;
     }
 
-    public DefaultDeclarativeSlotPoolBuilder setIdleSlotTimeout(Time idleSlotTimeout) {
+    public DefaultDeclarativeSlotPoolBuilder setIdleSlotTimeout(Duration idleSlotTimeout) {
         this.idleSlotTimeout = idleSlotTimeout;
         return this;
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java
index 895423304c7..0244cbadf18 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.jobmaster.slotpool;
 
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.jobmaster.SlotInfo;
@@ -41,6 +40,7 @@ import org.junit.jupiter.api.Test;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -419,7 +419,7 @@ class DefaultDeclarativeSlotPoolTest {
 
     @Test
     void testReturnIdleSlotsAfterTimeout() {
-        final Time idleSlotTimeout = Time.seconds(10);
+        final Duration idleSlotTimeout = Duration.ofSeconds(10);
         final long offerTime = 0;
         final DefaultDeclarativeSlotPool slotPool =
                 DefaultDeclarativeSlotPoolBuilder.builder()
@@ -443,7 +443,7 @@ class DefaultDeclarativeSlotPoolTest {
         // decrease the resource requirements so that slots are no longer needed
         slotPool.decreaseResourceRequirementsBy(resourceRequirements);
 
-        slotPool.releaseIdleSlots(offerTime + idleSlotTimeout.toMilliseconds());
+        slotPool.releaseIdleSlots(offerTime + idleSlotTimeout.toMillis());
 
         final Collection<AllocationID> freedSlots = freeSlotConsumer.drainFreedSlots();
 
@@ -463,7 +463,7 @@ class DefaultDeclarativeSlotPoolTest {
 
     @Test
     void testOnlyReturnExcessIdleSlots() {
-        final Time idleSlotTimeout = Time.seconds(10);
+        final Duration idleSlotTimeout = Duration.ofSeconds(10);
         final long offerTime = 0;
         final DefaultDeclarativeSlotPool slotPool =
                 DefaultDeclarativeSlotPoolBuilder.builder()
@@ -483,7 +483,7 @@ class DefaultDeclarativeSlotPoolTest {
         final ResourceCounter excessRequirements = resourceRequirements.subtract(requiredResources);
         slotPool.decreaseResourceRequirementsBy(excessRequirements);
 
-        slotPool.releaseIdleSlots(offerTime + idleSlotTimeout.toMilliseconds());
+        slotPool.releaseIdleSlots(offerTime + idleSlotTimeout.toMillis());
 
         assertThat(acceptedSlots).isNotEmpty();
         assertThat(slotPool.getFulfilledResourceRequirements()).isEqualTo(requiredResources);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTestBase.java
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java
index 138b3809221..208e71aa5fb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java
@@ -237,7 +237,7 @@ class SlotPoolBatchSlotRequestTest {
 
         return new DeclarativeSlotPoolBridgeBuilder()
                 .setResourceManagerGateway(resourceManagerGateway)
-                .setBatchSlotTimeout(batchSlotTimeout)
+                .setBatchSlotTimeout(batchSlotTimeout.toDuration())
                 .buildAndStart(componentMainThreadExecutor);
     }
 
@@ -250,7 +250,7 @@ class SlotPoolBatchSlotRequestTest {
 
         return new DeclarativeSlotPoolBridgeBuilder()
                 .setResourceManagerGateway(resourceManagerGateway)
-                .setBatchSlotTimeout(batchSlotTimeout)
+                .setBatchSlotTimeout(batchSlotTimeout.toDuration())
                 .setClock(clock)
                 .buildAndStart(componentMainThreadExecutor);
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolFactory.java
index 5e989850168..3ca17396654 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolFactory.java
@@ -19,9 +19,9 @@
 package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.slots.ResourceRequirement;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.function.Consumer;
 
@@ -38,8 +38,8 @@ final class TestingDeclarativeSlotPoolFactory implements DeclarativeSlotPoolFact
     public TestingDeclarativeSlotPool create(
             JobID jobId,
             Consumer<? super Collection<ResourceRequirement>> notifyNewResourceRequirements,
-            Time idleSlotTimeout,
-            Time rpcTimeout) {
+            Duration idleSlotTimeout,
+            Duration rpcTimeout) {
         return builder.build();
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
index a006a147358..d3bf93dfd02 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -313,7 +313,7 @@ class ResourceManagerTest {
                                 jobMasterGateway.getAddress(),
                                 Collections.singleton(
                                         ResourceRequirement.create(ResourceProfile.UNKNOWN, 1))),
-                        TIMEOUT)
+                        TIMEOUT.toDuration())
                 .get();
 
         resourceManagerGateway.disconnectJobManager(
@@ -372,7 +372,7 @@ class ResourceManagerTest {
                         jobMasterGateway.getAddress(),
                         Collections.singleton(
                                 ResourceRequirement.create(ResourceProfile.UNKNOWN, 1))),
-                TIMEOUT);
+                TIMEOUT.toDuration());
         resourceManager
                 .runInMainThread(
                         () -> {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
index 3c7e3991e83..8f7a697879a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
@@ -297,7 +297,7 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
 
     @Override
     public CompletableFuture<Acknowledge> declareRequiredResources(
-            JobMasterId jobMasterId, ResourceRequirements resourceRequirements, Time timeout) {
+            JobMasterId jobMasterId, ResourceRequirements resourceRequirements, Duration timeout) {
         return declareRequiredResourcesFunction.apply(jobMasterId, resourceRequirements);
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java
index 7e8331d8ebb..f7a22ac3b65 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java
@@ -185,7 +185,7 @@ class DefaultSchedulerBatchSchedulingTest {
             ComponentMainThreadExecutor mainThreadExecutor, Time batchSlotTimeout)
             throws Exception {
         return new DeclarativeSlotPoolBridgeBuilder()
-                .setBatchSlotTimeout(batchSlotTimeout)
+                .setBatchSlotTimeout(batchSlotTimeout.toDuration())
                 .buildAndStart(mainThreadExecutor);
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index d68087da908..531d757948b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -115,6 +115,7 @@ import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -1595,7 +1596,7 @@ public class DefaultSchedulerTest {
                 ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
                         scheduledExecutorService);
 
-        final Time slotTimeout = Time.milliseconds(5L);
+        final Duration slotTimeout = Duration.ofMillis(5L);
         final SlotPool slotPool =
                 new DeclarativeSlotPoolBridgeBuilder()
                         .setBatchSlotTimeout(slotTimeout)
@@ -1613,7 +1614,7 @@ public class DefaultSchedulerTest {
                                         .addJob(new JobID(), "jobName"))
                         .setExecutionSlotAllocatorFactory(
                                 SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
-                                        slotProvider, slotTimeout))
+                                        slotProvider, Time.fromDuration(slotTimeout)))
                         .build();
 
         final AdaptiveSchedulerTest.SubmissionBufferingTaskManagerGateway taskManagerGateway =
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
index 42f50616490..b3207436a47 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
@@ -52,6 +52,7 @@ import org.apache.flink.util.FatalExitExceptionHandler;
 
 import javax.annotation.Nullable;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.ScheduledExecutorService;
@@ -60,7 +61,7 @@ import java.util.function.Function;
 
 /** Builder for {@link AdaptiveScheduler}. */
 public class AdaptiveSchedulerBuilder {
-    private static final Time DEFAULT_TIMEOUT = Time.seconds(300);
+    private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(300);
 
     private final JobGraph jobGraph;
 
@@ -75,7 +76,7 @@ public class AdaptiveSchedulerBuilder {
     private CheckpointRecoveryFactory checkpointRecoveryFactory =
             new StandaloneCheckpointRecoveryFactory();
     private DeclarativeSlotPool declarativeSlotPool;
-    private Time rpcTimeout = DEFAULT_TIMEOUT;
+    private Duration rpcTimeout = DEFAULT_TIMEOUT;
     private BlobWriter blobWriter = VoidBlobWriter.getInstance();
     private JobManagerJobMetricGroup jobManagerJobMetricGroup =
             UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup();
@@ -157,7 +158,7 @@ public class AdaptiveSchedulerBuilder {
     }
 
     public AdaptiveSchedulerBuilder setRpcTimeout(final Time rpcTimeout) {
-        this.rpcTimeout = rpcTimeout;
+        this.rpcTimeout = rpcTimeout.toDuration();
         return this;
     }
 
@@ -246,7 +247,7 @@ public class AdaptiveSchedulerBuilder {
                         new DefaultExecutionDeploymentTracker(),
                         executorService,
                         executorService,
-                        rpcTimeout,
+                        Time.fromDuration(rpcTimeout),
                         jobManagerJobMetricGroup,
                         blobWriter,
                         shuffleMaster,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index d1bde31da6c..f6e7246687b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -19,7 +19,6 @@ package org.apache.flink.runtime.scheduler.adaptive;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MetricOptions;
@@ -536,8 +535,8 @@ public class AdaptiveSchedulerTest {
                         jobGraph.getJobID(),
                         new DefaultAllocatedSlotPool(),
                         ignored -> {},
-                        Time.minutes(10),
-                        Time.minutes(10));
+                        Duration.ofMinutes(10),
+                        Duration.ofMinutes(10));
 
         final Configuration configuration = createConfigurationWithNoTimeouts();
         configuration.set(JobManagerOptions.MIN_PARALLELISM_INCREASE, 1);
@@ -2312,8 +2311,8 @@ public class AdaptiveSchedulerTest {
                 jobId,
                 new DefaultAllocatedSlotPool(),
                 ignored -> {},
-                Time.fromDuration(idleSlotTimeout),
-                Time.fromDuration(DEFAULT_TIMEOUT));
+                idleSlotTimeout,
+                DEFAULT_TIMEOUT);
     }
 
     private static JobGraph createJobGraph() {

Reply via email to