Thesharing commented on a change in pull request #12917:
URL: https://github.com/apache/flink/pull/12917#discussion_r465519831
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRequestCompletionTest.java
##########
@@ -129,22 +126,15 @@ private void runSlotRequestCompletionTest(
}
}
- private SlotPoolImpl setUpSlotPoolAndConnectToResourceManager() throws
Exception {
- final SlotPoolImpl slotPool = setUpSlotPool();
- connectToResourceManager(slotPool);
-
- return slotPool;
+ private TestingSlotPoolImpl createAndSetUpSlotPool() throws Exception {
+ return
SlotPoolUtils.createAndSetUpSlotPool(resourceManagerGateway);
}
private void connectToResourceManager(SlotPoolImpl slotPool) {
slotPool.connectToResourceManager(resourceManagerGateway);
}
- private SlotPoolImpl setUpSlotPool() throws Exception {
- final SlotPoolImpl slotPool = new TestingSlotPoolImpl(new
JobID());
-
- slotPool.start(JobMasterId.generate(), "foobar",
ComponentMainThreadExecutorServiceAdapter.forMainThread());
-
- return slotPool;
+ private TestingSlotPoolImpl
createAndSetUpSlotPoolWithoutResourceManager() throws Exception {
+ return
SlotPoolUtils.createSlotPoolBuilder().setResourceManagerGateway(null).build();
Review comment:
Done.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java
##########
@@ -276,4 +289,28 @@ private void
advanceTimeAndTriggerCheckBatchSlotTimeout(TestingSlotPoolImpl slot
// timeout all as unfulfillable marked slots
slotPool.triggerCheckBatchSlotTimeout();
}
+
+ private TestingSlotPoolImpl createAndSetUpSlotPool(
+ final ComponentMainThreadExecutor
componentMainThreadExecutor,
+ final ResourceManagerGateway resourceManagerGateway,
+ final Time batchSlotTimeout) throws Exception {
+
+ return SlotPoolUtils
Review comment:
I don't agree with this. If we merge this `createAndSetUpSlotPool` with
the one below, we would have to introduce a default value for `Clock`,
which is unnecessary.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java
##########
@@ -45,19 +50,108 @@
*/
public class SlotPoolUtils {
+ public static final Time TIMEOUT = Time.seconds(10L);
+
private SlotPoolUtils() {
throw new UnsupportedOperationException("Cannot instantiate
this class.");
}
+ public static SlotPoolBuilder createSlotPoolBuilder() {
+ return
createSlotPoolBuilder(ComponentMainThreadExecutorServiceAdapter.forMainThread());
+ }
+
+ public static SlotPoolBuilder createSlotPoolBuilder(final
ComponentMainThreadExecutor mainThreadExecutor) {
+ return new SlotPoolBuilder(mainThreadExecutor);
+ }
+
+ public static SlotPoolBuilder createSlotPoolBuilder(final
ResourceManagerGateway resourceManagerGateway) {
+ return
createSlotPoolBuilder().setResourceManagerGateway(resourceManagerGateway);
+ }
+
+ public static SlotPoolBuilder createSlotPoolBuilder(
+ final ComponentMainThreadExecutor mainThreadExecutor,
+ final ResourceManagerGateway resourceManagerGateway) {
+
+ return
createSlotPoolBuilder(mainThreadExecutor).setResourceManagerGateway(resourceManagerGateway);
+ }
+
+ public static TestingSlotPoolImpl createAndSetUpSlotPool(
Review comment:
Agree. Done.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java
##########
@@ -45,19 +50,108 @@
*/
public class SlotPoolUtils {
+ public static final Time TIMEOUT = Time.seconds(10L);
+
private SlotPoolUtils() {
throw new UnsupportedOperationException("Cannot instantiate
this class.");
}
+ public static SlotPoolBuilder createSlotPoolBuilder() {
+ return
createSlotPoolBuilder(ComponentMainThreadExecutorServiceAdapter.forMainThread());
+ }
+
+ public static SlotPoolBuilder createSlotPoolBuilder(final
ComponentMainThreadExecutor mainThreadExecutor) {
+ return new SlotPoolBuilder(mainThreadExecutor);
+ }
+
+ public static SlotPoolBuilder createSlotPoolBuilder(final
ResourceManagerGateway resourceManagerGateway) {
+ return
createSlotPoolBuilder().setResourceManagerGateway(resourceManagerGateway);
+ }
+
+ public static SlotPoolBuilder createSlotPoolBuilder(
+ final ComponentMainThreadExecutor mainThreadExecutor,
+ final ResourceManagerGateway resourceManagerGateway) {
+
+ return
createSlotPoolBuilder(mainThreadExecutor).setResourceManagerGateway(resourceManagerGateway);
+ }
+
+ public static TestingSlotPoolImpl createAndSetUpSlotPool(
+ final ResourceManagerGateway resourceManagerGateway)
throws Exception {
+
+ return createSlotPoolBuilder(resourceManagerGateway).build();
+ }
+
+ public static TestingSlotPoolImpl createAndSetUpSlotPool(
+ final ResourceManagerGateway resourceManagerGateway,
+ final JobID jobId) throws Exception {
+
+ return
createSlotPoolBuilder(resourceManagerGateway).setJobId(jobId).build();
+ }
+
+ public static TestingSlotPoolImpl createAndSetUpSlotPool(
+ final ResourceManagerGateway resourceManagerGateway,
+ final Clock clock,
+ final Time idleSlotTimeout) throws Exception {
+
+ return SlotPoolUtils
+ .createSlotPoolBuilder(resourceManagerGateway)
+ .setClock(clock)
+ .setIdleSlotTimeout(idleSlotTimeout)
+ .build();
+ }
+
+ public static TestingSlotPoolImpl createAndSetUpSlotPool(
Review comment:
Removed.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java
##########
@@ -45,19 +50,108 @@
*/
public class SlotPoolUtils {
+ public static final Time TIMEOUT = Time.seconds(10L);
+
private SlotPoolUtils() {
throw new UnsupportedOperationException("Cannot instantiate
this class.");
}
+ public static SlotPoolBuilder createSlotPoolBuilder() {
+ return
createSlotPoolBuilder(ComponentMainThreadExecutorServiceAdapter.forMainThread());
+ }
+
+ public static SlotPoolBuilder createSlotPoolBuilder(final
ComponentMainThreadExecutor mainThreadExecutor) {
+ return new SlotPoolBuilder(mainThreadExecutor);
+ }
+
+ public static SlotPoolBuilder createSlotPoolBuilder(final
ResourceManagerGateway resourceManagerGateway) {
+ return
createSlotPoolBuilder().setResourceManagerGateway(resourceManagerGateway);
+ }
+
+ public static SlotPoolBuilder createSlotPoolBuilder(
+ final ComponentMainThreadExecutor mainThreadExecutor,
+ final ResourceManagerGateway resourceManagerGateway) {
+
+ return
createSlotPoolBuilder(mainThreadExecutor).setResourceManagerGateway(resourceManagerGateway);
+ }
+
+ public static TestingSlotPoolImpl createAndSetUpSlotPool(
+ final ResourceManagerGateway resourceManagerGateway)
throws Exception {
+
+ return createSlotPoolBuilder(resourceManagerGateway).build();
+ }
+
+ public static TestingSlotPoolImpl createAndSetUpSlotPool(
Review comment:
Moved.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java
##########
@@ -233,16 +239,23 @@ public void
testPendingBatchSlotRequestTimeoutAfterSlotRelease() throws Exceptio
final ManualClock clock = new ManualClock();
final Time batchSlotTimeout = Time.milliseconds(1000L);
- try (final TestingSlotPoolImpl slotPool = new
SlotPoolBuilder(directMainThreadExecutor)
- .setClock(clock)
- .setBatchSlotTimeout(batchSlotTimeout)
- .build()) {
+ try (final TestingSlotPoolImpl slotPool =
createAndSetUpSlotPool(
+ directMainThreadExecutor,
+ null,
+ batchSlotTimeout,
+ clock)) {
+
final ResourceID taskManagerResourceId =
SlotPoolUtils.offerSlots(slotPool, directMainThreadExecutor,
Collections.singletonList(resourceProfile));
- final CompletableFuture<PhysicalSlot> firstSlotFuture =
SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor,
resourceProfile);
Review comment:
Removed.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java
##########
@@ -80,24 +65,12 @@
@Test
public void testSlotAllocationNoResourceManager() throws Exception {
- final JobID jid = new JobID();
-
- try (SlotPool pool = new SlotPoolImpl(
- jid,
- SystemClock.getInstance(),
- TestingUtils.infiniteTime(),
- TestingUtils.infiniteTime(),
- TestingUtils.infiniteTime()
- )) {
- pool.start(JobMasterId.generate(), "foobar",
testMainThreadExecutor.getMainThreadExecutor());
- Scheduler scheduler = new
SchedulerImpl(LocationPreferenceSlotSelectionStrategy.createDefault(), pool);
-
scheduler.start(testMainThreadExecutor.getMainThreadExecutor());
+ try (SlotPool pool = createAndSetUpSlotPool(false)) {
Review comment:
Done.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java
##########
@@ -45,19 +50,108 @@
*/
public class SlotPoolUtils {
+ public static final Time TIMEOUT = Time.seconds(10L);
+
private SlotPoolUtils() {
throw new UnsupportedOperationException("Cannot instantiate
this class.");
}
+ public static SlotPoolBuilder createSlotPoolBuilder() {
Review comment:
I've removed these `createSlotPoolBuilder` functions.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java
##########
@@ -45,19 +50,108 @@
*/
public class SlotPoolUtils {
+ public static final Time TIMEOUT = Time.seconds(10L);
+
private SlotPoolUtils() {
throw new UnsupportedOperationException("Cannot instantiate
this class.");
}
+ public static SlotPoolBuilder createSlotPoolBuilder() {
+ return
createSlotPoolBuilder(ComponentMainThreadExecutorServiceAdapter.forMainThread());
+ }
+
+ public static SlotPoolBuilder createSlotPoolBuilder(final
ComponentMainThreadExecutor mainThreadExecutor) {
+ return new SlotPoolBuilder(mainThreadExecutor);
+ }
+
+ public static SlotPoolBuilder createSlotPoolBuilder(final
ResourceManagerGateway resourceManagerGateway) {
+ return
createSlotPoolBuilder().setResourceManagerGateway(resourceManagerGateway);
+ }
+
+ public static SlotPoolBuilder createSlotPoolBuilder(
+ final ComponentMainThreadExecutor mainThreadExecutor,
+ final ResourceManagerGateway resourceManagerGateway) {
+
+ return
createSlotPoolBuilder(mainThreadExecutor).setResourceManagerGateway(resourceManagerGateway);
+ }
+
+ public static TestingSlotPoolImpl createAndSetUpSlotPool(
+ final ResourceManagerGateway resourceManagerGateway)
throws Exception {
+
+ return createSlotPoolBuilder(resourceManagerGateway).build();
+ }
+
+ public static TestingSlotPoolImpl createAndSetUpSlotPool(
+ final ResourceManagerGateway resourceManagerGateway,
+ final JobID jobId) throws Exception {
+
+ return
createSlotPoolBuilder(resourceManagerGateway).setJobId(jobId).build();
+ }
+
+ public static TestingSlotPoolImpl createAndSetUpSlotPool(
Review comment:
Moved.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org