This is an automated email from the ASF dual-hosted git repository.
fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 82b628d4730 [hotfix][scheduler] Migrate the Time to Duration for
SlotPool in the minimum scope
82b628d4730 is described below
commit 82b628d4730eef32b2f7a022e3b73cb18f950e6e
Author: Roc Marshal <[email protected]>
AuthorDate: Fri Jul 26 20:23:24 2024 +0800
[hotfix][scheduler] Migrate the Time to Duration for SlotPool in the
minimum scope
---
.../runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java | 9 ++++-----
.../org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java | 8 +++++---
.../java/org/apache/flink/runtime/jobmaster/JobMasterTest.java | 2 +-
.../jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java | 6 +++---
.../DeclarativeSlotPoolBridgePreferredAllocationsTest.java | 6 +++---
.../slotpool/DeclarativeSlotPoolBridgeRequestCompletionTest.java | 4 ++--
.../DeclarativeSlotPoolBridgeResourceDeclarationTest.java | 5 ++---
.../jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java | 9 +++------
.../runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java | 4 ++--
.../apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java | 4 ++--
10 files changed, 27 insertions(+), 30 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
index b04dc25f615..38a9e1312f5 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.jobmaster.slotpool;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
@@ -282,7 +281,7 @@ public class DeclarativeSlotPoolBridge extends
DeclarativeSlotPoolService implem
@Nonnull SlotRequestId slotRequestId,
@Nonnull ResourceProfile resourceProfile,
@Nonnull Collection<AllocationID> preferredAllocations,
- @Nullable Time timeout) {
+ @Nullable Duration timeout) {
assertRunningInMainThread();
log.debug(
@@ -318,7 +317,7 @@ public class DeclarativeSlotPoolBridge extends
DeclarativeSlotPoolService implem
}
private CompletableFuture<PhysicalSlot> internalRequestNewSlot(
- PendingRequest pendingRequest, @Nullable Time timeout) {
+ PendingRequest pendingRequest, @Nullable Duration timeout) {
internalRequestNewAllocatedSlot(pendingRequest);
if (timeout == null) {
@@ -326,12 +325,12 @@ public class DeclarativeSlotPoolBridge extends
DeclarativeSlotPoolService implem
} else {
return FutureUtils.orTimeout(
pendingRequest.getSlotFuture(),
- timeout.toMilliseconds(),
+ timeout.toMillis(),
TimeUnit.MILLISECONDS,
componentMainThreadExecutor,
String.format(
"Pending slot request %s timed out after %d ms.",
- pendingRequest.getSlotRequestId(), timeout.toMilliseconds()))
+ pendingRequest.getSlotRequestId(), timeout.toMillis()))
.whenComplete(
(physicalSlot, throwable) -> {
if (throwable instanceof TimeoutException) {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index e32ab5dc42a..1b5cba7fcf8 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.jobmaster.slotpool;
-import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -33,6 +32,7 @@ import
org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import javax.annotation.Nullable;
+import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
@@ -154,7 +154,9 @@ public interface SlotPool extends AllocatedSlotActions,
AutoCloseable {
* @return a newly allocated slot that was previously not available.
*/
default CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
- SlotRequestId slotRequestId, ResourceProfile resourceProfile, @Nullable Time timeout) {
+ SlotRequestId slotRequestId,
+ ResourceProfile resourceProfile,
+ @Nullable Duration timeout) {
return requestNewAllocatedSlot(
slotRequestId, resourceProfile, Collections.emptyList(),
timeout);
}
@@ -175,7 +177,7 @@ public interface SlotPool extends AllocatedSlotActions,
AutoCloseable {
SlotRequestId slotRequestId,
ResourceProfile resourceProfile,
Collection<AllocationID> preferredAllocations,
- @Nullable Time timeout);
+ @Nullable Duration timeout);
/**
* Requests the allocation of a new batch slot from the resource manager.
Unlike the normal
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 55e252ad42a..4e724fb9cb0 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -610,7 +610,7 @@ class JobMasterTest {
@Nonnull SlotRequestId slotRequestId,
@Nonnull ResourceProfile resourceProfile,
@Nonnull Collection<AllocationID> preferredAllocations,
- @Nullable Time timeout) {
+ @Nullable Duration timeout) {
return new CompletableFuture<>();
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java
index 6b267dafd6c..9855ba8be84 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java
@@ -41,7 +41,7 @@ public class DeclarativeSlotPoolBridgeBuilder {
private JobID jobId = new JobID();
private Duration batchSlotTimeout =
JobManagerOptions.SLOT_IDLE_TIMEOUT.defaultValue();
- private Duration idleSlotTimeout = TestingUtils.infiniteTime().toDuration();
+ private Duration idleSlotTimeout = TestingUtils.infiniteDuration();
private Clock clock = SystemClock.getInstance();
private Duration slotRequestMaxInterval =
SLOT_REQUEST_MAX_INTERVAL.defaultValue();
private ComponentMainThreadExecutor mainThreadExecutor = forMainThread();
@@ -101,7 +101,7 @@ public class DeclarativeSlotPoolBridgeBuilder {
jobId,
new DefaultDeclarativeSlotPoolFactory(),
clock,
- TestingUtils.infiniteTime().toDuration(),
+ TestingUtils.infiniteDuration(),
idleSlotTimeout,
batchSlotTimeout,
requestSlotMatchingStrategy,
@@ -115,7 +115,7 @@ public class DeclarativeSlotPoolBridgeBuilder {
jobId,
new DefaultDeclarativeSlotPoolFactory(),
clock,
- TestingUtils.infiniteTime().toDuration(),
+ TestingUtils.infiniteDuration(),
idleSlotTimeout,
batchSlotTimeout,
requestSlotMatchingStrategy,
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgePreferredAllocationsTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgePreferredAllocationsTest.java
index af691d21477..ace4cf756c8 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgePreferredAllocationsTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgePreferredAllocationsTest.java
@@ -52,9 +52,9 @@ class DeclarativeSlotPoolBridgePreferredAllocationsTest {
new JobID(),
new DefaultDeclarativeSlotPoolFactory(),
SystemClock.getInstance(),
- TestingUtils.infiniteTime().toDuration(),
- TestingUtils.infiniteTime().toDuration(),
- TestingUtils.infiniteTime().toDuration(),
+ TestingUtils.infiniteDuration(),
+ TestingUtils.infiniteDuration(),
+ TestingUtils.infiniteDuration(),
PreferredAllocationRequestSlotMatchingStrategy.INSTANCE,
Duration.ZERO,
forMainThread());
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeRequestCompletionTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeRequestCompletionTest.java
index 147ef4a9885..71909ca5aac 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeRequestCompletionTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeRequestCompletionTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.jobmaster.slotpool;
-import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
@@ -32,6 +31,7 @@ import org.apache.flink.util.function.CheckedSupplier;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -46,7 +46,7 @@ import static org.assertj.core.api.Assertions.assertThat;
/** Tests how the {@link DeclarativeSlotPoolBridge} completes slot requests. */
class DeclarativeSlotPoolBridgeRequestCompletionTest {
- private static final Time TIMEOUT = SlotPoolUtils.TIMEOUT;
+ private static final Duration TIMEOUT = SlotPoolUtils.TIMEOUT;
private TestingResourceManagerGateway resourceManagerGateway;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeResourceDeclarationTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeResourceDeclarationTest.java
index 7aa18057536..d830a6649e9 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeResourceDeclarationTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeResourceDeclarationTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.jobmaster.slotpool;
-import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
@@ -97,7 +96,7 @@ class DeclarativeSlotPoolBridgeResourceDeclarationTest
// requesting the allocation of a new slot should increase the
requirements
declarativeSlotPoolBridge.requestNewAllocatedSlot(
- new SlotRequestId(), ResourceProfile.UNKNOWN, Time.minutes(5));
+ new SlotRequestId(), ResourceProfile.UNKNOWN, Duration.ofMinutes(5));
assertThat(requirementListener.getRequirements().getResourceCount(ResourceProfile.UNKNOWN))
.isOne();
}
@@ -120,7 +119,7 @@ class DeclarativeSlotPoolBridgeResourceDeclarationTest
declarativeSlotPoolBridge.requestNewAllocatedSlot(
new SlotRequestId(),
ResourceProfile.UNKNOWN,
- Time.milliseconds(5)),
+ Duration.ofMillis(5)),
mainThreadExecutor)
.get();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java
index 28c91a46248..ee261fa76aa 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.jobmaster.slotpool;
-import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
@@ -95,7 +94,7 @@ class DeclarativeSlotPoolBridgeTest extends
AbstractDeclarativeSlotPoolBridgeTes
declarativeSlotPoolBridge.requestNewAllocatedSlot(
slotRequestId,
ResourceProfile.UNKNOWN,
- Time.minutes(5)),
+ Duration.ofMinutes(5)),
componentMainThreadExecutor)
.get();
@@ -167,7 +166,7 @@ class DeclarativeSlotPoolBridgeTest extends
AbstractDeclarativeSlotPoolBridgeTes
declarativeSlotPoolBridge.requestNewAllocatedSlot(
slotRequestId,
ResourceProfile.UNKNOWN,
- Time.fromDuration(RPC_TIMEOUT));
+ RPC_TIMEOUT);
slotFuture.whenComplete(
(physicalSlot, throwable) -> {
if (throwable != null) {
@@ -199,9 +198,7 @@ class DeclarativeSlotPoolBridgeTest extends
AbstractDeclarativeSlotPoolBridgeTes
final CompletableFuture<PhysicalSlot> slotFuture =
declarativeSlotPoolBridge.requestNewAllocatedSlot(
- new SlotRequestId(),
- ResourceProfile.UNKNOWN,
- Time.fromDuration(RPC_TIMEOUT));
+ new SlotRequestId(), ResourceProfile.UNKNOWN, RPC_TIMEOUT);
final LocalTaskManagerLocation localTaskManagerLocation =
new LocalTaskManagerLocation();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java
index a000bd251f5..1bea667531c 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.jobmaster.slotpool;
-import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import
org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutor;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
@@ -27,6 +26,7 @@ import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
+import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
@@ -36,7 +36,7 @@ import static
org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for the {@link DeclarativeSlotPoolBridge} interactions. */
class SlotPoolInteractionsTest {
- private static final Time fastTimeout = Time.milliseconds(1L);
+ private static final Duration fastTimeout = Duration.ofMillis(1L);
@RegisterExtension
private static final TestingComponentMainThreadExecutor.Extension
EXECUTOR_EXTENSION =
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java
index f430beeb10c..25956dca779 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.jobmaster.slotpool;
-import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -33,6 +32,7 @@ import
org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.util.FlinkException;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
@@ -48,7 +48,7 @@ import static org.assertj.core.api.Assertions.assertThat;
/** Testing utility functions for the {@link SlotPool}. */
public class SlotPoolUtils {
- public static final Time TIMEOUT = Time.seconds(10L);
+ public static final Duration TIMEOUT = Duration.ofSeconds(10L);
private SlotPoolUtils() {
throw new UnsupportedOperationException("Cannot instantiate this class.");