[ https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16290434#comment-16290434 ]
ASF GitHub Bot commented on FLINK-7956:
---------------------------------------
Github user ifndef-SleePy commented on a diff in the pull request:
https://github.com/apache/flink/pull/5091#discussion_r154871284
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java ---
@@ -19,68 +19,104 @@
package org.apache.flink.runtime.jobmanager.scheduler;
import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.Preconditions;
+import javax.annotation.Nullable;
+
public class ScheduledUnit {
-
+
+ @Nullable
private final Execution vertexExecution;
-
- private final SlotSharingGroup sharingGroup;
-
- private final CoLocationConstraint locationConstraint;
+
+ private final JobVertexID jobVertexId;
+
+ @Nullable
+ private final SlotSharingGroupId slotSharingGroupId;
+
+ @Nullable
+ private final CoLocationConstraint coLocationConstraint;
// --------------------------------------------------------------------------------------------
public ScheduledUnit(Execution task) {
- Preconditions.checkNotNull(task);
-
- this.vertexExecution = task;
- this.sharingGroup = null;
- this.locationConstraint = null;
+ this(
+ Preconditions.checkNotNull(task),
+ task.getVertex().getJobvertexId(),
+ null,
+ null);
}
- public ScheduledUnit(Execution task, SlotSharingGroup sharingUnit) {
- Preconditions.checkNotNull(task);
-
- this.vertexExecution = task;
- this.sharingGroup = sharingUnit;
- this.locationConstraint = null;
+ public ScheduledUnit(Execution task, SlotSharingGroupId slotSharingGroupId) {
+ this(
+ Preconditions.checkNotNull(task),
+ task.getVertex().getJobvertexId(),
+ slotSharingGroupId,
+ null);
}
- public ScheduledUnit(Execution task, SlotSharingGroup sharingUnit, CoLocationConstraint locationConstraint) {
- Preconditions.checkNotNull(task);
- Preconditions.checkNotNull(sharingUnit);
- Preconditions.checkNotNull(locationConstraint);
-
+ public ScheduledUnit(
+ Execution task,
+ SlotSharingGroupId slotSharingGroupId,
+ CoLocationConstraint coLocationConstraint) {
+ this(
+ Preconditions.checkNotNull(task),
+ task.getVertex().getJobvertexId(),
+ slotSharingGroupId,
+ coLocationConstraint);
+ }
+
+ public ScheduledUnit(
+ JobVertexID jobVertexId,
+ SlotSharingGroupId slotSharingGroupId,
+ CoLocationConstraint coLocationConstraint) {
+ this(
+ null,
+ jobVertexId,
+ slotSharingGroupId,
+ coLocationConstraint);
+ }
+
+ public ScheduledUnit(
+ Execution task,
+ JobVertexID jobVertexId,
--- End diff --
We can get the JobVertexID from the Execution. Do we need it as a separate constructor parameter?
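
For context, the diff above also adds a constructor that passes `null` for the Execution and takes only a JobVertexID, which may be why the delegation target accepts both. Below is a minimal sketch of that delegation pattern. It assumes simplified stand-in types: `ScheduledUnitSketch`, `FakeExecution`, and `withoutExecution` are hypothetical names, and plain strings replace the real ID classes. It is not the Flink implementation.

```java
import java.util.Objects;

/** Simplified, hypothetical stand-in for ScheduledUnit; not the Flink class. */
public class ScheduledUnitSketch {

    /** Hypothetical stand-in for Execution that only knows its vertex id. */
    public static final class FakeExecution {
        private final String jobVertexId;

        public FakeExecution(String jobVertexId) {
            this.jobVertexId = Objects.requireNonNull(jobVertexId);
        }

        public String getJobVertexId() {
            return jobVertexId;
        }
    }

    private final FakeExecution execution;   // may be null
    private final String jobVertexId;        // never null
    private final String slotSharingGroupId; // may be null

    /** With an execution, the vertex id is derived from it. */
    public ScheduledUnitSketch(FakeExecution execution, String slotSharingGroupId) {
        // Arguments are evaluated left to right, so the null check runs first.
        this(Objects.requireNonNull(execution), execution.getJobVertexId(), slotSharingGroupId);
    }

    /**
     * Without an execution, the vertex id has to be passed in explicitly.
     * A factory method is used here only because both ids are strings in this sketch.
     */
    public static ScheduledUnitSketch withoutExecution(String jobVertexId, String slotSharingGroupId) {
        return new ScheduledUnitSketch(null, jobVertexId, slotSharingGroupId);
    }

    /** Single delegation target: the execution is optional, the vertex id is not. */
    private ScheduledUnitSketch(FakeExecution execution, String jobVertexId, String slotSharingGroupId) {
        this.execution = execution;
        this.jobVertexId = Objects.requireNonNull(jobVertexId);
        this.slotSharingGroupId = slotSharingGroupId;
    }

    public FakeExecution getExecution() {
        return execution;
    }

    public String getJobVertexId() {
        return jobVertexId;
    }

    public String getSlotSharingGroupId() {
        return slotSharingGroupId;
    }

    public static void main(String[] args) {
        ScheduledUnitSketch derived =
            new ScheduledUnitSketch(new FakeExecution("vertex-1"), "group-1");
        ScheduledUnitSketch explicit =
            ScheduledUnitSketch.withoutExecution("vertex-2", "group-1");

        System.out.println(derived.getJobVertexId());
        System.out.println(explicit.getJobVertexId());
        System.out.println(explicit.getExecution() == null);
    }
}
```

If every caller supplied an Execution, the explicit ID parameter would be redundant and the delegation target could derive it instead. The sketch only shows the case where the Execution is absent.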
> Add support for scheduling with slot sharing
> --------------------------------------------
>
> Key: FLINK-7956
> URL: https://issues.apache.org/jira/browse/FLINK-7956
> Project: Flink
> Issue Type: Sub-task
> Components: Scheduler
> Affects Versions: 1.4.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Labels: flip-6
>
> In order to reach feature equivalence with the old code base, we should add
> support for scheduling with slot sharing to the {{SlotPool}}. This will also
> allow us to run all the IT cases based on the {{AbstractTestBase}} on the
> Flip-6 {{MiniCluster}}.