This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 82b628d4730 [hotfix][scheduler] Migrate the Time to Duration for SlotPool in the minimum scope
82b628d4730 is described below

commit 82b628d4730eef32b2f7a022e3b73cb18f950e6e
Author: Roc Marshal <[email protected]>
AuthorDate: Fri Jul 26 20:23:24 2024 +0800

    [hotfix][scheduler] Migrate the Time to Duration for SlotPool in the minimum scope
---
 .../runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java    | 9 ++++-----
 .../org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java    | 8 +++++---
 .../java/org/apache/flink/runtime/jobmaster/JobMasterTest.java   | 2 +-
 .../jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java     | 6 +++---
 .../DeclarativeSlotPoolBridgePreferredAllocationsTest.java       | 6 +++---
 .../slotpool/DeclarativeSlotPoolBridgeRequestCompletionTest.java | 4 ++--
 .../DeclarativeSlotPoolBridgeResourceDeclarationTest.java        | 5 ++---
 .../jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java        | 9 +++------
 .../runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java     | 4 ++--
 .../apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java   | 4 ++--
 10 files changed, 27 insertions(+), 30 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
index b04dc25f615..38a9e1312f5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
@@ -282,7 +281,7 @@ public class DeclarativeSlotPoolBridge extends 
DeclarativeSlotPoolService implem
             @Nonnull SlotRequestId slotRequestId,
             @Nonnull ResourceProfile resourceProfile,
             @Nonnull Collection<AllocationID> preferredAllocations,
