This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit b1d78ed749bdd5f2e7325b2e375f00ef262dfb4c Author: Roc Marshal <[email protected]> AuthorDate: Wed Dec 13 12:09:32 2023 +0800 [hotfix][runtime] Migrate the Time to Duration for DefaultScheduler in the minimum impact range --- .../DefaultSlotPoolServiceSchedulerFactory.java | 10 ++++---- .../slotpool/AbstractSlotPoolServiceFactory.java | 15 ++++++------ .../slotpool/BlocklistDeclarativeSlotPool.java | 6 ++--- .../BlocklistDeclarativeSlotPoolFactory.java | 6 ++--- .../slotpool/DeclarativeSlotPoolBridge.java | 27 +++++++++------------- .../DeclarativeSlotPoolBridgeServiceFactory.java | 9 ++++---- .../slotpool/DeclarativeSlotPoolFactory.java | 6 ++--- .../slotpool/DeclarativeSlotPoolService.java | 7 +++--- .../DeclarativeSlotPoolServiceFactory.java | 10 ++++---- .../slotpool/DefaultDeclarativeSlotPool.java | 16 ++++++++----- .../DefaultDeclarativeSlotPoolFactory.java | 6 ++--- .../runtime/resourcemanager/ResourceManager.java | 2 +- .../resourcemanager/ResourceManagerGateway.java | 2 +- .../slotpool/BlocklistDeclarativeSlotPoolTest.java | 6 ++--- .../slotpool/DeclarativeSlotPoolBridgeBuilder.java | 13 +++++------ ...tiveSlotPoolBridgePreferredAllocationsTest.java | 6 ++--- .../slotpool/DeclarativeSlotPoolBridgeTest.java | 12 ++++++---- .../slotpool/DeclarativeSlotPoolServiceTest.java | 6 ++--- .../DefaultDeclarativeSlotPoolBuilder.java | 8 +++---- .../slotpool/DefaultDeclarativeSlotPoolTest.java | 10 ++++---- .../DefaultDeclarativeSlotPoolTestBase.java | 0 .../slotpool/SlotPoolBatchSlotRequestTest.java | 4 ++-- .../TestingDeclarativeSlotPoolFactory.java | 6 ++--- .../resourcemanager/ResourceManagerTest.java | 4 ++-- .../utils/TestingResourceManagerGateway.java | 2 +- .../DefaultSchedulerBatchSchedulingTest.java | 2 +- .../runtime/scheduler/DefaultSchedulerTest.java | 5 ++-- .../adaptive/AdaptiveSchedulerBuilder.java | 9 ++++---- .../scheduler/adaptive/AdaptiveSchedulerTest.java | 9 ++++---- 29 files changed, 114 insertions(+), 110 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java index f33846284e7..6474933f962 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java @@ -57,6 +57,7 @@ import org.apache.flink.util.clock.SystemClock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Duration; import java.util.Collection; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; @@ -151,12 +152,9 @@ public final class DefaultSlotPoolServiceSchedulerFactory public static DefaultSlotPoolServiceSchedulerFactory fromConfiguration( Configuration configuration, JobType jobType, boolean isDynamicGraph) { - final Time rpcTimeout = - Time.fromDuration(configuration.get(RpcOptions.ASK_TIMEOUT_DURATION)); - final Time slotIdleTimeout = - Time.fromDuration(configuration.get(JobManagerOptions.SLOT_IDLE_TIMEOUT)); - final Time batchSlotTimeout = - Time.fromDuration(configuration.get(JobManagerOptions.SLOT_REQUEST_TIMEOUT)); + final Duration rpcTimeout = configuration.get(RpcOptions.ASK_TIMEOUT_DURATION); + final Duration slotIdleTimeout = configuration.get(JobManagerOptions.SLOT_IDLE_TIMEOUT); + final Duration batchSlotTimeout = configuration.get(JobManagerOptions.SLOT_REQUEST_TIMEOUT); final SlotPoolServiceFactory slotPoolServiceFactory; final SchedulerNGFactory schedulerNGFactory; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractSlotPoolServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractSlotPoolServiceFactory.java index 3f4f96242cb..ba281d1cd39 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractSlotPoolServiceFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractSlotPoolServiceFactory.java @@ -18,27 +18,28 @@ package org.apache.flink.runtime.jobmaster.slotpool; -import org.apache.flink.api.common.time.Time; import org.apache.flink.util.clock.Clock; import javax.annotation.Nonnull; +import java.time.Duration; + /** Abstract SlotPoolServiceFactory. */ public abstract class AbstractSlotPoolServiceFactory implements SlotPoolServiceFactory { @Nonnull protected final Clock clock; - @Nonnull protected final Time rpcTimeout; + @Nonnull protected final Duration rpcTimeout; - @Nonnull protected final Time slotIdleTimeout; + @Nonnull protected final Duration slotIdleTimeout; - @Nonnull protected final Time batchSlotTimeout; + @Nonnull protected final Duration batchSlotTimeout; protected AbstractSlotPoolServiceFactory( @Nonnull Clock clock, - @Nonnull Time rpcTimeout, - @Nonnull Time slotIdleTimeout, - @Nonnull Time batchSlotTimeout) { + @Nonnull Duration rpcTimeout, + @Nonnull Duration slotIdleTimeout, + @Nonnull Duration batchSlotTimeout) { this.clock = clock; this.rpcTimeout = rpcTimeout; this.slotIdleTimeout = slotIdleTimeout; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPool.java index afbdf2c2298..c8f9ada9fdc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPool.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.blocklist.BlockedTaskManagerChecker; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -33,6 +32,7 @@ import org.apache.flink.util.FlinkRuntimeException; import javax.annotation.Nullable; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Optional; @@ -55,8 +55,8 @@ public class BlocklistDeclarativeSlotPool extends DefaultDeclarativeSlotPool { AllocatedSlotPool slotPool, Consumer<? super Collection<ResourceRequirement>> notifyNewResourceRequirements, BlockedTaskManagerChecker blockedTaskManagerChecker, - Time idleSlotTimeout, - Time rpcTimeout) { + Duration idleSlotTimeout, + Duration rpcTimeout) { super(jobId, slotPool, notifyNewResourceRequirements, idleSlotTimeout, rpcTimeout); this.blockedTaskManagerChecker = checkNotNull(blockedTaskManagerChecker); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolFactory.java index b3a4db27d83..7b85f9e4ebd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolFactory.java @@ -19,10 +19,10 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.blocklist.BlockedTaskManagerChecker; import org.apache.flink.runtime.slots.ResourceRequirement; +import java.time.Duration; import java.util.Collection; import java.util.function.Consumer; @@ -42,8 +42,8 @@ public class BlocklistDeclarativeSlotPoolFactory implements DeclarativeSlotPoolF public DeclarativeSlotPool create( JobID jobId, Consumer<? super Collection<ResourceRequirement>> notifyNewResourceRequirements, - Time idleSlotTimeout, - Time rpcTimeout) { + Duration idleSlotTimeout, + Duration rpcTimeout) { return new BlocklistDeclarativeSlotPool( jobId, new DefaultAllocatedSlotPool(), diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java index 1a77528fa2d..444c2aa0759 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java @@ -40,6 +40,7 @@ import org.apache.flink.util.concurrent.FutureUtils; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -61,13 +62,13 @@ public class DeclarativeSlotPoolBridge extends DeclarativeSlotPoolService implem private final Map<SlotRequestId, PendingRequest> pendingRequests; private final Map<SlotRequestId, AllocationID> fulfilledRequests; - private final Time idleSlotTimeout; + private final Duration idleSlotTimeout; private final RequestSlotMatchingStrategy requestSlotMatchingStrategy; @Nullable private ComponentMainThreadExecutor componentMainThreadExecutor; - private final Time batchSlotTimeout; + private final Duration batchSlotTimeout; private boolean isBatchSlotRequestTimeoutCheckDisabled; private boolean isJobRestarting = false; @@ -76,9 +77,9 @@ public class DeclarativeSlotPoolBridge extends DeclarativeSlotPoolService implem JobID jobId, DeclarativeSlotPoolFactory declarativeSlotPoolFactory, Clock clock, - Time rpcTimeout, - Time idleSlotTimeout, - Time batchSlotTimeout, + Duration rpcTimeout, + Duration idleSlotTimeout, + Duration batchSlotTimeout, RequestSlotMatchingStrategy requestSlotMatchingStrategy) { super(jobId, declarativeSlotPoolFactory, clock, idleSlotTimeout, rpcTimeout); @@ -112,13 +113,9 @@ public class DeclarativeSlotPoolBridge extends DeclarativeSlotPoolService implem getDeclarativeSlotPool().registerNewSlotsListener(this::newSlotsAreAvailable); componentMainThreadExecutor.schedule( - this::checkIdleSlotTimeout, - idleSlotTimeout.toMilliseconds(), - TimeUnit.MILLISECONDS); + this::checkIdleSlotTimeout, idleSlotTimeout.toMillis(), TimeUnit.MILLISECONDS); componentMainThreadExecutor.schedule( - this::checkBatchSlotTimeout, - batchSlotTimeout.toMilliseconds(), - TimeUnit.MILLISECONDS); + this::checkBatchSlotTimeout, batchSlotTimeout.toMillis(), TimeUnit.MILLISECONDS); } @Override @@ -454,9 +451,7 @@ public class DeclarativeSlotPoolBridge extends DeclarativeSlotPoolService implem if (componentMainThreadExecutor != null) { componentMainThreadExecutor.schedule( - this::checkIdleSlotTimeout, - idleSlotTimeout.toMilliseconds(), - TimeUnit.MILLISECONDS); + this::checkIdleSlotTimeout, idleSlotTimeout.toMillis(), TimeUnit.MILLISECONDS); } } @@ -492,7 +487,7 @@ public class DeclarativeSlotPoolBridge extends DeclarativeSlotPoolService implem for (PendingRequest unfulfillableRequest : unfulfillableRequests) { unfulfillableRequest.markUnfulfillable(currentTimestamp); - if (unfulfillableRequest.getUnfulfillableSince() + batchSlotTimeout.toMilliseconds() + if (unfulfillableRequest.getUnfulfillableSince() + batchSlotTimeout.toMillis() <= currentTimestamp) { timeoutPendingSlotRequest(unfulfillableRequest.getSlotRequestId()); } @@ -502,7 +497,7 @@ public class DeclarativeSlotPoolBridge extends DeclarativeSlotPoolService implem if (componentMainThreadExecutor != null) { componentMainThreadExecutor.schedule( this::checkBatchSlotTimeout, - batchSlotTimeout.toMilliseconds(), + batchSlotTimeout.toMillis(), TimeUnit.MILLISECONDS); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeServiceFactory.java index 339f494272e..279a2d58df3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeServiceFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeServiceFactory.java @@ -19,11 +19,12 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; import org.apache.flink.util.clock.Clock; import javax.annotation.Nonnull; +import java.time.Duration; + /** Factory for {@link DeclarativeSlotPoolBridge}. */ public class DeclarativeSlotPoolBridgeServiceFactory extends AbstractSlotPoolServiceFactory { @@ -31,9 +32,9 @@ public class DeclarativeSlotPoolBridgeServiceFactory extends AbstractSlotPoolSer public DeclarativeSlotPoolBridgeServiceFactory( @Nonnull Clock clock, - @Nonnull Time rpcTimeout, - @Nonnull Time slotIdleTimeout, - @Nonnull Time batchSlotTimeout, + @Nonnull Duration rpcTimeout, + @Nonnull Duration slotIdleTimeout, + @Nonnull Duration batchSlotTimeout, @Nonnull RequestSlotMatchingStrategy requestSlotMatchingStrategy) { super(clock, rpcTimeout, slotIdleTimeout, batchSlotTimeout); this.requestSlotMatchingStrategy = requestSlotMatchingStrategy; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolFactory.java index 77cb9964221..6630254552c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolFactory.java @@ -19,9 +19,9 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.slots.ResourceRequirement; +import java.time.Duration; import java.util.Collection; import java.util.function.Consumer; @@ -30,6 +30,6 @@ public interface DeclarativeSlotPoolFactory { DeclarativeSlotPool create( JobID jobId, Consumer<? super Collection<ResourceRequirement>> notifyNewResourceRequirements, - Time idleSlotTimeout, - Time rpcTimeout); + Duration idleSlotTimeout, + Duration rpcTimeout); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java index 6088d8b45f7..9f00b8ff0bb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java @@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -56,7 +57,7 @@ public class DeclarativeSlotPoolService implements SlotPoolService { private final JobID jobId; - private final Time rpcTimeout; + private final Duration rpcTimeout; private final DeclarativeSlotPool declarativeSlotPool; @@ -80,8 +81,8 @@ public class DeclarativeSlotPoolService implements SlotPoolService { JobID jobId, DeclarativeSlotPoolFactory declarativeSlotPoolFactory, Clock clock, - Time idleSlotTimeout, - Time rpcTimeout) { + Duration idleSlotTimeout, + Duration rpcTimeout) { this.jobId = jobId; this.clock = clock; this.rpcTimeout = rpcTimeout; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceFactory.java index af8e3d4d801..0680a9587c3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceFactory.java @@ -19,19 +19,21 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; import org.apache.flink.util.clock.Clock; import javax.annotation.Nonnull; +import java.time.Duration; + /** Factory for the {@link DeclarativeSlotPoolService}. */ public class DeclarativeSlotPoolServiceFactory implements SlotPoolServiceFactory { private final Clock clock; - private final Time idleSlotTimeout; - private final Time rpcTimeout; + private final Duration idleSlotTimeout; + private final Duration rpcTimeout; - public DeclarativeSlotPoolServiceFactory(Clock clock, Time idleSlotTimeout, Time rpcTimeout) { + public DeclarativeSlotPoolServiceFactory( + Clock clock, Duration idleSlotTimeout, Duration rpcTimeout) { this.clock = clock; this.idleSlotTimeout = idleSlotTimeout; this.rpcTimeout = rpcTimeout; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java index ebec6f34264..70061b67fa9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java @@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -87,8 +88,8 @@ public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool { private final Consumer<? super Collection<ResourceRequirement>> notifyNewResourceRequirements; - private final Time idleSlotTimeout; - private final Time rpcTimeout; + private final Duration idleSlotTimeout; + private final Duration rpcTimeout; private final JobID jobId; protected final AllocatedSlotPool slotPool; @@ -107,8 +108,8 @@ public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool { JobID jobId, AllocatedSlotPool slotPool, Consumer<? super Collection<ResourceRequirement>> notifyNewResourceRequirements, - Time idleSlotTimeout, - Time rpcTimeout) { + Duration idleSlotTimeout, + Duration rpcTimeout) { this.jobId = jobId; this.slotPool = slotPool; @@ -499,7 +500,7 @@ public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool { while (!excessResources.isEmpty() && freeSlotIterator.hasNext()) { final AllocatedSlotPool.FreeSlotInfo idleSlot = freeSlotIterator.next(); - if (currentTimeMillis >= idleSlot.getFreeSince() + idleSlotTimeout.toMilliseconds()) { + if (currentTimeMillis >= idleSlot.getFreeSince() + idleSlotTimeout.toMillis()) { final ResourceProfile matchingProfile = getMatchingResourceProfile(idleSlot.getAllocationId()); @@ -546,7 +547,10 @@ public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool { final CompletableFuture<Acknowledge> freeSlotFuture = slotToReturn .getTaskManagerGateway() - .freeSlot(slotToReturn.getAllocationId(), cause, rpcTimeout); + .freeSlot( + slotToReturn.getAllocationId(), + cause, + Time.fromDuration(rpcTimeout)); freeSlotFuture.whenComplete( (Acknowledge ignored, Throwable throwable) -> { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolFactory.java index 581aba4a346..95655b1fe3a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolFactory.java @@ -19,9 +19,9 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.slots.ResourceRequirement; +import java.time.Duration; import java.util.Collection; import java.util.function.Consumer; @@ -32,8 +32,8 @@ public class DefaultDeclarativeSlotPoolFactory implements DeclarativeSlotPoolFac public DeclarativeSlotPool create( JobID jobId, Consumer<? super Collection<ResourceRequirement>> notifyNewResourceRequirements, - Time idleSlotTimeout, - Time rpcTimeout) { + Duration idleSlotTimeout, + Duration rpcTimeout) { return new DefaultDeclarativeSlotPool( jobId, new DefaultAllocatedSlotPool(), diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 177b045449e..c5ab1545fb7 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -572,7 +572,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> @Override public CompletableFuture<Acknowledge> declareRequiredResources( - JobMasterId jobMasterId, ResourceRequirements resourceRequirements, Time timeout) { + JobMasterId jobMasterId, ResourceRequirements resourceRequirements, Duration timeout) { final JobID jobId = resourceRequirements.getJobId(); try (MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobId))) { final JobManagerRegistration jobManagerRegistration = diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java index 6adde0f9e23..275bc6299e8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java @@ -88,7 +88,7 @@ public interface ResourceManagerGateway CompletableFuture<Acknowledge> declareRequiredResources( JobMasterId jobMasterId, ResourceRequirements resourceRequirements, - @RpcTimeout Time timeout); + @RpcTimeout Duration timeout); /** * Register a {@link TaskExecutor} at the resource manager. diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolTest.java index af5354ddbb7..f0f7e6b719f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolTest.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.blocklist.BlockedTaskManagerChecker; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -33,6 +32,7 @@ import org.apache.flink.runtime.util.ResourceCounter; import org.junit.jupiter.api.Test; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -305,8 +305,8 @@ class BlocklistDeclarativeSlotPoolTest { new DefaultAllocatedSlotPool(), ignored -> {}, blockedTaskManagerChecker, - Time.seconds(20), - Time.seconds(20)); + Duration.ofSeconds(20), + Duration.ofSeconds(20)); } public static BlocklistDeclarativeSlotPoolBuilder builder() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java index 31264fe448e..f3a31c4d552 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.jobmaster.JobMasterId; @@ -31,15 +30,15 @@ import org.apache.flink.util.clock.SystemClock; import javax.annotation.Nullable; +import java.time.Duration; import java.util.concurrent.CompletableFuture; /** Builder for a {@link DeclarativeSlotPoolBridge}. */ public class DeclarativeSlotPoolBridgeBuilder { private JobID jobId = new JobID(); - private Time batchSlotTimeout = - Time.fromDuration(JobManagerOptions.SLOT_IDLE_TIMEOUT.defaultValue()); - private Time idleSlotTimeout = TestingUtils.infiniteTime(); + private Duration batchSlotTimeout = JobManagerOptions.SLOT_IDLE_TIMEOUT.defaultValue(); + private Duration idleSlotTimeout = TestingUtils.infiniteTime().toDuration(); private Clock clock = SystemClock.getInstance(); @Nullable @@ -54,12 +53,12 @@ public class DeclarativeSlotPoolBridgeBuilder { return this; } - public DeclarativeSlotPoolBridgeBuilder setBatchSlotTimeout(Time batchSlotTimeout) { + public DeclarativeSlotPoolBridgeBuilder setBatchSlotTimeout(Duration batchSlotTimeout) { this.batchSlotTimeout = batchSlotTimeout; return this; } - public DeclarativeSlotPoolBridgeBuilder setIdleSlotTimeout(Time idleSlotTimeout) { + public DeclarativeSlotPoolBridgeBuilder setIdleSlotTimeout(Duration idleSlotTimeout) { this.idleSlotTimeout = idleSlotTimeout; return this; } @@ -85,7 +84,7 @@ public class DeclarativeSlotPoolBridgeBuilder { jobId, new DefaultDeclarativeSlotPoolFactory(), clock, - TestingUtils.infiniteTime(), + TestingUtils.infiniteTime().toDuration(), idleSlotTimeout, batchSlotTimeout, requestSlotMatchingStrategy); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgePreferredAllocationsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgePreferredAllocationsTest.java index 1d33f299c5c..90ee4b2fb8f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgePreferredAllocationsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgePreferredAllocationsTest.java @@ -51,9 +51,9 @@ class DeclarativeSlotPoolBridgePreferredAllocationsTest { new JobID(), new DefaultDeclarativeSlotPoolFactory(), SystemClock.getInstance(), - TestingUtils.infiniteTime(), - TestingUtils.infiniteTime(), - TestingUtils.infiniteTime(), + TestingUtils.infiniteTime().toDuration(), + TestingUtils.infiniteTime().toDuration(), + TestingUtils.infiniteTime().toDuration(), PreferredAllocationRequestSlotMatchingStrategy.INSTANCE); declarativeSlotPoolBridge.start( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java index c28932edfc8..2d97dc3b58b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java @@ -62,7 +62,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; @ExtendWith(ParameterizedTestExtension.class) class DeclarativeSlotPoolBridgeTest { - private static final Time rpcTimeout = Time.seconds(20); + private static final Duration rpcTimeout = Duration.ofSeconds(20); private static final JobID jobId = new JobID(); private static final JobMasterId jobMasterId = JobMasterId.generate(); private final ComponentMainThreadExecutor mainThreadExecutor = @@ -186,7 +186,7 @@ class DeclarativeSlotPoolBridgeTest { declarativeSlotPoolBridge.requestNewAllocatedSlot( slotRequestId, ResourceProfile.UNKNOWN, - rpcTimeout); + Time.fromDuration(rpcTimeout)); slotFuture.whenComplete( (physicalSlot, throwable) -> { if (throwable != null) { @@ -216,7 +216,9 @@ class DeclarativeSlotPoolBridgeTest { final CompletableFuture<PhysicalSlot> slotFuture = declarativeSlotPoolBridge.requestNewAllocatedSlot( - new SlotRequestId(), ResourceProfile.UNKNOWN, rpcTimeout); + new SlotRequestId(), + ResourceProfile.UNKNOWN, + Time.fromDuration(rpcTimeout)); final LocalTaskManagerLocation localTaskManagerLocation = new LocalTaskManagerLocation(); @@ -278,8 +280,8 @@ class DeclarativeSlotPoolBridgeTest { declarativeSlotPoolFactory, SystemClock.getInstance(), rpcTimeout, - Time.seconds(20), - Time.seconds(20), + Duration.ofSeconds(20), + Duration.ofSeconds(20), requestSlotMatchingStrategy); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java index 91b16a24ddb..91d11eb5d04 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; @@ -47,6 +46,7 @@ import org.junit.jupiter.api.Test; import javax.annotation.Nonnull; +import java.time.Duration; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; @@ -359,8 +359,8 @@ class DeclarativeSlotPoolServiceTest { jobId, declarativeSlotPoolFactory, SystemClock.getInstance(), - Time.seconds(20L), - Time.seconds(20L)); + Duration.ofSeconds(20L), + Duration.ofSeconds(20L)); declarativeSlotPoolService.start(jobMasterId, address, mainThreadExecutor); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolBuilder.java index f1a4d0933f3..ea177ff6f85 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolBuilder.java @@ -18,9 +18,9 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.slots.ResourceRequirement; +import java.time.Duration; import java.util.Collection; import java.util.function.Consumer; @@ -30,8 +30,8 @@ final class DefaultDeclarativeSlotPoolBuilder { private AllocatedSlotPool allocatedSlotPool = new DefaultAllocatedSlotPool(); private Consumer<? super Collection<ResourceRequirement>> notifyNewResourceRequirements = ignored -> {}; - private Time idleSlotTimeout = Time.seconds(20); - private Time rpcTimeout = Time.seconds(20); + private Duration idleSlotTimeout = Duration.ofSeconds(20); + private Duration rpcTimeout = Duration.ofSeconds(20); public DefaultDeclarativeSlotPoolBuilder setAllocatedSlotPool( AllocatedSlotPool allocatedSlotPool) { @@ -45,7 +45,7 @@ final class DefaultDeclarativeSlotPoolBuilder { return this; } - public DefaultDeclarativeSlotPoolBuilder setIdleSlotTimeout(Time idleSlotTimeout) { + public DefaultDeclarativeSlotPoolBuilder setIdleSlotTimeout(Duration idleSlotTimeout) { this.idleSlotTimeout = idleSlotTimeout; return this; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java index 895423304c7..0244cbadf18 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.jobmaster.slotpool; -import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.jobmaster.SlotInfo; @@ -41,6 +40,7 @@ import org.junit.jupiter.api.Test; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -419,7 +419,7 @@ class DefaultDeclarativeSlotPoolTest { @Test void testReturnIdleSlotsAfterTimeout() { - final Time idleSlotTimeout = Time.seconds(10); + final Duration idleSlotTimeout = Duration.ofSeconds(10); final long offerTime = 0; final DefaultDeclarativeSlotPool slotPool = DefaultDeclarativeSlotPoolBuilder.builder() @@ -443,7 +443,7 @@ class DefaultDeclarativeSlotPoolTest { // decrease the resource requirements so that slots are no longer needed slotPool.decreaseResourceRequirementsBy(resourceRequirements); - slotPool.releaseIdleSlots(offerTime + idleSlotTimeout.toMilliseconds()); + slotPool.releaseIdleSlots(offerTime + idleSlotTimeout.toMillis()); final Collection<AllocationID> freedSlots = freeSlotConsumer.drainFreedSlots(); @@ -463,7 +463,7 @@ class DefaultDeclarativeSlotPoolTest { @Test void testOnlyReturnExcessIdleSlots() { - final Time idleSlotTimeout = Time.seconds(10); + final Duration idleSlotTimeout = Duration.ofSeconds(10); final long offerTime = 0; final DefaultDeclarativeSlotPool slotPool = DefaultDeclarativeSlotPoolBuilder.builder() @@ -483,7 +483,7 @@ class DefaultDeclarativeSlotPoolTest { final ResourceCounter excessRequirements = resourceRequirements.subtract(requiredResources); slotPool.decreaseResourceRequirementsBy(excessRequirements); - slotPool.releaseIdleSlots(offerTime + idleSlotTimeout.toMilliseconds()); + slotPool.releaseIdleSlots(offerTime + idleSlotTimeout.toMillis()); assertThat(acceptedSlots).isNotEmpty(); assertThat(slotPool.getFulfilledResourceRequirements()).isEqualTo(requiredResources); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTestBase.java new file mode 100644 index 00000000000..e69de29bb2d diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java index 138b3809221..208e71aa5fb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java @@ -237,7 +237,7 @@ class SlotPoolBatchSlotRequestTest { return new DeclarativeSlotPoolBridgeBuilder() .setResourceManagerGateway(resourceManagerGateway) - .setBatchSlotTimeout(batchSlotTimeout) + .setBatchSlotTimeout(batchSlotTimeout.toDuration()) .buildAndStart(componentMainThreadExecutor); } @@ -250,7 +250,7 @@ class SlotPoolBatchSlotRequestTest { return new DeclarativeSlotPoolBridgeBuilder() .setResourceManagerGateway(resourceManagerGateway) - .setBatchSlotTimeout(batchSlotTimeout) + .setBatchSlotTimeout(batchSlotTimeout.toDuration()) .setClock(clock) .buildAndStart(componentMainThreadExecutor); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolFactory.java index 5e989850168..3ca17396654 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolFactory.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolFactory.java @@ -19,9 +19,9 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.slots.ResourceRequirement; +import java.time.Duration; import java.util.Collection; import java.util.function.Consumer; @@ -38,8 +38,8 @@ final class TestingDeclarativeSlotPoolFactory implements DeclarativeSlotPoolFact public TestingDeclarativeSlotPool create( JobID jobId, Consumer<? super Collection<ResourceRequirement>> notifyNewResourceRequirements, - Time idleSlotTimeout, - Time rpcTimeout) { + Duration idleSlotTimeout, + Duration rpcTimeout) { return builder.build(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java index a006a147358..d3bf93dfd02 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java @@ -313,7 +313,7 @@ class ResourceManagerTest { jobMasterGateway.getAddress(), Collections.singleton( ResourceRequirement.create(ResourceProfile.UNKNOWN, 1))), - TIMEOUT) + TIMEOUT.toDuration()) .get(); resourceManagerGateway.disconnectJobManager( @@ -372,7 +372,7 @@ class ResourceManagerTest { jobMasterGateway.getAddress(), Collections.singleton( ResourceRequirement.create(ResourceProfile.UNKNOWN, 1))), - TIMEOUT); + TIMEOUT.toDuration()); resourceManager .runInMainThread( () -> { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java index 3c7e3991e83..8f7a697879a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java @@ -297,7 +297,7 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway { @Override public CompletableFuture<Acknowledge> declareRequiredResources( - JobMasterId jobMasterId, ResourceRequirements resourceRequirements, Time timeout) { + JobMasterId jobMasterId, ResourceRequirements resourceRequirements, Duration timeout) { return declareRequiredResourcesFunction.apply(jobMasterId, resourceRequirements); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java index 7e8331d8ebb..f7a22ac3b65 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java @@ -185,7 +185,7 @@ class DefaultSchedulerBatchSchedulingTest { ComponentMainThreadExecutor mainThreadExecutor, Time batchSlotTimeout) throws Exception { return new DeclarativeSlotPoolBridgeBuilder() - .setBatchSlotTimeout(batchSlotTimeout) + .setBatchSlotTimeout(batchSlotTimeout.toDuration()) .buildAndStart(mainThreadExecutor); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java index d68087da908..531d757948b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java @@ -115,6 +115,7 @@ import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -1595,7 +1596,7 @@ public class DefaultSchedulerTest { ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor( scheduledExecutorService); - final Time slotTimeout = Time.milliseconds(5L); + final Duration slotTimeout = Duration.ofMillis(5L); final SlotPool slotPool = new DeclarativeSlotPoolBridgeBuilder() .setBatchSlotTimeout(slotTimeout) @@ -1613,7 +1614,7 @@ public class DefaultSchedulerTest { .addJob(new JobID(), "jobName")) .setExecutionSlotAllocatorFactory( SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory( - slotProvider, slotTimeout)) + slotProvider, Time.fromDuration(slotTimeout))) .build(); final AdaptiveSchedulerTest.SubmissionBufferingTaskManagerGateway taskManagerGateway = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java index 42f50616490..b3207436a47 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java @@ -52,6 +52,7 @@ import org.apache.flink.util.FatalExitExceptionHandler; import javax.annotation.Nullable; +import java.time.Duration; import java.util.Collection; import java.util.Collections; import java.util.concurrent.ScheduledExecutorService; @@ -60,7 +61,7 @@ import java.util.function.Function; /** Builder for {@link AdaptiveScheduler}. */ public class AdaptiveSchedulerBuilder { - private static final Time DEFAULT_TIMEOUT = Time.seconds(300); + private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(300); private final JobGraph jobGraph; @@ -75,7 +76,7 @@ public class AdaptiveSchedulerBuilder { private CheckpointRecoveryFactory checkpointRecoveryFactory = new StandaloneCheckpointRecoveryFactory(); private DeclarativeSlotPool declarativeSlotPool; - private Time rpcTimeout = DEFAULT_TIMEOUT; + private Duration rpcTimeout = DEFAULT_TIMEOUT; private BlobWriter blobWriter = VoidBlobWriter.getInstance(); private JobManagerJobMetricGroup jobManagerJobMetricGroup = UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup(); @@ -157,7 +158,7 @@ public class AdaptiveSchedulerBuilder { } public AdaptiveSchedulerBuilder setRpcTimeout(final Time rpcTimeout) { - this.rpcTimeout = rpcTimeout; + this.rpcTimeout = rpcTimeout.toDuration(); return this; } @@ -246,7 +247,7 @@ public class AdaptiveSchedulerBuilder { new DefaultExecutionDeploymentTracker(), executorService, executorService, - rpcTimeout, + Time.fromDuration(rpcTimeout), jobManagerJobMetricGroup, blobWriter, shuffleMaster, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java index d1bde31da6c..f6e7246687b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.scheduler.adaptive; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; -import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.MetricOptions; @@ -536,8 +535,8 @@ public class AdaptiveSchedulerTest { jobGraph.getJobID(), new DefaultAllocatedSlotPool(), ignored -> {}, - Time.minutes(10), - Time.minutes(10)); + Duration.ofMinutes(10), + Duration.ofMinutes(10)); final Configuration configuration = createConfigurationWithNoTimeouts(); configuration.set(JobManagerOptions.MIN_PARALLELISM_INCREASE, 1); @@ -2312,8 +2311,8 @@ public class AdaptiveSchedulerTest { jobId, new DefaultAllocatedSlotPool(), ignored -> {}, - Time.fromDuration(idleSlotTimeout), - Time.fromDuration(DEFAULT_TIMEOUT)); + idleSlotTimeout, + DEFAULT_TIMEOUT); } private static JobGraph createJobGraph() {
