1996fanrui commented on code in PR #25134:
URL: https://github.com/apache/flink/pull/25134#discussion_r1703679953
##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeResourceDeclarationTest.java:
##########
@@ -205,12 +224,36 @@ void testRequirementsDecreasedOnSlotAllocationFailure()
throws Exception {
.isZero();
}
- private static final class RequirementListener {
+ /** Requirement listener for testing. */
+ public static final class RequirementListener {
Review Comment:
```suggestion
static final class RequirementListener {
```
Default (package-private) visibility is enough.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimplePhysicalSlot.java:
##########
@@ -21,12 +21,12 @@
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.Preconditions;
-/** Simple implementation of the {@link SlotContext} interface for the legacy code. */
-public class SimpleSlotContext implements SlotContext {
+/** Simple implementation of the {@link PhysicalSlot} interface for the legacy code. */
+public class SimplePhysicalSlot implements PhysicalSlot {
Review Comment:
After this PR, `SimplePhysicalSlot` and `TestingPhysicalSlot` are very similar
(`TestingPhysicalSlot` has additional `Payload payload` related logic compared
with `SimplePhysicalSlot`).
Would it be possible to remove `SimplePhysicalSlot` and have all callers
use `TestingPhysicalSlot` instead?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java:
##########
@@ -548,4 +610,13 @@ void increaseResourceRequirementsBy(ResourceCounter increment) {
boolean isBatchSlotRequestTimeoutCheckEnabled() {
return !isBatchSlotRequestTimeoutCheckDisabled;
}
+
+ @VisibleForTesting
+ public void tryWaitSlotRequestIsDone() {
Review Comment:
```suggestion
void tryWaitSlotRequestIsDone() {
```
`public` isn't needed here.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java:
##########
@@ -548,4 +610,13 @@ void increaseResourceRequirementsBy(ResourceCounter increment) {
boolean isBatchSlotRequestTimeoutCheckEnabled() {
return !isBatchSlotRequestTimeoutCheckDisabled;
}
+
+ @VisibleForTesting
+ public void tryWaitSlotRequestIsDone() {
+ if (getDeclarativeSlotPool() instanceof DefaultDeclarativeSlotPool) {
+ final DefaultDeclarativeSlotPool slotPool =
+ (DefaultDeclarativeSlotPool) getDeclarativeSlotPool();
+ slotPool.tryWaitSlotRequestIsDone();
+ }
+ }
Review Comment:
If the last comment makes sense, could we move the `void
tryWaitSlotRequestIsDone()` method to `DeclarativeSlotPoolBridgeTest`?
Only `DeclarativeSlotPoolBridgeTest` calls `tryWaitSlotRequestIsDone()`.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java:
##########
@@ -234,6 +290,11 @@ void newSlotsAreAvailable(Collection<? extends PhysicalSlot> newSlots) {
}
}
+ @VisibleForTesting
+ Collection<PhysicalSlot> getFreePhysicalSlots() {
Review Comment:
```suggestion
Collection<PhysicalSlot> getFreeSlotsInformation() {
```
nit: how about keeping the name the same as in the interface?
##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingFreeSlotTracker.java:
##########
@@ -151,23 +151,23 @@ public Builder
setReserveSlotConsumer(Consumer<AllocationID> reserveSlotConsumer
return this;
}
- public Builder setCreateNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction(
- Function<Set<AllocationID>, FreeSlotInfoTracker>
- createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction) {
- this.createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction =
- createNewFreeSlotInfoTrackerWithoutBlockedSlotsFunction;
+ public Builder setCreateNewFreeSlotTrackerWithoutBlockedSlotsFunction(
Review Comment:
It seems this method is not called.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeTest.java:
##########
@@ -55,19 +58,32 @@ void testSlotOffer() throws Exception {
final PhysicalSlot allocatedSlot =
createAllocatedSlot(expectedAllocationId);
final TestingDeclarativeSlotPoolFactory declarativeSlotPoolFactory =
- new TestingDeclarativeSlotPoolFactory(TestingDeclarativeSlotPool.builder());
+ new TestingDeclarativeSlotPoolFactory(
Review Comment:
As I understand it, `slotBatchAllocatable` is the core feature of this PR.
But I didn't find any test in `DeclarativeSlotPoolBridgeTest` that covers this
case. For example, request 2 slots in total:
- When slotBatchAllocatable is disabled
  - When the first slot is ready: we assign it directly
  - When the second slot is ready: we assign it directly
- When slotBatchAllocatable is enabled
  - When the first slot is ready: we don't assign it
  - When the second slot is ready: we assign the 2 slots together
Please correct me if I'm missing something, thanks.
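To make the expected behavior concrete, here is a minimal, self-contained sketch of the two modes. It is only a toy model and does not use the real `DeclarativeSlotPoolBridge` API: `ToySlotAssigner` and `onSlotReady` are hypothetical names invented for illustration, and the real test would need to go through the bridge and its slot pool instead.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the two assignment modes. NOT Flink's API; all names are hypothetical. */
final class ToySlotAssigner {
    private final int requestedSlots;
    private final boolean slotBatchAllocatable;
    // Slots that are ready but have not been assigned yet.
    private final List<String> pendingSlots = new ArrayList<>();

    ToySlotAssigner(int requestedSlots, boolean slotBatchAllocatable) {
        this.requestedSlots = requestedSlots;
        this.slotBatchAllocatable = slotBatchAllocatable;
    }

    /** Registers one ready slot and returns the slots assigned as a result of this call. */
    List<String> onSlotReady(String slotId) {
        pendingSlots.add(slotId);
        if (slotBatchAllocatable && pendingSlots.size() < requestedSlots) {
            // Batch mode: hold the slot until every requested slot is ready.
            return new ArrayList<>();
        }
        // Either non-batch mode (assign immediately) or the batch is now complete.
        List<String> assigned = new ArrayList<>(pendingSlots);
        pendingSlots.clear();
        return assigned;
    }
}
```

A test along these lines would assert that, with the flag disabled, each call returns exactly the slot that just became ready, and that, with the flag enabled, the first call returns nothing and the second returns both slots.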
##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractDeclarativeSlotPoolBridgeTest.java:
##########
@@ -51,37 +52,53 @@ abstract class AbstractDeclarativeSlotPoolBridgeTest {
@Parameter(1)
protected Duration slotRequestMaxInterval;
- @Parameters(name = "requestSlotMatchingStrategy: {0}, slotRequestMaxInterval: {1}")
+ @Parameter(2)
+ boolean slotBatchAllocatable;
+
+ @Parameters(
+ name =
+ "requestSlotMatchingStrategy: {0}, slotRequestMaxInterval: {1}, slotBatchAllocatable: {2}")
private static Collection<Object[]> data() {
return Arrays.asList(
- new Object[] {SimpleRequestSlotMatchingStrategy.INSTANCE, Duration.ZERO},
- new Object[] {SimpleRequestSlotMatchingStrategy.INSTANCE, Duration.ofMillis(50)},
+ new Object[] {SimpleRequestSlotMatchingStrategy.INSTANCE, Duration.ZERO, false},
+ new Object[] {SimpleRequestSlotMatchingStrategy.INSTANCE, Duration.ZERO, true},
+ new Object[] {
+ SimpleRequestSlotMatchingStrategy.INSTANCE, Duration.ofMillis(20), false
+ },
+ new Object[] {
+ SimpleRequestSlotMatchingStrategy.INSTANCE, Duration.ofMillis(20), true
+ },
+ new Object[] {
+ PreferredAllocationRequestSlotMatchingStrategy.INSTANCE, Duration.ZERO, false
+ },
new Object[] {
- PreferredAllocationRequestSlotMatchingStrategy.INSTANCE, Duration.ZERO
+ PreferredAllocationRequestSlotMatchingStrategy.INSTANCE, Duration.ZERO, true
},
new Object[] {
- PreferredAllocationRequestSlotMatchingStrategy.INSTANCE, Duration.ofMillis(50)
+ PreferredAllocationRequestSlotMatchingStrategy.INSTANCE,
+ Duration.ofMillis(20),
+ false
+ },
+ new Object[] {
+ PreferredAllocationRequestSlotMatchingStrategy.INSTANCE,
+ Duration.ofMillis(20),
+ true
});
}
@Nonnull
DeclarativeSlotPoolBridge createDeclarativeSlotPoolBridge(
- DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
- RequestSlotMatchingStrategy requestSlotMatchingStrategy,
- Duration slotRequestMaxInterval) {
+ DeclarativeSlotPoolFactory declarativeSlotPoolFactory) {
return createDeclarativeSlotPoolBridge(
- declarativeSlotPoolFactory,
- requestSlotMatchingStrategy,
- slotRequestMaxInterval,
- componentMainThreadExecutor);
+ declarativeSlotPoolFactory, componentMainThreadExecutor, null);
}
@Nonnull
DeclarativeSlotPoolBridge createDeclarativeSlotPoolBridge(
DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
- RequestSlotMatchingStrategy requestSlotMatchingStrategy,
- Duration slotRequestMaxInterval,
- ComponentMainThreadExecutor mainThreadExecutor) {
+ ComponentMainThreadExecutor componentMainThreadExecutor,
+ DeclarativeSlotPoolBridgeResourceDeclarationTest.RequirementListener
Review Comment:
Is it possible to avoid passing `RequirementListener` here?
An alternative solution: all tests in
`DeclarativeSlotPoolBridgeResourceDeclarationTest` could call
`requirementListener#tryWaitSlotRequestIsDone` directly.
In general, a parent class shouldn't depend on a class nested inside one of
its child classes.
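A rough sketch of the alternative, using stand-in classes rather than the real test classes: `AbstractBridgeTestSketch`, `ResourceDeclarationTestSketch`, `createBridge()`, and `runTest()` are hypothetical names, and the `StringBuilder` return value is only a placeholder for the bridge. The point is that the parent factory method takes no listener, and the child test calls its own listener directly.

```java
/** Hypothetical stand-in for the abstract parent test class. */
abstract class AbstractBridgeTestSketch {
    // The parent factory takes no listener, so it has no dependency on
    // any type declared inside a subclass. The return type is a placeholder.
    StringBuilder createBridge() {
        return new StringBuilder("bridge");
    }
}

/** Hypothetical stand-in for the child test class that owns the listener. */
final class ResourceDeclarationTestSketch extends AbstractBridgeTestSketch {

    /** Listener nested in the child, with package-private visibility. */
    static final class RequirementListener {
        int waitCount;

        void tryWaitSlotRequestIsDone() {
            // Placeholder for the real waiting logic; here we only count calls.
            waitCount++;
        }
    }

    private final RequirementListener requirementListener = new RequirementListener();

    /** A test body: build the bridge via the parent, then wait via the child's own listener. */
    int runTest() {
        createBridge();
        requirementListener.tryWaitSlotRequestIsDone();
        return requirementListener.waitCount;
    }
}
```

With this shape, only the child class references `RequirementListener`, so the listener can stay package-private and the parent's signatures stay unchanged.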