zhuzhurk commented on a change in pull request #12917:
URL: https://github.com/apache/flink/pull/12917#discussion_r459635923



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##########
@@ -245,27 +226,25 @@ public void testAllocateWithFreeSlot() throws Exception {
 
                        assertTrue(slotPool.offerSlot(taskManagerLocation, 
taskManagerGateway, slotOffer));
 
-                       LogicalSlot slot1 = future1.get(1, TimeUnit.SECONDS);
+                       PhysicalSlot slot1 = future1.get(1, TimeUnit.SECONDS);
                        assertTrue(future1.isDone());
 
                        // return this slot to pool
-                       slot1.releaseSlot();
+                       slotPool.releaseSlot(requestId1, null);

Review comment:
       Looks to me there is not need to have the process to allocate slot1, 
offer slot and release it.
   The slot offering only would be enough to add a free slot.
   I think we can simplify it, maybe in a separate commit.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##########
@@ -123,17 +119,11 @@ public void testAllocateSimpleSlot() throws Exception {
                CompletableFuture<SlotRequest> slotRequestFuture = new 
CompletableFuture<>();
                
resourceManagerGateway.setRequestSlotConsumer(slotRequestFuture::complete);
 
-               try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
-                       setupSlotPool(slotPool, resourceManagerGateway, 
mainThreadExecutor);
-                       Scheduler scheduler = setupScheduler(slotPool, 
mainThreadExecutor);
+               try (SlotPoolImpl slotPool = createAndSetUpSlotPool()) {
                        
slotPool.registerTaskManager(taskManagerLocation.getResourceID());
 
                        SlotRequestId requestId = new SlotRequestId();
-                       CompletableFuture<LogicalSlot> future = 
scheduler.allocateSlot(
-                               requestId,
-                               new DummyScheduledUnit(),
-                               SlotProfile.noLocality(DEFAULT_TESTING_PROFILE),
-                               timeout);
+                       CompletableFuture<PhysicalSlot> future = 
requestNewAllocatedSlot(slotPool, requestId);

Review comment:
       can be final

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##########
@@ -145,10 +135,10 @@ public void testAllocateSimpleSlot() throws Exception {
 
                        assertTrue(slotPool.offerSlot(taskManagerLocation, 
taskManagerGateway, slotOffer));
 
-                       LogicalSlot slot = future.get(1, TimeUnit.SECONDS);
+                       PhysicalSlot physicalSlot = future.get(1, 
TimeUnit.SECONDS);

Review comment:
       can be final

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##########
@@ -275,16 +254,14 @@ public void testOfferSlot() throws Exception {
 
                
resourceManagerGateway.setRequestSlotConsumer(slotRequestFuture::complete);
 
-               try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
-                       setupSlotPool(slotPool, resourceManagerGateway, 
mainThreadExecutor);
-                       Scheduler scheduler = setupScheduler(slotPool, 
mainThreadExecutor);
+               try (SlotPoolImpl slotPool = createAndSetUpSlotPool()) {
                        
slotPool.registerTaskManager(taskManagerLocation.getResourceID());
 
-                       CompletableFuture<LogicalSlot> future = 
scheduler.allocateSlot(
-                               new SlotRequestId(),
-                               new DummyScheduledUnit(),
-                               SlotProfile.noLocality(DEFAULT_TESTING_PROFILE),
-                               timeout);
+                       SlotRequestId requestId = new SlotRequestId();
+                       CompletableFuture<PhysicalSlot> future = 
requestNewAllocatedSlot(
+                               slotPool,
+                               requestId
+                       );

Review comment:
       ```suggestion
                        CompletableFuture<PhysicalSlot> future = 
requestNewAllocatedSlot(
                                slotPool,
                                SlotRequestId()
                        );
   ```
   requestId is not reused

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##########
@@ -893,14 +866,6 @@ protected boolean matchesSafely(AllocatedSlotInfo item, 
Description mismatchDesc
                };
        }
 
-       private CompletableFuture<LogicalSlot> allocateSlot(Scheduler 
scheduler, SlotRequestId slotRequestId) {
-               return scheduler.allocateSlot(
-                       slotRequestId,
-                       new DummyScheduledUnit(),
-                       SlotProfile.noRequirements(),
-                       timeout);
-       }
-
        private SlotPoolImpl createAndSetUpSlotPool() throws Exception {

Review comment:
       Let's move `createSlotPoolImpl()` to be below this method because it is 
first (and only) used here.
   Also let's remove the `@Nonnull` tag of it because it is against current 
code style.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##########
@@ -666,6 +639,8 @@ public void testFreeFailedSlots() throws Exception {
                        final Optional<ResourceID> emptyTaskExecutorFuture = 
slotPool.failAllocation(
                                slotOffer.getAllocationId(),
                                failException);
+
+                       assertTrue(emptyTaskExecutorFuture.isPresent());

Review comment:
       this change is unrelated and not necessary.
   Let's exclude it from this commit.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##########
@@ -704,7 +679,7 @@ public void testCreateAllocatedSlotReport() throws 
Exception {
                        slotPool.offerSlots(taskManagerLocation, 
taskManagerGateway, slotOffers);
 
                        // wait for the completion of slot future
-                       slotRequestFuture.get();
+                       slotRequestFuture.get(1, TimeUnit.SECONDS);

Review comment:
       why making this change?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##########
@@ -245,27 +226,25 @@ public void testAllocateWithFreeSlot() throws Exception {
 
                        assertTrue(slotPool.offerSlot(taskManagerLocation, 
taskManagerGateway, slotOffer));
 
-                       LogicalSlot slot1 = future1.get(1, TimeUnit.SECONDS);
+                       PhysicalSlot slot1 = future1.get(1, TimeUnit.SECONDS);
                        assertTrue(future1.isDone());
 
                        // return this slot to pool
-                       slot1.releaseSlot();
+                       slotPool.releaseSlot(requestId1, null);
 
-                       CompletableFuture<LogicalSlot> future2 = 
scheduler.allocateSlot(
+                       assertEquals(1, slotPool.getAvailableSlots().size());
+                       assertEquals(0, slotPool.getAllocatedSlots().size());
+
+                       Optional<PhysicalSlot> optional = 
slotPool.allocateAvailableSlot(

Review comment:
       name it as physicalSlot/allocatedSlot would be better 

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##########
@@ -309,12 +286,16 @@ public void testOfferSlot() throws Exception {
 
                        // accepted slot
                        assertTrue(slotPool.offerSlot(taskManagerLocation, 
taskManagerGateway, slotOffer));
-                       LogicalSlot slot = future.get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
-                       assertTrue(slot.isAlive());
+                       PhysicalSlot slot = 
future.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+                       assertEquals(1, slotPool.getAvailableSlots().size());
+                       assertEquals(1, slotPool.getAllocatedSlots().size());
+                       assertEquals(taskManagerLocation, 
slot.getTaskManagerLocation());
+                       assertEquals(nonRequestedSlotOffer.getAllocationId(), 
slot.getAllocationId());

Review comment:
       these lines, except for "assertEquals(1, 
slotPool.getAvailableSlots().size());",  should be in the section of "// we'll 
also accept non requested slots".

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##########
@@ -309,12 +286,16 @@ public void testOfferSlot() throws Exception {
 
                        // accepted slot
                        assertTrue(slotPool.offerSlot(taskManagerLocation, 
taskManagerGateway, slotOffer));
-                       LogicalSlot slot = future.get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
-                       assertTrue(slot.isAlive());
+                       PhysicalSlot slot = 
future.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+                       assertEquals(1, slotPool.getAvailableSlots().size());
+                       assertEquals(1, slotPool.getAllocatedSlots().size());
+                       assertEquals(taskManagerLocation, 
slot.getTaskManagerLocation());
+                       assertEquals(nonRequestedSlotOffer.getAllocationId(), 
slot.getAllocationId());
 
                        // duplicated offer with using slot
                        assertTrue(slotPool.offerSlot(taskManagerLocation, 
taskManagerGateway, slotOffer));
-                       assertTrue(slot.isAlive());
+                       assertEquals(1, slotPool.getAllocatedSlots().size());

Review comment:
       Better to verify the availableSlots as well.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##########
@@ -224,16 +207,14 @@ public void testAllocateWithFreeSlot() throws Exception {
                final CompletableFuture<SlotRequest> slotRequestFuture = new 
CompletableFuture<>();
                
resourceManagerGateway.setRequestSlotConsumer(slotRequestFuture::complete);
 
-               try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
-                       setupSlotPool(slotPool, resourceManagerGateway, 
mainThreadExecutor);
-                       Scheduler scheduler = setupScheduler(slotPool, 
mainThreadExecutor);
+               try (SlotPoolImpl slotPool = createAndSetUpSlotPool()) {
                        
slotPool.registerTaskManager(taskManagerLocation.getResourceID());
 
-                       CompletableFuture<LogicalSlot> future1 = 
scheduler.allocateSlot(
-                               new SlotRequestId(),
-                               new DummyScheduledUnit(),
-                               SlotProfile.noLocality(DEFAULT_TESTING_PROFILE),
-                               timeout);
+                       SlotRequestId requestId1 = new SlotRequestId();
+                       CompletableFuture<PhysicalSlot> future1 = 
requestNewAllocatedSlot(
+                               slotPool,
+                               requestId1
+                       );

Review comment:
       According to Flink code style, the right bracket should be placed in the 
line of the last parameter.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##########
@@ -576,7 +550,6 @@ public void testDiscardIdleSlotIfReleasingFailed() throws 
Exception {
                try (TestingSlotPoolImpl slotPool = createSlotPoolImpl(clock)) {
 
                        setupSlotPool(slotPool, resourceManagerGateway, 
mainThreadExecutor);

Review comment:
       Why not replace it with `createAndSetUpSlotPool `?
   Also for the test case `testCheckIdleSlot`.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##########
@@ -309,12 +286,16 @@ public void testOfferSlot() throws Exception {
 
                        // accepted slot
                        assertTrue(slotPool.offerSlot(taskManagerLocation, 
taskManagerGateway, slotOffer));
-                       LogicalSlot slot = future.get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
-                       assertTrue(slot.isAlive());
+                       PhysicalSlot slot = 
future.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+                       assertEquals(1, slotPool.getAvailableSlots().size());
+                       assertEquals(1, slotPool.getAllocatedSlots().size());
+                       assertEquals(taskManagerLocation, 
slot.getTaskManagerLocation());
+                       assertEquals(nonRequestedSlotOffer.getAllocationId(), 
slot.getAllocationId());
 
                        // duplicated offer with using slot
                        assertTrue(slotPool.offerSlot(taskManagerLocation, 
taskManagerGateway, slotOffer));
-                       assertTrue(slot.isAlive());
+                       assertEquals(1, slotPool.getAllocatedSlots().size());
+                       assertEquals(nonRequestedSlotOffer.getAllocationId(), 
slot.getAllocationId());

Review comment:
       This line is not needed because the allocation id of an AllocatedSlot is 
immutable.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##########
@@ -365,19 +343,26 @@ public void testReleaseResource() throws Exception {
 
                        assertTrue(slotPool.offerSlot(taskManagerLocation, 
taskManagerGateway, slotOffer));
 
-                       LogicalSlot slot1 = future1.get(1, TimeUnit.SECONDS);
+                       PhysicalSlot slot1 = future1.get(1, TimeUnit.SECONDS);
                        assertTrue(future1.isDone());
                        assertFalse(future2.isDone());
 
                        final CompletableFuture<?> releaseFuture = new 
CompletableFuture<>();
-                       final DummyPayload dummyPayload = new 
DummyPayload(releaseFuture);
 
-                       slot1.tryAssignPayload(dummyPayload);
+                       SingleLogicalSlot logicalSlot = 
SingleLogicalSlot.allocateFromPhysicalSlot(
+                               requestId1,
+                               slot1,
+                               Locality.UNKNOWN,
+                               new DummySlotOwner(),
+                               true
+                       );
+
+                       logicalSlot.tryAssignPayload(new 
DummyPayload(releaseFuture));
 
                        
slotPool.releaseTaskManager(taskManagerLocation.getResourceID(), null);
 
-                       releaseFuture.get();
-                       assertFalse(slot1.isAlive());
+                       releaseFuture.get(1, TimeUnit.SECONDS);

Review comment:
       does `releaseFuture.get();` not work?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to