[
https://issues.apache.org/jira/browse/FLINK-7153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16086915#comment-16086915
]
Sihua Zhou commented on FLINK-7153:
-----------------------------------
Hi [~StephanEwen],
Here is my solution.
For problem 1, we need to record the `Future<SimpleSlot>` in `Execution` as
soon as `Execution.allocateSlotForExecution()` obtains a `SimpleSlot`
successfully. So I added a private field `Future<SimpleSlot>
assignedFutureResource` with a getter method to the `Execution` class; the
getter is called by `ExecutionVertex.getPreferredLocationsBasedOnInputs()`.
With that, problem 1 is solved. This requires modifying `Execution.class` and
`ExecutionVertex.class`.
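The idea can be shown with a minimal self-contained sketch, using plain `java.util.concurrent` types as stand-ins for Flink's `Execution` and `SimpleSlot` (all class and method names below are hypothetical, not Flink's real API): a downstream vertex that finds no assigned slot on its producer falls back to the recorded allocation future.

```java
import java.util.concurrent.CompletableFuture;

// Toy stand-ins for Flink's SimpleSlot / Execution; names are hypothetical.
class ToySlot {
    final String taskManagerLocation;
    ToySlot(String location) { this.taskManagerLocation = location; }
}

class ToyExecution {
    // recorded as soon as allocation is requested, not only after deployment
    private volatile CompletableFuture<ToySlot> assignedFutureResource;

    CompletableFuture<ToySlot> allocateSlot(String location) {
        assignedFutureResource = CompletableFuture.completedFuture(new ToySlot(location));
        return assignedFutureResource;
    }

    CompletableFuture<ToySlot> getAllocateFutureSlot() {
        return assignedFutureResource;
    }

    // what a downstream vertex does when computing preferred locations:
    // use the recorded future when no slot has been assigned yet
    static String preferredLocationOf(ToyExecution producer) {
        CompletableFuture<ToySlot> f = producer.getAllocateFutureSlot();
        return f == null ? null : f.join().taskManagerLocation;
    }
}

public class Problem1Sketch {
    public static void main(String[] args) {
        ToyExecution producer = new ToyExecution();
        // before allocation: no preferred location can be derived
        System.out.println(ToyExecution.preferredLocationOf(producer)); // null
        producer.allocateSlot("tm-1");
        // after allocation, but before deployment: the location is already visible
        System.out.println(ToyExecution.preferredLocationOf(producer)); // tm-1
    }
}
```

The point is only the fallback path: the consumer can see the producer's location as soon as the allocation future exists, instead of waiting for `deployToSlot()`.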
For problem 2, I think we need to change the allocation strategy into two steps:
* Step 1: allocate only from the SlotSharingGroup based on inputs; in this step
every ExecutionVertex that can be allocated with a local partition will be
allocated successfully.
* Step 2: do normal allocation for the remaining ExecutionVertexes.
With the above two steps, problem 2 can be solved. This requires modifying
`ExecutionJobVertex.class`, `Scheduler.class`, `SlotSharingGroupAssignment.class`
and `ScheduledUnit.class`, the last of which needs to carry the
`onlyAllocateBasePreferLocation` flag.
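The effect of the two steps can be demonstrated with a small self-contained simulation (the slot pool is abstracted to a list of TaskManager locations, and the greedy one-by-one behaviour is my simplified model of the current strategy; the concrete slot layout is a hypothetical example, not taken from Flink):

```java
import java.util.Arrays;
import java.util.List;

public class TwoStepSketch {

    // Returns how many consumers end up co-located with their preferred input.
    static int allocate(List<String> slotLocations, List<String> preferredInputs, boolean twoStep) {
        boolean[] taken = new boolean[slotLocations.size()];
        String[] assigned = new String[preferredInputs.size()];
        int passes = twoStep ? 2 : 1;
        for (int pass = 0; pass < passes; pass++) {
            // in the two-step strategy, pass 0 accepts only input-local slots
            boolean localOnly = twoStep && pass == 0;
            for (int c = 0; c < preferredInputs.size(); c++) {
                if (assigned[c] != null) continue; // already placed in step 1
                int pick = -1;
                for (int s = 0; s < slotLocations.size(); s++) {
                    if (taken[s]) continue;
                    if (slotLocations.get(s).equals(preferredInputs.get(c))) { pick = s; break; }
                    if (!localOnly && pick < 0) pick = s; // remember first free slot as fallback
                }
                if (pick >= 0) { taken[pick] = true; assigned[c] = slotLocations.get(pick); }
            }
        }
        int local = 0;
        for (int c = 0; c < assigned.length; c++) {
            if (preferredInputs.get(c).equals(assigned[c])) local++;
        }
        return local;
    }

    public static void main(String[] args) {
        // producers sit on tm-1 and tm-2; free slots for the consumers are spread as below
        List<String> slots = Arrays.asList("tm-1", "tm-2", "tm-3", "tm-3");
        List<String> preferred = Arrays.asList("tm-1", "tm-1", "tm-2", "tm-2");

        System.out.println(allocate(slots, preferred, false)); // one-by-one: 1 local
        System.out.println(allocate(slots, preferred, true));  // two-step:   2 local
    }
}
```

In the one-pass run, a consumer with no local slot left grabs the first free slot, stealing the `tm-2` slot that a later consumer needed; the local-only first pass avoids exactly that.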
In all, six classes need to be modified. The brief code looks like this:
# *Execution.class*
{code:java}
class Execution {

    private volatile Future<SimpleSlot> assignedFutureResource;

    // the onlyAllocateBasePreferLocation parameter will be used for problem 2
    public Future<SimpleSlot> allocateSlotForExecution(
            SlotProvider slotProvider,
            boolean queued,
            boolean onlyAllocateBasePreferLocation) throws IllegalExecutionStateException {
        /*
         */
        if (transitionState(CREATED, SCHEDULED)) {
            ScheduledUnit toSchedule = locationConstraint == null ?
                    new ScheduledUnit(this, sharingGroup) :
                    new ScheduledUnit(this, sharingGroup, locationConstraint);
            toSchedule.setOnlyAllocateBasePreferInputs(onlyAllocateBasePreferLocation);

            // record the assignment by updating assignedFutureResource
            assignedFutureResource = slotProvider.allocateSlot(toSchedule, queued);
            if (assignedFutureResource == null) {
                transitionState(SCHEDULED, CREATED);
            }
            return assignedFutureResource;
        }
        /*
         */
    }

    public Future<SimpleSlot> getAllocateFutureSlot() {
        return assignedFutureResource;
    }
}
{code}
# *ExecutionVertex.class*
{code:java}
class ExecutionVertex {

    public Iterable<TaskManagerLocation> getPreferredLocationsBasedOnInputs() {
        /*
         */
        // try to look up the future slot if getCurrentAssignedResource() returns null
        SimpleSlot sourceSlot =
                sources[k].getSource().getProducer().getCurrentAssignedResource();
        if (sourceSlot == null) {
            Future<SimpleSlot> futureSlot =
                    sources[k].getSource().getProducer().getCurrentExecutionAttempt().getAllocateFutureSlot();
            if (futureSlot != null) {
                sourceSlot = futureSlot.get();
            }
        }
        /*
         */
    }
}
{code}
# *ExecutionJobVertex.class*
{code:java}
class ExecutionJobVertex {

    public ExecutionAndSlot[] allocateResourcesForAll(SlotProvider resourceProvider, boolean queued) {
        final List<ExecutionAndSlot> slots = new ArrayList<>(this.taskVertices.length);

        // step 1
        slots.addAll(allocateResourcesForAllHelper(resourceProvider, queued, true));
        // step 2
        slots.addAll(allocateResourcesForAllHelper(resourceProvider, queued, false));

        return slots.toArray(new ExecutionAndSlot[this.taskVertices.length]);
    }

    // Apart from the added onlyAllocateBaseOnInputs parameter, this is
    // similar to the old allocateResourcesForAll() function.
    public List<ExecutionAndSlot> allocateResourcesForAllHelper(
            SlotProvider resourceProvider, boolean queued, boolean onlyAllocateBaseOnInputs) {

        final List<ExecutionAndSlot> slots = new ArrayList<>(taskVertices.length);
        final List<ExecutionVertex> verticeList = Lists.newArrayList(this.taskVertices);
        final SlotSharingGroup sharingGroup = this.getSlotSharingGroup();

        if (sharingGroup == null) {
            if (onlyAllocateBaseOnInputs) {
                return slots;
            }
        } else {
            if (queued) {
                throw new IllegalArgumentException(
                        "A task with a vertex sharing group was scheduled in a queued fashion.");
            }
        }

        for (int i = 0; i < verticeList.size(); ++i) {
            boolean successful = false;
            try {
                final ExecutionVertex vertex = verticeList.get(i);
                final Execution exec = vertex.getCurrentExecutionAttempt();

                // skip executions that already received a slot in step 1
                if (exec.hasPreAllocateSlot()) {
                    successful = true;
                    continue;
                }

                Future<SimpleSlot> future =
                        exec.allocateSlotForExecution(resourceProvider, queued, onlyAllocateBaseOnInputs);
                if (future != null) {
                    slots.add(new ExecutionAndSlot(exec, future));
                }
                successful = true;
            } finally {
                if (!successful) {
                    // this is the case if an exception was thrown
                    for (ExecutionAndSlot slot : slots) {
                        ExecutionGraphUtils.releaseSlotFuture(slot.slotFuture);
                    }
                }
            }
        }
        return slots;
    }
}
{code}
# *Scheduler.class*
{code:java}
class Scheduler {

    // In scheduleTask(), we need to return null when allocating based
    // on inputs fails.
    private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource)
            throws NoResourceAvailableException {
        synchronized (globalLock) {
            /*
             */
            SlotSharingGroup sharingUnit = task.getSlotSharingGroup();
            if (sharingUnit != null) {
                //
                boolean isOnlyAllocateByPreferInputs = task.getOnlyAllocateByPreferInputs();
                if (isOnlyAllocateByPreferInputs && slotFromGroup == null) {
                    return null;
                }
                //
            }
            /*
             */
        }
    }
}
{code}
# *SlotSharingGroupAssignment.class*
{code:java}
class SlotSharingGroupAssignment {

    private Tuple2<SharedSlot, Locality> getSlotForTaskInternal(
            AbstractID groupId,
            Iterable<TaskManagerLocation> preferredLocations,
            boolean localOnly,
            boolean onlyAllocateBaseOnInputs) {
        /*
         */
        if (preferredLocations != null) {
            for (TaskManagerLocation location : preferredLocations) {
                // set the flag that we failed a preferred location. If one will be found,
                // we return early anyways and skip the flag evaluation
                didNotGetPreferred = true;
                SharedSlot slot = removeFromMultiMap(slotsForGroup, location.getResourceID());
                if (slot != null && slot.isAlive()) {
                    return new Tuple2<>(slot, Locality.LOCAL);
                }
            }
        }

        // in step 1, give up instead of falling back to a non-local slot
        if (onlyAllocateBaseOnInputs) {
            return null;
        }
        /*
         */
    }
}
{code}
Thank you very much for taking the time to review my solution. Could you assign
this issue to me? I would very much like to contribute to Flink.
Thanks,
Sihua Zhou
> Eager Scheduling can't allocate source for ExecutionGraph correctly
> -------------------------------------------------------------------
>
> Key: FLINK-7153
> URL: https://issues.apache.org/jira/browse/FLINK-7153
> Project: Flink
> Issue Type: Bug
> Components: JobManager
> Affects Versions: 1.3.1
> Reporter: Sihua Zhou
> Assignee: Stephan Ewen
> Priority: Blocker
> Fix For: 1.3.2
>
>
> The ExecutionGraph.scheduleEager() function allocates for the
> ExecutionJobVertexes one by one via ExecutionJobVertex.allocateResourcesForAll().
> There are two problems with it:
> 1. ExecutionVertex.getPreferredLocationsBasedOnInputs() will always return
> empty, because `sourceSlot` is always null until the `ExecutionVertex` has been
> deployed via `Execution.deployToSlot()`. So allocating resources based on
> preferred locations can't work correctly; we need to set the slot info for
> `Execution` as soon as Execution.allocateSlotForExecution() is called
> successfully.
> 2. The current allocation strategy can't allocate the slots optimally. Here is
> the test case:
> {code}
> JobVertex v1 = new JobVertex("v1", jid1);
> JobVertex v2 = new JobVertex("v2", jid2);
> SlotSharingGroup group = new SlotSharingGroup();
> v1.setSlotSharingGroup(group);
> v2.setSlotSharingGroup(group);
> v1.setParallelism(2);
> v2.setParallelism(4);
> v1.setInvokableClass(BatchTask.class);
> v2.setInvokableClass(BatchTask.class);
> v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE,
> ResultPartitionType.PIPELINED_BOUNDED);
> {code}
> Currently, after allocating for v1 and v2, we get one local partition and three
> remote partitions. But actually, it should be 2 local partitions and 2 remote
> partitions.
> The cause of the above problems is that the current strategy allocates the
> resource for executions one by one (if the execution can allocate from the
> SlotSharingGroup then take it, otherwise ask for a new slot). Changing the
> allocation strategy to two steps will solve this problem; below is the
> pseudo code:
> {code}
> for (ExecutionJobVertex ejv: getVerticesTopologically) {
>     // step 1: try to allocate from the SlotSharingGroup based on inputs,
>     // one by one (allocating only based on location).
>     // step 2: allocate for the remaining executions.
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)