-            @Nullable Time timeout) {
+            @Nullable Duration timeout) {
         assertRunningInMainThread();
 
         log.debug(
@@ -318,7 +317,7 @@ public class DeclarativeSlotPoolBridge extends 
DeclarativeSlotPoolService implem
     }
 
     private CompletableFuture<PhysicalSlot> internalRequestNewSlot(
-            PendingRequest pendingRequest, @Nullable Time timeout) {
+            PendingRequest pendingRequest, @Nullable Duration timeout) {
         internalRequestNewAllocatedSlot(pendingRequest);
 
         if (timeout == null) {
@@ -326,12 +325,12 @@ public class DeclarativeSlotPoolBridge extends 
DeclarativeSlotPoolService implem
         } else {
             return FutureUtils.orTimeout(
                             pendingRequest.getSlotFuture(),
-                            timeout.toMilliseconds(),
+                            timeout.toMillis(),
                             TimeUnit.MILLISECONDS,
                             componentMainThreadExecutor,
                             String.format(
                                     "Pending slot request %s timed out after 
%d ms.",
-                                    pendingRequest.getSlotRequestId(), 
timeout.toMilliseconds()))
+                                    pendingRequest.getSlotRequestId(), 
timeout.toMillis()))
                     .whenComplete(
                             (physicalSlot, throwable) -> {
                                 if (throwable instanceof TimeoutException) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index e32ab5dc42a..1b5cba7fcf8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.jobmaster.slotpool;
 
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -33,6 +32,7 @@ import 
org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import javax.annotation.Nullable;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Optional;
@@ -154,7 +154,9 @@ public interface SlotPool extends AllocatedSlotActions, 
AutoCloseable {
      * @return a newly allocated slot that was previously not available.
      */
     default CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
-            SlotRequestId slotRequestId, ResourceProfile resourceProfile, 
@Nullable Time timeout) {
+            SlotRequestId slotRequestId,
+            ResourceProfile resourceProfile,
+            @Nullable Duration timeout) {
         return requestNewAllocatedSlot(
                 slotRequestId, resourceProfile, Collections.emptyList(), 
timeout);
     }
@@ -175,7 +177,7 @@ public interface SlotPool extends AllocatedSlotActions, 
AutoCloseable {
             SlotRequestId slotRequestId,
             ResourceProfile resourceProfile,
             Collection<AllocationID> preferredAllocations,
-            @Nullable Time timeout);
+            @Nullable Duration timeout);
 
     /**
      * Requests the allocation of a new batch slot from the resource manager. 
Unlike the normal
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 55e252ad42a..4e724fb9cb0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -610,7 +610,7 @@ class JobMasterTest {
                 @Nonnull SlotRequestId slotRequestId,
                 @Nonnull ResourceProfile resourceProfile,
                 @Nonnull Collection<AllocationID> preferredAllocations,
-                @Nullable Time timeout) {
+                @Nullable Duration timeout) {
             return new CompletableFuture<>();
         }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java
index 6b267dafd6c..9855ba8be84 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java
@@ -41,7 +41,7 @@ public class DeclarativeSlotPoolBridgeBuilder {
 
     private JobID jobId = new JobID();
     private Duration batchSlotTimeout = 
JobManagerOptions.SLOT_IDLE_TIMEOUT.defaultValue();
-    private Duration idleSlotTimeout = 
TestingUtils.infiniteTime().toDuration();
+    private Duration idleSlotTimeout = TestingUtils.infiniteDuration();
     private Clock clock = SystemClock.getInstance();
     private Duration slotRequestMaxInterval = 
SLOT_REQUEST_MAX_INTERVAL.defaultValue();
     private ComponentMainThreadExecutor mainThreadExecutor = forMainThread();
@@ -101,7 +101,7 @@ public class DeclarativeSlotPoolBridgeBuilder {
                 jobId,
                 new DefaultDeclarativeSlotPoolFactory(),
                 clock,
-                TestingUtils.infiniteTime().toDuration(),
+                TestingUtils.infiniteDuration(),
                 idleSlotTimeout,
                 batchSlotTimeout,
                 requestSlotMatchingStrategy,
@@ -115,7 +115,7 @@ public class DeclarativeSlotPoolBridgeBuilder {
                         jobId,
                         new DefaultDeclarativeSlotPoolFactory(),
                         clock,
-                        TestingUtils.infiniteTime().toDuration(),
+                        TestingUtils.infiniteDuration(),
                         idleSlotTimeout,
                         batchSlotTimeout,
                         requestSlotMatchingStrategy,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgePreferredAllocationsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgePreferredAllocationsTest.java
index af691d21477..ace4cf756c8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgePreferredAllocationsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgePreferredAllocationsTest.java
@@ -52,9 +52,9 @@ class DeclarativeSlotPoolBridgePreferredAllocationsTest {
                         new JobID(),
                         new DefaultDeclarativeSlotPoolFactory(),
                         SystemClock.getInstance(),
-                        TestingUtils.infiniteTime().toDuration(),
-                        TestingUtils.infiniteTime().toDuration(),
-                        TestingUtils.infiniteTime().toDuration(),
+                        TestingUtils.infiniteDuration(),
+                        TestingUtils.infiniteDuration(),
+                        TestingUtils.infiniteDuration(),
                         
PreferredAllocationRequestSlotMatchingStrategy.INSTANCE,
                         Duration.ZERO,
                         forMainThread());
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeRequestCompletionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeRequestCompletionTest.java
index 147ef4a9885..71909ca5aac 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeRequestCompletionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeRequestCompletionTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.jobmaster.slotpool;
 
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
@@ -32,6 +31,7 @@ import org.apache.flink.util.function.CheckedSupplier;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -46,7 +46,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Tests how the {@link DeclarativeSlotPoolBridge} completes slot requests. */
 class DeclarativeSlotPoolBridgeRequestCompletionTest {
 
-    private static final Time TIMEOUT = SlotPoolUtils.TIMEOUT;
+    private static final Duration TIMEOUT = SlotPoolUtils.TIMEOUT;
 
     private TestingResourceManagerGateway resourceManagerGateway;
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeResourceDeclarationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeResourceDeclarationTest.java
index 7aa18057536..d830a6649e9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeResourceDeclarationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeResourceDeclarationTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.jobmaster.slotpool;
 
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
@@ -97,7 +96,7 @@ class DeclarativeSlotPoolBridgeResourceDeclarationTest
 
         // requesting the allocation of a new slot should increase the 
requirements
         declarativeSlotPoolBridge.requestNewAllocatedSlot(
-                new SlotRequestId(), ResourceProfile.UNKNOWN, Time.minutes(5));
+                new SlotRequestId(), ResourceProfile.UNKNOWN, 
Duration.ofMinutes(5));
         
assertThat(requirementListener.getRequirements().getResourceCount(ResourceProfile.UNKNOWN))
                 .isOne();
     }
@@ -120,7 +119,7 @@ class DeclarativeSlotPoolBridgeResourceDeclarationTest
                                             
declarativeSlotPoolBridge.requestNewAllocatedSlot(
                                                     new SlotRequestId(),
                                                     ResourceProfile.UNKNOWN,
-                                                    Time.milliseconds(5)),
+                                                    Duration.ofMillis(5)),
                                     mainThreadExecutor)
                             .get();
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java
index 28c91a46248..ee261fa76aa 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.jobmaster.slotpool;
 
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
@@ -95,7 +94,7 @@ class DeclarativeSlotPoolBridgeTest extends 
AbstractDeclarativeSlotPoolBridgeTes
                                             
