azagrebin commented on a change in pull request #13321:
URL: https://github.com/apache/flink/pull/13321#discussion_r484984687
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
##########
@@ -161,7 +161,7 @@ public TaskManagerLocation addTaskManager(int numberSlots) {
public void releaseTaskManager(ResourceID resourceId) {
try {
- supplyInMainThreadExecutor(() ->
slotPool.releaseTaskManager(resourceId, null));
+ supplyInMainThreadExecutor(() ->
slotPool.releaseTaskManager(resourceId, new Exception("Test Exception")));
Review comment:
Maybe `Releasing TaskManager in SlotPool for tests`?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/AbstractExecutionSlotAllocatorTest.java
##########
@@ -114,38 +118,30 @@ public void
testCompletedExecutionVertexAssignmentWillBeUnregistered() {
public void testComputeAllPriorAllocationIds() {
final List<AllocationID> expectAllocationIds =
Arrays.asList(new AllocationID(), new AllocationID());
final List<ExecutionVertexSchedulingRequirements>
testSchedulingRequirements = Arrays.asList(
- new ExecutionVertexSchedulingRequirements.Builder().
- withExecutionVertexId(new ExecutionVertexID(new
JobVertexID(), 0)).
-
withPreviousAllocationId(expectAllocationIds.get(0)).
- build(),
- new ExecutionVertexSchedulingRequirements.Builder().
- withExecutionVertexId(new ExecutionVertexID(new
JobVertexID(), 1)).
-
withPreviousAllocationId(expectAllocationIds.get(0)).
- build(),
- new ExecutionVertexSchedulingRequirements.Builder().
- withExecutionVertexId(new ExecutionVertexID(new
JobVertexID(), 2)).
-
withPreviousAllocationId(expectAllocationIds.get(1)).
- build(),
- new ExecutionVertexSchedulingRequirements.Builder().
- withExecutionVertexId(new ExecutionVertexID(new
JobVertexID(), 3)).
- build()
+ createSchedulingRequirement(new ExecutionVertexID(new
JobVertexID(), 0), expectAllocationIds.get(0)),
+ createSchedulingRequirement(new ExecutionVertexID(new
JobVertexID(), 1), expectAllocationIds.get(0)),
+ createSchedulingRequirement(new ExecutionVertexID(new
JobVertexID(), 2), expectAllocationIds.get(1)),
+ createSchedulingRequirement(new ExecutionVertexID(new
JobVertexID(), 3))
);
final Set<AllocationID> allPriorAllocationIds =
AbstractExecutionSlotAllocator.computeAllPriorAllocationIds(testSchedulingRequirements);
assertThat(allPriorAllocationIds,
containsInAnyOrder(expectAllocationIds.toArray()));
}
- private List<ExecutionVertexSchedulingRequirements>
createSchedulingRequirements(
- final ExecutionVertexID... executionVertexIds) {
-
- final List<ExecutionVertexSchedulingRequirements>
schedulingRequirements = new ArrayList<>(executionVertexIds.length);
+ private ExecutionVertexSchedulingRequirements
createSchedulingRequirement(
+ final ExecutionVertexID executionVertexId) {
+ return createSchedulingRequirement(executionVertexId, null);
+ }
Review comment:
```suggestion
private ExecutionVertexSchedulingRequirements
createSchedulingRequirement(
final int subtaskIndex) {
return createSchedulingRequirement(new ExecutionVertexID(new
JobVertexID(), subtaskIndex), null);
}
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
##########
@@ -364,25 +365,29 @@ public void
addOperatorCoordinator(SerializedValue<OperatorCoordinator.Provider>
* @param grp The slot sharing group to associate the vertex with.
*/
public void setSlotSharingGroup(SlotSharingGroup grp) {
+ checkNotNull(grp);
+
if (this.slotSharingGroup != null) {
this.slotSharingGroup.removeVertexFromGroup(this.getID(),
this.getMinResources());
}
+ grp.addVertexToGroup(this.getID(), this.getMinResources());
this.slotSharingGroup = grp;
- if (grp != null) {
- grp.addVertexToGroup(this.getID(),
this.getMinResources());
- }
}
/**
* Gets the slot sharing group that this vertex is associated with.
Different vertices in the same
- * slot sharing group can run one subtask each in the same slot. If the
vertex is not associated with
- * a slot sharing group, this method returns {@code null}.
+ * slot sharing group can run one subtask each in the same slot.
*
- * @return The slot sharing group to associate the vertex with, or
{@code null}, if not associated with one.
+ * @return The slot sharing group to associate the vertex with
*/
- @Nullable
public SlotSharingGroup getSlotSharingGroup() {
+ if (slotSharingGroup == null) {
+ // create a new slot sharing group for this vertex if
it was in no other slot sharing group.
+ // this should only happen in testing cases at the
moment because production code path will
+ // always set a value to it before used
Review comment:
is it a lot of effort to fix this in tests so that we can remove this
check?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocatorTest.java
##########
@@ -224,6 +224,7 @@ public void testCoLocationConstraintThrowsException() {
final List<ExecutionVertexSchedulingRequirements>
schedulingRequirements = Collections.singletonList(
new ExecutionVertexSchedulingRequirements.Builder()
.withExecutionVertexId(new
ExecutionVertexID(new JobVertexID(), 0))
+ .withSlotSharingGroupId(new
SlotSharingGroupId())
Review comment:
why not to add `new SlotSharingGroupId()` as a default value of
`slotSharingGroupId` in `ExecutionVertexSchedulingRequirements.Builder`?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]