tillrohrmann commented on a change in pull request #13879:
URL: https://github.com/apache/flink/pull/13879#discussion_r536139471
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
##########
@@ -160,38 +162,58 @@ private void cancelLogicalSlotRequest(ExecutionVertexID
executionVertexId, Throw
ExecutionSlotSharingGroup group = entry.getKey();
List<ExecutionVertexID> executionIds = entry.getValue();
SharedSlot sharedSlot = getOrAllocateSharedSlot(group,
sharedSlotProfileRetriever);
-
- for (ExecutionVertexID executionId : executionIds) {
- CompletableFuture<LogicalSlot>
logicalSlotFuture = sharedSlot.allocateLogicalSlot(executionId);
- SlotExecutionVertexAssignment assignment = new
SlotExecutionVertexAssignment(executionId, logicalSlotFuture);
- assignments.put(executionId, assignment);
- }
+ allocateLogicalSlotsFromSharedSlot(assignments,
executionIds, sharedSlot);
}
return assignments;
}
private SharedSlot getOrAllocateSharedSlot(
- ExecutionSlotSharingGroup executionSlotSharingGroup,
+ ExecutionSlotSharingGroup slotSharingGroup,
SharedSlotProfileRetriever sharedSlotProfileRetriever) {
- return sharedSlots
- .computeIfAbsent(executionSlotSharingGroup, group -> {
- SlotRequestId physicalSlotRequestId = new
SlotRequestId();
- ResourceProfile physicalSlotResourceProfile =
getPhysicalSlotResourceProfile(group);
- SlotProfile slotProfile =
sharedSlotProfileRetriever.getSlotProfile(group, physicalSlotResourceProfile);
- PhysicalSlotRequest physicalSlotRequest =
- new
PhysicalSlotRequest(physicalSlotRequestId, slotProfile,
slotWillBeOccupiedIndefinitely);
- CompletableFuture<PhysicalSlot>
physicalSlotFuture = slotProvider
+ SharedSlot sharedSlot = sharedSlots.get(slotSharingGroup);
+ if (sharedSlot == null) {
+ SlotRequestId physicalSlotRequestId = new
SlotRequestId();
+ ResourceProfile physicalSlotResourceProfile =
+
getPhysicalSlotResourceProfile(slotSharingGroup);
+ SlotProfile slotProfile = sharedSlotProfileRetriever
+ .getSlotProfile(slotSharingGroup,
physicalSlotResourceProfile);
+ PhysicalSlotRequest physicalSlotRequest = new
PhysicalSlotRequest(
+ physicalSlotRequestId,
+ slotProfile,
+ slotWillBeOccupiedIndefinitely);
+ CompletableFuture<PhysicalSlot> physicalSlotFuture =
slotProvider
.allocatePhysicalSlot(physicalSlotRequest)
.thenApply(PhysicalSlotRequest.Result::getPhysicalSlot);
- return new SharedSlot(
+ sharedSlot = new SharedSlot(
physicalSlotRequestId,
physicalSlotResourceProfile,
- group,
+ slotSharingGroup,
physicalSlotFuture,
slotWillBeOccupiedIndefinitely,
this::releaseSharedSlot);
- });
+ if (!physicalSlotFuture.isCompletedExceptionally()) {
+ sharedSlots.put(slotSharingGroup, sharedSlot);
+ }
+ }
+ return sharedSlot;
+ }
+
+ private static void allocateLogicalSlotsFromSharedSlot(
+ Map<ExecutionVertexID, SlotExecutionVertexAssignment>
assignments,
+ Iterable<ExecutionVertexID> executionIds,
+ SharedSlot sharedSlot) {
+ for (ExecutionVertexID executionId : executionIds) {
+ boolean physicalSlotHasAlreadyFailed = sharedSlot
+ .getSlotContextFuture()
+ .isCompletedExceptionally();
Review comment:
We should be able to move this check out of the for loop, since it does not depend on `executionId`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
##########
@@ -160,38 +162,58 @@ private void cancelLogicalSlotRequest(ExecutionVertexID
executionVertexId, Throw
ExecutionSlotSharingGroup group = entry.getKey();
List<ExecutionVertexID> executionIds = entry.getValue();
SharedSlot sharedSlot = getOrAllocateSharedSlot(group,
sharedSlotProfileRetriever);
-
- for (ExecutionVertexID executionId : executionIds) {
- CompletableFuture<LogicalSlot>
logicalSlotFuture = sharedSlot.allocateLogicalSlot(executionId);
- SlotExecutionVertexAssignment assignment = new
SlotExecutionVertexAssignment(executionId, logicalSlotFuture);
- assignments.put(executionId, assignment);
- }
+ allocateLogicalSlotsFromSharedSlot(assignments,
executionIds, sharedSlot);
}
return assignments;
}
private SharedSlot getOrAllocateSharedSlot(
- ExecutionSlotSharingGroup executionSlotSharingGroup,
+ ExecutionSlotSharingGroup slotSharingGroup,
SharedSlotProfileRetriever sharedSlotProfileRetriever) {
- return sharedSlots
- .computeIfAbsent(executionSlotSharingGroup, group -> {
- SlotRequestId physicalSlotRequestId = new
SlotRequestId();
- ResourceProfile physicalSlotResourceProfile =
getPhysicalSlotResourceProfile(group);
- SlotProfile slotProfile =
sharedSlotProfileRetriever.getSlotProfile(group, physicalSlotResourceProfile);
- PhysicalSlotRequest physicalSlotRequest =
- new
PhysicalSlotRequest(physicalSlotRequestId, slotProfile,
slotWillBeOccupiedIndefinitely);
- CompletableFuture<PhysicalSlot>
physicalSlotFuture = slotProvider
+ SharedSlot sharedSlot = sharedSlots.get(slotSharingGroup);
+ if (sharedSlot == null) {
+ SlotRequestId physicalSlotRequestId = new
SlotRequestId();
+ ResourceProfile physicalSlotResourceProfile =
+
getPhysicalSlotResourceProfile(slotSharingGroup);
+ SlotProfile slotProfile = sharedSlotProfileRetriever
+ .getSlotProfile(slotSharingGroup,
physicalSlotResourceProfile);
+ PhysicalSlotRequest physicalSlotRequest = new
PhysicalSlotRequest(
+ physicalSlotRequestId,
+ slotProfile,
+ slotWillBeOccupiedIndefinitely);
+ CompletableFuture<PhysicalSlot> physicalSlotFuture =
slotProvider
.allocatePhysicalSlot(physicalSlotRequest)
.thenApply(PhysicalSlotRequest.Result::getPhysicalSlot);
- return new SharedSlot(
+ sharedSlot = new SharedSlot(
physicalSlotRequestId,
physicalSlotResourceProfile,
- group,
+ slotSharingGroup,
physicalSlotFuture,
slotWillBeOccupiedIndefinitely,
this::releaseSharedSlot);
- });
+ if (!physicalSlotFuture.isCompletedExceptionally()) {
+ sharedSlots.put(slotSharingGroup, sharedSlot);
+ }
+ }
+ return sharedSlot;
+ }
+
+ private static void allocateLogicalSlotsFromSharedSlot(
+ Map<ExecutionVertexID, SlotExecutionVertexAssignment>
assignments,
+ Iterable<ExecutionVertexID> executionIds,
+ SharedSlot sharedSlot) {
+ for (ExecutionVertexID executionId : executionIds) {
+ boolean physicalSlotHasAlreadyFailed = sharedSlot
+ .getSlotContextFuture()
+ .isCompletedExceptionally();
+ CompletableFuture<LogicalSlot> logicalSlotFuture =
physicalSlotHasAlreadyFailed ?
+
sharedSlot.getSlotContextFuture().thenApply(stub -> null) :
+
sharedSlot.allocateLogicalSlot(executionId);
+ SlotExecutionVertexAssignment assignment =
+ new
SlotExecutionVertexAssignment(executionId, logicalSlotFuture);
+ assignments.put(executionId, assignment);
Review comment:
I am not a huge fan of returning values through an output parameter. Returning the assignments from this method
should work just as well here.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
##########
@@ -213,20 +235,24 @@ private ResourceProfile
getPhysicalSlotResourceProfile(ExecutionSlotSharingGroup
.reduce(ResourceProfile.ZERO, (r, e) ->
r.merge(resourceProfileRetriever.apply(e)), ResourceProfile::merge);
}
- private SharingPhysicalSlotRequestBulk
createBulk(Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> executions) {
- Map<ExecutionSlotSharingGroup, ResourceProfile> pendingRequests
= executions
- .keySet()
- .stream()
- .collect(Collectors.toMap(
- group -> group,
- group ->
sharedSlots.get(group).getPhysicalSlotResourceProfile()
- ));
+ private Optional<SharingPhysicalSlotRequestBulk> createBulk(
+ Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>>
executions) {
+ Map<ExecutionSlotSharingGroup, ResourceProfile> pendingRequests
= new HashMap<>();
+ for (ExecutionSlotSharingGroup group : executions.keySet()) {
+ SharedSlot sharedSlot = sharedSlots.get(group);
+ if (sharedSlot == null ||
sharedSlot.getSlotContextFuture().isCompletedExceptionally()) {
+ // there is no shared slot for this group or
its physical slot has already failed
+ // hence there is no point to track the whole
bulk
+ return Optional.empty();
+ }
Review comment:
This is the same kind of special-case handling as above. What exactly makes case 1), a physical slot future that
has already failed, different from case 2), a physical slot future that fails just after this method completes?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
##########
@@ -160,38 +162,58 @@ private void cancelLogicalSlotRequest(ExecutionVertexID
executionVertexId, Throw
ExecutionSlotSharingGroup group = entry.getKey();
List<ExecutionVertexID> executionIds = entry.getValue();
SharedSlot sharedSlot = getOrAllocateSharedSlot(group,
sharedSlotProfileRetriever);
-
- for (ExecutionVertexID executionId : executionIds) {
- CompletableFuture<LogicalSlot>
logicalSlotFuture = sharedSlot.allocateLogicalSlot(executionId);
- SlotExecutionVertexAssignment assignment = new
SlotExecutionVertexAssignment(executionId, logicalSlotFuture);
- assignments.put(executionId, assignment);
- }
+ allocateLogicalSlotsFromSharedSlot(assignments,
executionIds, sharedSlot);
}
return assignments;
}
private SharedSlot getOrAllocateSharedSlot(
- ExecutionSlotSharingGroup executionSlotSharingGroup,
+ ExecutionSlotSharingGroup slotSharingGroup,
SharedSlotProfileRetriever sharedSlotProfileRetriever) {
- return sharedSlots
- .computeIfAbsent(executionSlotSharingGroup, group -> {
- SlotRequestId physicalSlotRequestId = new
SlotRequestId();
- ResourceProfile physicalSlotResourceProfile =
getPhysicalSlotResourceProfile(group);
- SlotProfile slotProfile =
sharedSlotProfileRetriever.getSlotProfile(group, physicalSlotResourceProfile);
- PhysicalSlotRequest physicalSlotRequest =
- new
PhysicalSlotRequest(physicalSlotRequestId, slotProfile,
slotWillBeOccupiedIndefinitely);
- CompletableFuture<PhysicalSlot>
physicalSlotFuture = slotProvider
+ SharedSlot sharedSlot = sharedSlots.get(slotSharingGroup);
+ if (sharedSlot == null) {
+ SlotRequestId physicalSlotRequestId = new
SlotRequestId();
+ ResourceProfile physicalSlotResourceProfile =
+
getPhysicalSlotResourceProfile(slotSharingGroup);
+ SlotProfile slotProfile = sharedSlotProfileRetriever
+ .getSlotProfile(slotSharingGroup,
physicalSlotResourceProfile);
+ PhysicalSlotRequest physicalSlotRequest = new
PhysicalSlotRequest(
+ physicalSlotRequestId,
+ slotProfile,
+ slotWillBeOccupiedIndefinitely);
+ CompletableFuture<PhysicalSlot> physicalSlotFuture =
slotProvider
.allocatePhysicalSlot(physicalSlotRequest)
.thenApply(PhysicalSlotRequest.Result::getPhysicalSlot);
- return new SharedSlot(
+ sharedSlot = new SharedSlot(
physicalSlotRequestId,
physicalSlotResourceProfile,
- group,
+ slotSharingGroup,
physicalSlotFuture,
slotWillBeOccupiedIndefinitely,
this::releaseSharedSlot);
- });
+ if (!physicalSlotFuture.isCompletedExceptionally()) {
+ sharedSlots.put(slotSharingGroup, sharedSlot);
+ }
Review comment:
Hmm, this looks a bit like special-case handling. I guess it would be a bit nicer if a future that is completed
exceptionally now and one that completes exceptionally at a later point were handled the same way. What exactly is
the difference between a `physicalSlotFuture` being completed exceptionally right away and the `physicalSlotFuture`
being completed exceptionally just after the `SharedSlot` has been created?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
##########
@@ -160,38 +162,58 @@ private void cancelLogicalSlotRequest(ExecutionVertexID
executionVertexId, Throw
ExecutionSlotSharingGroup group = entry.getKey();
List<ExecutionVertexID> executionIds = entry.getValue();
SharedSlot sharedSlot = getOrAllocateSharedSlot(group,
sharedSlotProfileRetriever);
-
- for (ExecutionVertexID executionId : executionIds) {
- CompletableFuture<LogicalSlot>
logicalSlotFuture = sharedSlot.allocateLogicalSlot(executionId);
- SlotExecutionVertexAssignment assignment = new
SlotExecutionVertexAssignment(executionId, logicalSlotFuture);
- assignments.put(executionId, assignment);
- }
+ allocateLogicalSlotsFromSharedSlot(assignments,
executionIds, sharedSlot);
}
return assignments;
}
private SharedSlot getOrAllocateSharedSlot(
- ExecutionSlotSharingGroup executionSlotSharingGroup,
+ ExecutionSlotSharingGroup slotSharingGroup,
SharedSlotProfileRetriever sharedSlotProfileRetriever) {
- return sharedSlots
- .computeIfAbsent(executionSlotSharingGroup, group -> {
- SlotRequestId physicalSlotRequestId = new
SlotRequestId();
- ResourceProfile physicalSlotResourceProfile =
getPhysicalSlotResourceProfile(group);
- SlotProfile slotProfile =
sharedSlotProfileRetriever.getSlotProfile(group, physicalSlotResourceProfile);
- PhysicalSlotRequest physicalSlotRequest =
- new
PhysicalSlotRequest(physicalSlotRequestId, slotProfile,
slotWillBeOccupiedIndefinitely);
- CompletableFuture<PhysicalSlot>
physicalSlotFuture = slotProvider
+ SharedSlot sharedSlot = sharedSlots.get(slotSharingGroup);
+ if (sharedSlot == null) {
+ SlotRequestId physicalSlotRequestId = new
SlotRequestId();
+ ResourceProfile physicalSlotResourceProfile =
+
getPhysicalSlotResourceProfile(slotSharingGroup);
+ SlotProfile slotProfile = sharedSlotProfileRetriever
+ .getSlotProfile(slotSharingGroup,
physicalSlotResourceProfile);
+ PhysicalSlotRequest physicalSlotRequest = new
PhysicalSlotRequest(
+ physicalSlotRequestId,
+ slotProfile,
+ slotWillBeOccupiedIndefinitely);
+ CompletableFuture<PhysicalSlot> physicalSlotFuture =
slotProvider
.allocatePhysicalSlot(physicalSlotRequest)
.thenApply(PhysicalSlotRequest.Result::getPhysicalSlot);
- return new SharedSlot(
+ sharedSlot = new SharedSlot(
physicalSlotRequestId,
physicalSlotResourceProfile,
- group,
+ slotSharingGroup,
physicalSlotFuture,
slotWillBeOccupiedIndefinitely,
this::releaseSharedSlot);
- });
+ if (!physicalSlotFuture.isCompletedExceptionally()) {
+ sharedSlots.put(slotSharingGroup, sharedSlot);
+ }
+ }
+ return sharedSlot;
+ }
+
+ private static void allocateLogicalSlotsFromSharedSlot(
+ Map<ExecutionVertexID, SlotExecutionVertexAssignment>
assignments,
+ Iterable<ExecutionVertexID> executionIds,
+ SharedSlot sharedSlot) {
+ for (ExecutionVertexID executionId : executionIds) {
+ boolean physicalSlotHasAlreadyFailed = sharedSlot
+ .getSlotContextFuture()
+ .isCompletedExceptionally();
+ CompletableFuture<LogicalSlot> logicalSlotFuture =
physicalSlotHasAlreadyFailed ?
+
sharedSlot.getSlotContextFuture().thenApply(stub -> null) :
+
sharedSlot.allocateLogicalSlot(executionId);
Review comment:
Can't `SharedSlot.allocateLogicalSlot` return a failed future if the internal future has already failed? That way,
we wouldn't need the special-casing here.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]