declarativeSlotPoolBridge.requestNewAllocatedSlot(
                                                     slotRequestId,
                                                     ResourceProfile.UNKNOWN,
-                                                    Time.minutes(5)),
+                                                    Duration.ofMinutes(5)),
                                     componentMainThreadExecutor)
                             .get();
 
@@ -167,7 +166,7 @@ class DeclarativeSlotPoolBridgeTest extends 
AbstractDeclarativeSlotPoolBridgeTes
                                                 
declarativeSlotPoolBridge.requestNewAllocatedSlot(
                                                         slotRequestId,
                                                         
ResourceProfile.UNKNOWN,
-                                                        
Time.fromDuration(RPC_TIMEOUT));
+                                                        RPC_TIMEOUT);
                                         slotFuture.whenComplete(
                                                 (physicalSlot, throwable) -> {
                                                     if (throwable != null) {
@@ -199,9 +198,7 @@ class DeclarativeSlotPoolBridgeTest extends 
AbstractDeclarativeSlotPoolBridgeTes
 
             final CompletableFuture<PhysicalSlot> slotFuture =
                     declarativeSlotPoolBridge.requestNewAllocatedSlot(
-                            new SlotRequestId(),
-                            ResourceProfile.UNKNOWN,
-                            Time.fromDuration(RPC_TIMEOUT));
+                            new SlotRequestId(), ResourceProfile.UNKNOWN, 
RPC_TIMEOUT);
 
             final LocalTaskManagerLocation localTaskManagerLocation =
                     new LocalTaskManagerLocation();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java
index a000bd251f5..1bea667531c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.jobmaster.slotpool;
 
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import 
org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutor;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
@@ -27,6 +26,7 @@ import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
@@ -36,7 +36,7 @@ import static 
org.assertj.core.api.Assertions.assertThatThrownBy;
 /** Tests for the {@link DeclarativeSlotPoolBridge} interactions. */
 class SlotPoolInteractionsTest {
 
-    private static final Time fastTimeout = Time.milliseconds(1L);
+    private static final Duration fastTimeout = Duration.ofMillis(1L);
 
     @RegisterExtension
     private static final TestingComponentMainThreadExecutor.Extension 
EXECUTOR_EXTENSION =
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java
index f430beeb10c..25956dca779 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.jobmaster.slotpool;
 
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -33,6 +32,7 @@ import 
org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.ResourceCounter;
 import org.apache.flink.util.FlinkException;
 
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
@@ -48,7 +48,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Testing utility functions for the {@link SlotPool}. */
 public class SlotPoolUtils {
 
-    public static final Time TIMEOUT = Time.seconds(10L);
+    public static final Duration TIMEOUT = Duration.ofSeconds(10L);
 
     private SlotPoolUtils() {
         throw new UnsupportedOperationException("Cannot instantiate this 
class.");

Reply via email to