seojangho commented on a change in pull request #15: [NEMO-52] Make SchedulingPolicy Stackable
URL: https://github.com/apache/incubator-nemo/pull/15#discussion_r187789016
##########
File path:
runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
##########
@@ -42,130 +37,44 @@
public final class SourceLocationAwareSchedulingPolicy implements SchedulingPolicy {
private static final Logger LOG = LoggerFactory.getLogger(SourceLocationAwareSchedulingPolicy.class);
- private final ExecutorRegistry executorRegistry;
- private final RoundRobinSchedulingPolicy roundRobinSchedulingPolicy;
-
- /**
- * Injectable constructor for {@link SourceLocationAwareSchedulingPolicy}.
- * @param executorRegistry provides catalog of available executors
- * @param roundRobinSchedulingPolicy provides fallback for TaskGroups with no input source information
- */
@Inject
- private SourceLocationAwareSchedulingPolicy(final ExecutorRegistry executorRegistry,
- final RoundRobinSchedulingPolicy roundRobinSchedulingPolicy) {
- this.executorRegistry = executorRegistry;
- this.roundRobinSchedulingPolicy = roundRobinSchedulingPolicy;
+ public SourceLocationAwareSchedulingPolicy() {
}
/**
- * Try to schedule a TaskGroup.
- * If the TaskGroup has one or more source tasks, this method schedules the task group to one of the physical nodes,
- * chosen from union of set of locations where splits of each source task resides.
- * If the TaskGroup has no source tasks, falls back to {@link RoundRobinSchedulingPolicy}.
- * @param scheduledTaskGroup to schedule.
- * @param jobStateManager jobStateManager which the TaskGroup belongs to.
- * @return true if the task group is successfully scheduled, false otherwise.
+ * @param readables collection of readables
+ * @return Set of source locations from source tasks in {@code taskGroupDAG}
+ * @throws Exception for any exception raised during querying source locations for a readable
*/
+ private static Set<String> getSourceLocations(final Collection<Readable> readables) throws Exception {
+ final List<String> sourceLocations = new ArrayList<>();
+ for (final Readable readable : readables) {
+ sourceLocations.addAll(readable.getLocations());
+ }
+ return new HashSet<>(sourceLocations);
+ }
+
@Override
- public boolean scheduleTaskGroup(final ScheduledTaskGroup scheduledTaskGroup,
- final JobStateManager jobStateManager) {
- Set<String> sourceLocations = Collections.emptySet();
+ public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterList,
+ final ScheduledTaskGroup scheduledTaskGroup) {
Review comment:
Alignment?