Thesharing commented on a change in pull request #12917:
URL: https://github.com/apache/flink/pull/12917#discussion_r465519831



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRequestCompletionTest.java
##########
@@ -129,22 +126,15 @@ private void runSlotRequestCompletionTest(
                }
        }
 
-       private SlotPoolImpl setUpSlotPoolAndConnectToResourceManager() throws 
Exception {
-               final SlotPoolImpl slotPool = setUpSlotPool();
-               connectToResourceManager(slotPool);
-
-               return slotPool;
+       private TestingSlotPoolImpl createAndSetUpSlotPool() throws Exception {
+               return 
SlotPoolUtils.createAndSetUpSlotPool(resourceManagerGateway);
        }
 
        private void connectToResourceManager(SlotPoolImpl slotPool) {
                slotPool.connectToResourceManager(resourceManagerGateway);
        }
 
-       private SlotPoolImpl setUpSlotPool() throws Exception {
-               final SlotPoolImpl slotPool = new TestingSlotPoolImpl(new 
JobID());
-
-               slotPool.start(JobMasterId.generate(), "foobar", 
ComponentMainThreadExecutorServiceAdapter.forMainThread());
-
-               return slotPool;
+       private TestingSlotPoolImpl 
createAndSetUpSlotPoolWithoutResourceManager() throws Exception {
+               return 
SlotPoolUtils.createSlotPoolBuilder().setResourceManagerGateway(null).build();

Review comment:
       Done.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java
##########
@@ -276,4 +289,28 @@ private void 
advanceTimeAndTriggerCheckBatchSlotTimeout(TestingSlotPoolImpl slot
                // timeout all as unfulfillable marked slots
                slotPool.triggerCheckBatchSlotTimeout();
        }
+
+       private TestingSlotPoolImpl createAndSetUpSlotPool(
+                       final ComponentMainThreadExecutor 
componentMainThreadExecutor,
+                       final ResourceManagerGateway resourceManagerGateway,
+                       final Time batchSlotTimeout) throws Exception {
+
+               return SlotPoolUtils

Review comment:
       I don't agree with this. If we rebase this `createAndSetUpSlotPool` on 
the one below, we would have to introduce a default value for `Clock`, which 
is unnecessary.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java
##########
@@ -45,19 +50,108 @@
  */
 public class SlotPoolUtils {
 
+       public static final Time TIMEOUT = Time.seconds(10L);
+
        private SlotPoolUtils() {
                throw new UnsupportedOperationException("Cannot instantiate 
this class.");
        }
 
+       public static SlotPoolBuilder createSlotPoolBuilder() {
+               return 
createSlotPoolBuilder(ComponentMainThreadExecutorServiceAdapter.forMainThread());
+       }
+
+       public static SlotPoolBuilder createSlotPoolBuilder(final 
ComponentMainThreadExecutor mainThreadExecutor) {
+               return new SlotPoolBuilder(mainThreadExecutor);
+       }
+
+       public static SlotPoolBuilder createSlotPoolBuilder(final 
ResourceManagerGateway resourceManagerGateway) {
+               return 
createSlotPoolBuilder().setResourceManagerGateway(resourceManagerGateway);
+       }
+
+       public static SlotPoolBuilder createSlotPoolBuilder(
+                       final ComponentMainThreadExecutor mainThreadExecutor,
+                       final ResourceManagerGateway resourceManagerGateway) {
+
+               return 
createSlotPoolBuilder(mainThreadExecutor).setResourceManagerGateway(resourceManagerGateway);
+       }
+
+       public static TestingSlotPoolImpl createAndSetUpSlotPool(

Review comment:
       Agree. Done.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java
##########
@@ -45,19 +50,108 @@
  */
 public class SlotPoolUtils {
 
+       public static final Time TIMEOUT = Time.seconds(10L);
+
        private SlotPoolUtils() {
                throw new UnsupportedOperationException("Cannot instantiate 
this class.");
        }
 
+       public static SlotPoolBuilder createSlotPoolBuilder() {
+               return 
createSlotPoolBuilder(ComponentMainThreadExecutorServiceAdapter.forMainThread());
+       }
+
+       public static SlotPoolBuilder createSlotPoolBuilder(final 
ComponentMainThreadExecutor mainThreadExecutor) {
+               return new SlotPoolBuilder(mainThreadExecutor);
+       }
+
+       public static SlotPoolBuilder createSlotPoolBuilder(final 
ResourceManagerGateway resourceManagerGateway) {
+               return 
createSlotPoolBuilder().setResourceManagerGateway(resourceManagerGateway);
+       }
+
+       public static SlotPoolBuilder createSlotPoolBuilder(
+                       final ComponentMainThreadExecutor mainThreadExecutor,
+                       final ResourceManagerGateway resourceManagerGateway) {
+
+               return 
createSlotPoolBuilder(mainThreadExecutor).setResourceManagerGateway(resourceManagerGateway);
+       }
+
+       public static TestingSlotPoolImpl createAndSetUpSlotPool(
+                       final ResourceManagerGateway resourceManagerGateway) 
throws Exception {
+
+               return createSlotPoolBuilder(resourceManagerGateway).build();
+       }
+
+       public static TestingSlotPoolImpl createAndSetUpSlotPool(
+                       final ResourceManagerGateway resourceManagerGateway,
+                       final JobID jobId) throws Exception {
+
+               return 
createSlotPoolBuilder(resourceManagerGateway).setJobId(jobId).build();
+       }
+
+       public static TestingSlotPoolImpl createAndSetUpSlotPool(
+                       final ResourceManagerGateway resourceManagerGateway,
+                       final Clock clock,
+                       final Time idleSlotTimeout) throws Exception {
+
+               return SlotPoolUtils
+                       .createSlotPoolBuilder(resourceManagerGateway)
+                       .setClock(clock)
+                       .setIdleSlotTimeout(idleSlotTimeout)
+                       .build();
+       }
+
+       public static TestingSlotPoolImpl createAndSetUpSlotPool(

Review comment:
       Removed.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java
##########
@@ -45,19 +50,108 @@
  */
 public class SlotPoolUtils {
 
+       public static final Time TIMEOUT = Time.seconds(10L);
+
        private SlotPoolUtils() {
                throw new UnsupportedOperationException("Cannot instantiate 
this class.");
        }
 
+       public static SlotPoolBuilder createSlotPoolBuilder() {
+               return 
createSlotPoolBuilder(ComponentMainThreadExecutorServiceAdapter.forMainThread());
+       }
+
+       public static SlotPoolBuilder createSlotPoolBuilder(final 
ComponentMainThreadExecutor mainThreadExecutor) {
+               return new SlotPoolBuilder(mainThreadExecutor);
+       }
+
+       public static SlotPoolBuilder createSlotPoolBuilder(final 
ResourceManagerGateway resourceManagerGateway) {
+               return 
createSlotPoolBuilder().setResourceManagerGateway(resourceManagerGateway);
+       }
+
+       public static SlotPoolBuilder createSlotPoolBuilder(
+                       final ComponentMainThreadExecutor mainThreadExecutor,
+                       final ResourceManagerGateway resourceManagerGateway) {
+
+               return 
createSlotPoolBuilder(mainThreadExecutor).setResourceManagerGateway(resourceManagerGateway);
+       }
+
+       public static TestingSlotPoolImpl createAndSetUpSlotPool(
+                       final ResourceManagerGateway resourceManagerGateway) 
throws Exception {
+
+               return createSlotPoolBuilder(resourceManagerGateway).build();
+       }
+
+       public static TestingSlotPoolImpl createAndSetUpSlotPool(

Review comment:
       Moved.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java
##########
@@ -233,16 +239,23 @@ public void 
testPendingBatchSlotRequestTimeoutAfterSlotRelease() throws Exceptio
                final ManualClock clock = new ManualClock();
                final Time batchSlotTimeout = Time.milliseconds(1000L);
 
-               try (final TestingSlotPoolImpl slotPool = new 
SlotPoolBuilder(directMainThreadExecutor)
-                               .setClock(clock)
-                               .setBatchSlotTimeout(batchSlotTimeout)
-                               .build()) {
+               try (final TestingSlotPoolImpl slotPool = 
createAndSetUpSlotPool(
+                               directMainThreadExecutor,
+                               null,
+                               batchSlotTimeout,
+                               clock)) {
+
                        final ResourceID taskManagerResourceId = 
SlotPoolUtils.offerSlots(slotPool, directMainThreadExecutor, 
Collections.singletonList(resourceProfile));
-                       final CompletableFuture<PhysicalSlot> firstSlotFuture = 
SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, 
resourceProfile);

Review comment:
       Removed.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java
##########
@@ -80,24 +65,12 @@
 
        @Test
        public void testSlotAllocationNoResourceManager() throws Exception {
-               final JobID jid = new JobID();
-
-               try (SlotPool pool = new SlotPoolImpl(
-                       jid,
-                       SystemClock.getInstance(),
-                       TestingUtils.infiniteTime(),
-                       TestingUtils.infiniteTime(),
-                       TestingUtils.infiniteTime()
-               )) {
 
-                       pool.start(JobMasterId.generate(), "foobar", 
testMainThreadExecutor.getMainThreadExecutor());
-                       Scheduler scheduler = new 
SchedulerImpl(LocationPreferenceSlotSelectionStrategy.createDefault(), pool);
-                       
scheduler.start(testMainThreadExecutor.getMainThreadExecutor());
+               try (SlotPool pool = createAndSetUpSlotPool(false)) {

Review comment:
       Done.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java
##########
@@ -45,19 +50,108 @@
  */
 public class SlotPoolUtils {
 
+       public static final Time TIMEOUT = Time.seconds(10L);
+
        private SlotPoolUtils() {
                throw new UnsupportedOperationException("Cannot instantiate 
this class.");
        }
 
+       public static SlotPoolBuilder createSlotPoolBuilder() {

Review comment:
       I've removed these `createSlotPoolBuilder` functions.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java
##########
@@ -45,19 +50,108 @@
  */
 public class SlotPoolUtils {
 
+       public static final Time TIMEOUT = Time.seconds(10L);
+
        private SlotPoolUtils() {
                throw new UnsupportedOperationException("Cannot instantiate 
this class.");
        }
 
+       public static SlotPoolBuilder createSlotPoolBuilder() {
+               return 
createSlotPoolBuilder(ComponentMainThreadExecutorServiceAdapter.forMainThread());
+       }
+
+       public static SlotPoolBuilder createSlotPoolBuilder(final 
ComponentMainThreadExecutor mainThreadExecutor) {
+               return new SlotPoolBuilder(mainThreadExecutor);
+       }
+
+       public static SlotPoolBuilder createSlotPoolBuilder(final 
ResourceManagerGateway resourceManagerGateway) {
+               return 
createSlotPoolBuilder().setResourceManagerGateway(resourceManagerGateway);
+       }
+
+       public static SlotPoolBuilder createSlotPoolBuilder(
+                       final ComponentMainThreadExecutor mainThreadExecutor,
+                       final ResourceManagerGateway resourceManagerGateway) {
+
+               return 
createSlotPoolBuilder(mainThreadExecutor).setResourceManagerGateway(resourceManagerGateway);
+       }
+
+       public static TestingSlotPoolImpl createAndSetUpSlotPool(
+                       final ResourceManagerGateway resourceManagerGateway) 
throws Exception {
+
+               return createSlotPoolBuilder(resourceManagerGateway).build();
+       }
+
+       public static TestingSlotPoolImpl createAndSetUpSlotPool(
+                       final ResourceManagerGateway resourceManagerGateway,
+                       final JobID jobId) throws Exception {
+
+               return 
createSlotPoolBuilder(resourceManagerGateway).setJobId(jobId).build();
+       }
+
+       public static TestingSlotPoolImpl createAndSetUpSlotPool(

Review comment:
       Moved.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to