azagrebin commented on a change in pull request #13321:
URL: https://github.com/apache/flink/pull/13321#discussion_r484984687



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
##########
@@ -161,7 +161,7 @@ public TaskManagerLocation addTaskManager(int numberSlots) {
 
                public void releaseTaskManager(ResourceID resourceId) {
                        try {
-                               supplyInMainThreadExecutor(() -> slotPool.releaseTaskManager(resourceId, null));
+                               supplyInMainThreadExecutor(() -> slotPool.releaseTaskManager(resourceId, new Exception("Test Exception")));

Review comment:
       Maybe use a more descriptive message, e.g. `Releasing TaskManager in SlotPool for tests`?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/AbstractExecutionSlotAllocatorTest.java
##########
@@ -114,38 +118,30 @@ public void 
testCompletedExecutionVertexAssignmentWillBeUnregistered() {
        public void testComputeAllPriorAllocationIds() {
                final List<AllocationID> expectAllocationIds = 
Arrays.asList(new AllocationID(), new AllocationID());
                final List<ExecutionVertexSchedulingRequirements> 
testSchedulingRequirements = Arrays.asList(
-                       new ExecutionVertexSchedulingRequirements.Builder().
-                               withExecutionVertexId(new ExecutionVertexID(new 
JobVertexID(), 0)).
-                               
withPreviousAllocationId(expectAllocationIds.get(0)).
-                               build(),
-                       new ExecutionVertexSchedulingRequirements.Builder().
-                               withExecutionVertexId(new ExecutionVertexID(new 
JobVertexID(), 1)).
-                               
withPreviousAllocationId(expectAllocationIds.get(0)).
-                               build(),
-                       new ExecutionVertexSchedulingRequirements.Builder().
-                               withExecutionVertexId(new ExecutionVertexID(new 
JobVertexID(), 2)).
-                               
withPreviousAllocationId(expectAllocationIds.get(1)).
-                               build(),
-                       new ExecutionVertexSchedulingRequirements.Builder().
-                               withExecutionVertexId(new ExecutionVertexID(new 
JobVertexID(), 3)).
-                               build()
+                       createSchedulingRequirement(new ExecutionVertexID(new 
JobVertexID(), 0), expectAllocationIds.get(0)),
+                       createSchedulingRequirement(new ExecutionVertexID(new 
JobVertexID(), 1), expectAllocationIds.get(0)),
+                       createSchedulingRequirement(new ExecutionVertexID(new 
JobVertexID(), 2), expectAllocationIds.get(1)),
+                       createSchedulingRequirement(new ExecutionVertexID(new 
JobVertexID(), 3))
                );
 
                final Set<AllocationID> allPriorAllocationIds =
                        
AbstractExecutionSlotAllocator.computeAllPriorAllocationIds(testSchedulingRequirements);
                assertThat(allPriorAllocationIds, 
containsInAnyOrder(expectAllocationIds.toArray()));
        }
 
-       private List<ExecutionVertexSchedulingRequirements> 
createSchedulingRequirements(
-                       final ExecutionVertexID... executionVertexIds) {
-
-               final List<ExecutionVertexSchedulingRequirements> 
schedulingRequirements = new ArrayList<>(executionVertexIds.length);
+       private ExecutionVertexSchedulingRequirements 
createSchedulingRequirement(
+                       final ExecutionVertexID executionVertexId) {
+               return createSchedulingRequirement(executionVertexId, null);
+       }

Review comment:
       ```suggestion
        private ExecutionVertexSchedulingRequirements createSchedulingRequirement(
                        final int subtaskIndex) {
                return createSchedulingRequirement(new ExecutionVertexID(new JobVertexID(), subtaskIndex), null);
        }
       ```
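
With an `int`-based overload like the one suggested above, the call sites in `testComputeAllPriorAllocationIds` could shrink to just the subtask index. The following is only a rough, self-contained sketch of that idea: `DemoRequirement`, `DemoRequirementFactory`, the `String` allocation IDs, and the local `computeAllPriorAllocationIds` are hypothetical stand-ins, not the actual Flink classes or the real implementation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in for ExecutionVertexSchedulingRequirements (illustration only).
final class DemoRequirement {
    final int subtaskIndex;
    final String previousAllocationId; // null when there is no prior allocation

    DemoRequirement(int subtaskIndex, String previousAllocationId) {
        this.subtaskIndex = subtaskIndex;
        this.previousAllocationId = previousAllocationId;
    }
}

// Hypothetical test helpers mirroring the shape of the suggested overloads.
final class DemoRequirementFactory {
    private DemoRequirementFactory() {}

    // Overload taking only the subtask index; delegates with "no previous allocation".
    static DemoRequirement createSchedulingRequirement(int subtaskIndex) {
        return createSchedulingRequirement(subtaskIndex, null);
    }

    static DemoRequirement createSchedulingRequirement(int subtaskIndex, String previousAllocationId) {
        return new DemoRequirement(subtaskIndex, previousAllocationId);
    }

    // Call sites now pass plain indices instead of building vertex IDs inline.
    static List<DemoRequirement> sampleRequirements() {
        return Arrays.asList(
            createSchedulingRequirement(0, "alloc-0"),
            createSchedulingRequirement(1, "alloc-0"),
            createSchedulingRequirement(2, "alloc-1"),
            createSchedulingRequirement(3));
    }

    // Simplified stand-in: collects the distinct non-null previous allocation IDs.
    static Set<String> computeAllPriorAllocationIds(List<DemoRequirement> requirements) {
        return requirements.stream()
            .map(r -> r.previousAllocationId)
            .filter(Objects::nonNull)
            .collect(Collectors.toSet());
    }
}
```

The trade-off is that the helper hides the `JobVertexID`, which is fine as long as the tests do not need to control it.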

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
##########
@@ -364,25 +365,29 @@ public void 
addOperatorCoordinator(SerializedValue<OperatorCoordinator.Provider>
         * @param grp The slot sharing group to associate the vertex with.
         */
        public void setSlotSharingGroup(SlotSharingGroup grp) {
+               checkNotNull(grp);
+
                if (this.slotSharingGroup != null) {
                        
this.slotSharingGroup.removeVertexFromGroup(this.getID(), 
this.getMinResources());
                }
 
+               grp.addVertexToGroup(this.getID(), this.getMinResources());
                this.slotSharingGroup = grp;
-               if (grp != null) {
-                       grp.addVertexToGroup(this.getID(), 
this.getMinResources());
-               }
        }
 
        /**
         * Gets the slot sharing group that this vertex is associated with. 
Different vertices in the same
-        * slot sharing group can run one subtask each in the same slot. If the 
vertex is not associated with
-        * a slot sharing group, this method returns {@code null}.
+        * slot sharing group can run one subtask each in the same slot.
         *
-        * @return The slot sharing group to associate the vertex with, or 
{@code null}, if not associated with one.
+        * @return The slot sharing group to associate the vertex with
         */
-       @Nullable
        public SlotSharingGroup getSlotSharingGroup() {
+               if (slotSharingGroup == null) {
+                       // create a new slot sharing group for this vertex if it was in no other slot sharing group.
+                       // this should only happen in testing cases at the moment because production code path will
+                       // always set a value to it before used

Review comment:
       Is it a lot of effort to fix this in the tests so that we can remove this check?
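
One possible shape for this, assuming the tests can create vertices through a shared helper: the helper always assigns a group, so the getter can simply return the field. This is a minimal sketch with hypothetical stand-in classes (`SketchGroup`, `SketchVertex`, `SketchTestUtils`), not the real `JobVertex` or `SlotSharingGroup`, and resource tracking is left out.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for SlotSharingGroup (illustration only, no resource tracking).
final class SketchGroup {
    private final Set<String> vertexIds = new HashSet<>();

    void addVertexToGroup(String vertexId) {
        vertexIds.add(vertexId);
    }

    void removeVertexFromGroup(String vertexId) {
        vertexIds.remove(vertexId);
    }

    Set<String> getVertexIds() {
        return vertexIds;
    }
}

// Hypothetical stand-in for JobVertex.
final class SketchVertex {
    private final String id;
    private SketchGroup slotSharingGroup;

    SketchVertex(String id) {
        this.id = Objects.requireNonNull(id);
    }

    void setSlotSharingGroup(SketchGroup grp) {
        Objects.requireNonNull(grp);
        if (slotSharingGroup != null) {
            slotSharingGroup.removeVertexFromGroup(id);
        }
        grp.addVertexToGroup(id);
        slotSharingGroup = grp;
    }

    // No lazy creation here: callers are expected to have set a group beforehand.
    SketchGroup getSlotSharingGroup() {
        return slotSharingGroup;
    }
}

// Hypothetical test utility: every vertex created for tests gets its own group.
final class SketchTestUtils {
    private SketchTestUtils() {}

    static SketchVertex createVertexWithOwnGroup(String id) {
        SketchVertex vertex = new SketchVertex(id);
        vertex.setSlotSharingGroup(new SketchGroup());
        return vertex;
    }
}
```

Whether this is feasible depends on how many tests construct vertices directly, which this sketch does not attempt to estimate.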

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocatorTest.java
##########
@@ -224,6 +224,7 @@ public void testCoLocationConstraintThrowsException() {
                final List<ExecutionVertexSchedulingRequirements> schedulingRequirements = Collections.singletonList(
                        new ExecutionVertexSchedulingRequirements.Builder()
                                .withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 0))
+                               .withSlotSharingGroupId(new SlotSharingGroupId())

Review comment:
       Why not add `new SlotSharingGroupId()` as the default value of 
`slotSharingGroupId` in `ExecutionVertexSchedulingRequirements.Builder`?
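
To illustrate what such a default could look like, here is a minimal, self-contained sketch. `SketchSlotSharingGroupId` and `SketchSchedulingRequirements` are hypothetical stand-ins, and the real builder has more fields than shown; the point is only the field initializer in the builder.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical stand-in for SlotSharingGroupId (illustration only).
final class SketchSlotSharingGroupId {
    private final UUID id = UUID.randomUUID();

    @Override
    public String toString() {
        return id.toString();
    }
}

// Hypothetical stand-in for ExecutionVertexSchedulingRequirements.
final class SketchSchedulingRequirements {
    private final int subtaskIndex;
    private final SketchSlotSharingGroupId slotSharingGroupId;

    private SketchSchedulingRequirements(int subtaskIndex, SketchSlotSharingGroupId slotSharingGroupId) {
        this.subtaskIndex = subtaskIndex;
        this.slotSharingGroupId = Objects.requireNonNull(slotSharingGroupId);
    }

    int getSubtaskIndex() {
        return subtaskIndex;
    }

    SketchSlotSharingGroupId getSlotSharingGroupId() {
        return slotSharingGroupId;
    }

    static final class Builder {
        private int subtaskIndex;

        // Default value: each builder starts with a fresh group id,
        // so callers only override it when they care about the group.
        private SketchSlotSharingGroupId slotSharingGroupId = new SketchSlotSharingGroupId();

        Builder withSubtaskIndex(int subtaskIndex) {
            this.subtaskIndex = subtaskIndex;
            return this;
        }

        Builder withSlotSharingGroupId(SketchSlotSharingGroupId slotSharingGroupId) {
            this.slotSharingGroupId = Objects.requireNonNull(slotSharingGroupId);
            return this;
        }

        SketchSchedulingRequirements build() {
            return new SketchSchedulingRequirements(subtaskIndex, slotSharingGroupId);
        }
    }
}
```

With a default like this, tests that do not care about the group would not need the extra `withSlotSharingGroupId(...)` call.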



