tillrohrmann commented on a change in pull request #15812:
URL: https://github.com/apache/flink/pull/15812#discussion_r634118669
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
##########
@@ -144,9 +144,9 @@ void start(
Collection<SlotInfo> getAllocatedSlotsInformation();
/**
- * Allocates the available slot with the given allocation id under the
given request id for the
- * given requirement profile. The slot must be able to fulfill the
requirement profile,
- * otherwise an {@link IllegalStateException} will be thrown.
+ * QueryableStateClient.java Allocates the available slot with the given
allocation id under the
Review comment:
Why did you insert the `QueryableStateClient.java` here?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclareResourceRequirementServiceConnectionManager.java
##########
@@ -87,7 +87,9 @@ private void triggerResourceRequirementsSubmission(
() -> sendResourceRequirements(resourceRequirementsToSend),
new ExponentialBackoffRetryStrategy(
Integer.MAX_VALUE, sleepOnError, maxSleepOnError),
- throwable -> !(throwable instanceof CancellationException),
+ throwable ->
+ !(throwable instanceof CancellationException)
+ && !(throwable instanceof
UnsupportedOperationException),
Review comment:
Why is this change necessary? This does not look right to me.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithSpreadOutStrategyTest.java
##########
@@ -48,19 +48,21 @@
@Test
public void testSlotAllocationFulfilledWithWorkloadSpreadOut()
throws InterruptedException, ExecutionException {
+ PhysicalSlotRequest request0 =
physicalSlotProviderResource.createSimpleRequest();
+ PhysicalSlotRequest request1 =
physicalSlotProviderResource.createSimpleRequest();
+
+ CompletableFuture<PhysicalSlotRequest.Result> resultCompletableFuture0
=
+ physicalSlotProviderResource.allocateSlot(request0);
+ CompletableFuture<PhysicalSlotRequest.Result> resultCompletableFuture1
=
+ physicalSlotProviderResource.allocateSlot(request1);
+
physicalSlotProviderResource.registerSlotOffersFromNewTaskExecutor(
ResourceProfile.ANY, ResourceProfile.ANY, ResourceProfile.ANY,
ResourceProfile.ANY);
physicalSlotProviderResource.registerSlotOffersFromNewTaskExecutor(
ResourceProfile.ANY, ResourceProfile.ANY, ResourceProfile.ANY,
ResourceProfile.ANY);
- PhysicalSlotRequest request0 =
physicalSlotProviderResource.createSimpleRequest();
- PhysicalSlotRequest request1 =
physicalSlotProviderResource.createSimpleRequest();
-
- PhysicalSlotRequest.Result result0 =
- physicalSlotProviderResource.allocateSlot(request0).get();
- PhysicalSlotRequest.Result result1 =
- physicalSlotProviderResource.allocateSlot(request1).get();
-
+ PhysicalSlotRequest.Result result0 = resultCompletableFuture0.get();
+ PhysicalSlotRequest.Result result1 = resultCompletableFuture1.get();
Review comment:
Why are these changes necessary?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java
##########
@@ -178,34 +159,34 @@ public void testExtraSlotsAreKept() throws Exception {
assertTrue(ExceptionUtils.stripExecutionException(e)
instanceof TimeoutException);
}
- // wait until we have timed out the slot request
- slotRequestTimeoutFuture.get();
-
- assertEquals(0L, pool.getNumberOfPendingRequests());
+ testMainThreadExecutor.execute(
+ () ->
assertTrue(pool.getAllocatedSlotsInformation().isEmpty()));
Review comment:
Same here.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRequestCompletionTest.java
##########
@@ -50,7 +48,7 @@
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
-/** Tests how the {@link SlotPoolImpl} completes slot requests. */
+/** Tests how the {@link SlotPool} completes slot requests. */
public class SlotPoolRequestCompletionTest extends TestLogger {
Review comment:
```suggestion
public class DeclarativeSlotPoolBridgeRequestCompletionTest extends
TestLogger {
```
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
##########
@@ -272,8 +272,8 @@ public void testFailingExecutionAfterRestart() throws
Exception {
/**
* Tests that a graph is not restarted after cancellation via a call to
{@link
- * ExecutionGraph#failGlobal(Throwable)}. This can happen when a slot is
released concurrently
- * with cancellation.
+ * ExecutionGraph#failJob(Throwable, long)} . This can happen when a slot
is released
+ * concurrently with cancellation.
Review comment:
I think this test does neither call `ExecutionGraph.failGlobal` nor
`ExecutionGraph.failJob`. Hence, I would suggest to say "Tests that a graph is
not restarted after cancellation via a call to {@link
Execution#fail(Throwable)}".
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java
##########
@@ -54,9 +54,8 @@ private SlotPoolUtils() {
throw new UnsupportedOperationException("Cannot instantiate this
class.");
}
- static TestingSlotPoolImpl createAndSetUpSlotPool(
+ static SlotPool createAndSetUpSlotPool(
Review comment:
```suggestion
static DeclarativeSlotPoolBridge createAndSetUpDeclarativeSlotPoolBridge(
```
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java
##########
@@ -178,34 +159,34 @@ public void testExtraSlotsAreKept() throws Exception {
assertTrue(ExceptionUtils.stripExecutionException(e)
instanceof TimeoutException);
}
- // wait until we have timed out the slot request
- slotRequestTimeoutFuture.get();
-
- assertEquals(0L, pool.getNumberOfPendingRequests());
+ testMainThreadExecutor.execute(
+ () ->
assertTrue(pool.getAllocatedSlotsInformation().isEmpty()));
- AllocationID allocationId = allocationIdFuture.get();
- final SlotOffer slotOffer = new SlotOffer(allocationId, 0,
ResourceProfile.ANY);
+ final SlotOffer slotOffer = new SlotOffer(new AllocationID(), 0,
ResourceProfile.ANY);
final TaskManagerLocation taskManagerLocation = new
LocalTaskManagerLocation();
final TaskManagerGateway taskManagerGateway = new
SimpleAckingTaskManagerGateway();
testMainThreadExecutor.execute(
() ->
pool.registerTaskManager(taskManagerLocation.getResourceID()));
-
assertTrue(
testMainThreadExecutor.execute(
() ->
- pool.offerSlot(
- taskManagerLocation,
taskManagerGateway, slotOffer)));
+ pool.offerSlots(
+ taskManagerLocation,
+ taskManagerGateway,
+
Lists.newArrayList(slotOffer))
+ != null));
- assertTrue(pool.containsAvailableSlot(allocationId));
+ testMainThreadExecutor.execute(
+ () ->
assertTrue(pool.getAvailableSlotsInformation().isEmpty()));
}
}
- private TestingSlotPoolImpl createAndSetUpSlotPool() throws Exception {
+ private SlotPool createAndSetUpSlotPool() throws Exception {
return new
SlotPoolBuilder(testMainThreadExecutor.getMainThreadExecutor()).build();
}
- private TestingSlotPoolImpl createAndSetUpSlotPoolWithoutResourceManager()
throws Exception {
+ private SlotPool createAndSetUpSlotPoolWithoutResourceManager() throws
Exception {
Review comment:
I think the test class should be renamed in to
`DeclarativeSlotPoolBridgeInteractionsTest`. Also please rename this method
into `createAndSetUpDeclarativeSlotPoolBridgeWithoutResourceManager`.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithSpreadOutStrategyTest.java
##########
@@ -105,16 +107,4 @@ public void testSlotAllocationFulfilledWithNewSlots()
physicalSlotProviderResource.registerSlotOffersFromNewTaskExecutor(ResourceProfile.ANY);
slotFuture.get();
}
-
- @Test
- public void
testIndividualBatchSlotRequestTimeoutCheckIsDisabledOnAllocatingNewSlots()
- throws Exception {
- TestingSlotPoolImpl slotPool =
- new
SlotPoolBuilder(physicalSlotProviderResource.getMainThreadExecutor()).build();
- assertThat(slotPool.isBatchSlotRequestTimeoutCheckEnabled(), is(true));
-
- new PhysicalSlotProviderImpl(
-
LocationPreferenceSlotSelectionStrategy.createEvenlySpreadOut(), slotPool);
- assertThat(slotPool.isBatchSlotRequestTimeoutCheckEnabled(),
is(false));
- }
Review comment:
Why can this test be removed?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithSpreadOutStrategyTest.java
##########
@@ -90,6 +87,11 @@ public void
testSlotAllocationFulfilledWithPreferredInputOverwrittingSpreadOut()
PhysicalSlotRequest.Result result1 =
physicalSlotProviderResource.allocateSlot(request1).get();
+ physicalSlotProviderResource.registerSlotOffersFromNewTaskExecutor(
+ ResourceProfile.ANY, ResourceProfile.ANY);
+ physicalSlotProviderResource.registerSlotOffersFromNewTaskExecutor(
+ ResourceProfile.ANY, ResourceProfile.ANY);
+
Review comment:
Same here, why did you move this code block around?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBuilder.java
##########
@@ -33,7 +33,7 @@
import java.util.concurrent.CompletableFuture;
-/** Builder for a {@link TestingSlotPoolImpl}. */
+/** Builder for a {@link DeclarativeSlotPool}. */
public class SlotPoolBuilder {
Review comment:
```suggestion
public class DeclarativeSlotPoolBridgeBuilder {
```
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java
##########
@@ -105,22 +102,16 @@ public void
testCancelSlotAllocationWithoutResourceManager() throws Exception {
assertTrue(ExceptionUtils.stripExecutionException(e)
instanceof TimeoutException);
}
- // wait for the timeout of the pending slot request
- timeoutFuture.get();
-
- assertEquals(0L, pool.getNumberOfWaitingForResourceRequests());
+ testMainThreadExecutor.execute(
+ () ->
assertTrue(pool.getAllocatedSlotsInformation().isEmpty()));
Review comment:
Why is this the same test as the previous version? In the old test, we
test that the pending slot requests are cleared. Here we are testing that there
is no allocated slot. This is not the same.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java
##########
@@ -137,32 +128,22 @@ public void testSlotAllocationTimeout() throws Exception {
assertTrue(ExceptionUtils.stripExecutionException(e)
instanceof TimeoutException);
}
- // wait until we have timed out the slot request
- slotRequestTimeoutFuture.get();
-
- assertEquals(0L, pool.getNumberOfPendingRequests());
+ testMainThreadExecutor.execute(
+ () ->
assertTrue(pool.getAllocatedSlotsInformation().isEmpty()));
}
}
- /** Tests that extra slots are kept by the {@link SlotPoolImpl}. */
+ /** Tests that extra slots are kept by the {@link SlotPool}. */
@Test
public void testExtraSlotsAreKept() throws Exception {
Review comment:
I think this test is no longer valid for the
`DeclarativeSlotPoolBridge`. Instead, the test should make sure that extra
slots are rejected.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithDefaultSlotSelectionStrategyTest.java
##########
@@ -61,16 +61,4 @@ public void testSlotAllocationFulfilledWithNewSlots()
physicalSlotProviderResource.registerSlotOffersFromNewTaskExecutor(ResourceProfile.ANY);
slotFuture.get();
}
-
- @Test
- public void
testIndividualBatchSlotRequestTimeoutCheckIsDisabledOnAllocatingNewSlots()
- throws Exception {
- TestingSlotPoolImpl slotPool =
- new
SlotPoolBuilder(physicalSlotProviderResource.getMainThreadExecutor()).build();
- assertThat(slotPool.isBatchSlotRequestTimeoutCheckEnabled(), is(true));
-
- new PhysicalSlotProviderImpl(
- LocationPreferenceSlotSelectionStrategy.createDefault(),
slotPool);
- assertThat(slotPool.isBatchSlotRequestTimeoutCheckEnabled(),
is(false));
- }
Review comment:
Why is it ok to delete this test instead of using the
`DeclarativeSlotPoolBridge` here?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java
##########
@@ -137,32 +128,22 @@ public void testSlotAllocationTimeout() throws Exception {
assertTrue(ExceptionUtils.stripExecutionException(e)
instanceof TimeoutException);
}
- // wait until we have timed out the slot request
- slotRequestTimeoutFuture.get();
-
- assertEquals(0L, pool.getNumberOfPendingRequests());
+ testMainThreadExecutor.execute(
+ () ->
assertTrue(pool.getAllocatedSlotsInformation().isEmpty()));
Review comment:
Same here. Why is this the same test?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolPendingRequestFailureTest.java
##########
@@ -68,114 +52,6 @@ public void setup() {
resourceManagerGateway = new TestingResourceManagerGateway();
}
Review comment:
This test class also needs to be renamed in to
`DeclarativeSlotPoolBridgePendingRequestFailureTest`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]