[
https://issues.apache.org/jira/browse/FLINK-7153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16086915#comment-16086915
]
Sihua Zhou edited comment on FLINK-7153 at 7/14/17 6:08 AM:
------------------------------------------------------------
Hi [~StephanEwen],
Here is my solution.
For problem 1, we need to record the `Future<SimpleSlot>` in `Execution` as
soon as `Execution.allocateSlotForExecution()` successfully obtains one.
So I added a private field `Future<SimpleSlot> assignedFutureResource` with a
getter to the `Execution` class; the getter is called by
`ExecutionVertex.getPreferredLocationsBasedOnInputs()`. With that, problem 1
is solved. This requires modifying `Execution.class` and
`ExecutionVertex.class`.
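To make the lookup order concrete, here is a minimal, self-contained sketch. It is not Flink code: `ProducerExecution`, `assignedLocation`, `assignedFutureLocation` and `preferredLocation` are hypothetical stand-ins, locations are plain strings, and `java.util.concurrent.CompletableFuture` replaces Flink's own `Future`. The sketch also only reads a future that has already completed instead of blocking on it, which is a choice of the sketch rather than part of the proposal.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

public class FutureSlotLookupSketch {

    /** Hypothetical stand-in for Execution; only the two fields relevant here. */
    static final class ProducerExecution {
        // Known only after deployment (what getCurrentAssignedResource() sees today).
        volatile String assignedLocation;
        // Recorded as soon as allocation is requested (the proposed new field).
        volatile CompletableFuture<String> assignedFutureLocation;
    }

    /** Prefer the deployed location; otherwise use the recorded future if it already completed. */
    static Optional<String> preferredLocation(ProducerExecution producer) {
        if (producer.assignedLocation != null) {
            return Optional.of(producer.assignedLocation);
        }
        CompletableFuture<String> future = producer.assignedFutureLocation;
        if (future != null && future.isDone() && !future.isCompletedExceptionally()) {
            // Safe: the future completed normally, so getNow() returns its value.
            return Optional.ofNullable(future.getNow(null));
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        ProducerExecution producer = new ProducerExecution();
        System.out.println(preferredLocation(producer).isPresent()); // false
        producer.assignedFutureLocation = CompletableFuture.completedFuture("tm-1");
        System.out.println(preferredLocation(producer).get()); // tm-1
    }
}
```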
For problem 2, I think we need to split the allocation strategy into two steps:
* Step 1: Allocate only from the SlotSharingGroup, based on inputs. In this step,
every ExecutionVertex that can be placed with a local partition is allocated
successfully.
* Step 2: Do the normal allocation for the remaining ExecutionVertex instances.
With these two steps, problem 2 is solved. This requires modifying
`ExecutionJobVertex.class`, `Scheduler.class`, `SlotSharingGroupAssignment`
and `ScheduledUnit.class`, which needs to carry the
`onlyAllocateBasePreferLocation` flag.
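The effect of the two steps can be checked with a small toy model. This is only a sketch under simplifying assumptions, not the scheduler itself: slots are plain integers, each producer already sits in its own shared slot, a shared slot holds at most one consumer subtask, and consumer `c` reads from producer `c * producers / consumers` (a simplified pointwise mapping). The class and method names are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

public class TwoStepAllocationSketch {

    /** Returns how many consumers end up in the same shared slot as their producer. */
    static int countLocal(int producers, int consumers, boolean twoStep) {
        // Slots 0..producers-1 each host one producer; this set tracks which of
        // them already host a consumer subtask.
        Set<Integer> slotsWithConsumer = new HashSet<>();
        boolean[] placed = new boolean[consumers];
        int local = 0;

        if (twoStep) {
            // Step 1: take the producer's slot if it is free, otherwise skip for now.
            for (int c = 0; c < consumers; c++) {
                int preferred = c * producers / consumers;
                if (slotsWithConsumer.add(preferred)) {
                    placed[c] = true;
                    local++;
                }
            }
        }

        // Step 2 (or the only pass when twoStep is false): normal allocation.
        for (int c = 0; c < consumers; c++) {
            if (placed[c]) {
                continue;
            }
            int preferred = c * producers / consumers;
            if (slotsWithConsumer.add(preferred)) {
                local++;
                continue;
            }
            // Fall back to any other shared slot that is still free; this may
            // take the slot a later consumer would have preferred.
            for (int s = 0; s < producers; s++) {
                if (slotsWithConsumer.add(s)) {
                    break;
                }
            }
            // If no shared slot was free, a new slot is requested. Remote either way.
        }
        return local;
    }

    public static void main(String[] args) {
        System.out.println("one pass:  " + countLocal(2, 4, false) + " local"); // one pass:  1 local
        System.out.println("two steps: " + countLocal(2, 4, true) + " local");  // two steps: 2 local
    }
}
```

With 2 producers and 4 consumers, as in the test case from the issue description, the single pass yields one local partition and the two-step variant yields two.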
In total, 7 classes need to be modified.
The brief code looks like this:
# *Execution.class*
{code:java}
class Execution {

    // Records the slot future as soon as allocation is requested (problem 1).
    private volatile Future<SimpleSlot> assignedFutureResource;

    // The onlyAllocateBasePreferLocation parameter is used for problem 2.
    public Future<SimpleSlot> allocateSlotForExecution(
            SlotProvider slotProvider,
            boolean queued,
            boolean onlyAllocateBasePreferLocation) throws IllegalExecutionStateException {
        /* ... */
        if (transitionState(CREATED, SCHEDULED)) {
            ScheduledUnit toSchedule = locationConstraint == null ?
                new ScheduledUnit(this, sharingGroup) :
                new ScheduledUnit(this, sharingGroup, locationConstraint);
            toSchedule.setOnlyAllocateBasePreferInputs(onlyAllocateBasePreferLocation);

            // Record the assignment by updating assignedFutureResource.
            assignedFutureResource = slotProvider.allocateSlot(toSchedule, queued);
            if (assignedFutureResource == null) {
                // Nothing was allocated: revert to CREATED so a later attempt can retry.
                transitionState(SCHEDULED, CREATED);
            }
            return assignedFutureResource;
        }
        /* ... */
    }

    public Future<SimpleSlot> getAllocateFutureSlot() {
        return assignedFutureResource;
    }
}
{code}
# *ExecutionVertex.class*
{code:java}
class ExecutionVertex {

    public Iterable<TaskManagerLocation> getPreferredLocationsBasedOnInputs() {
        /* ... */
        // Try to look up the future slot if getCurrentAssignedResource() returns null.
        SimpleSlot sourceSlot =
            sources[k].getSource().getProducer().getCurrentAssignedResource();
        if (sourceSlot == null) {
            Future<SimpleSlot> futureSlot = sources[k].getSource().getProducer()
                .getCurrentExecutionAttempt().getAllocateFutureSlot();
            if (futureSlot != null) {
                sourceSlot = futureSlot.get();
            }
        }
        /* ... */
    }
}
{code}
# *ExecutionJobVertex.class*
{code:java}
class ExecutionJobVertex {

    public ExecutionAndSlot[] allocateResourcesForAll(SlotProvider resourceProvider, boolean queued) {
        final List<ExecutionAndSlot> slots = new ArrayList<>(this.taskVertices.length);
        // Step 1: allocate only where a slot local to the inputs is available.
        slots.addAll(allocateResourcesForAllHelper(resourceProvider, queued, true));
        // Step 2: normal allocation for the remaining vertices.
        slots.addAll(allocateResourcesForAllHelper(resourceProvider, queued, false));
        return slots.toArray(new ExecutionAndSlot[this.taskVertices.length]);
    }

    // Apart from the added onlyAllocateBaseOnInputs parameter, this is
    // similar to the old allocateResourcesForAll function.
    public List<ExecutionAndSlot> allocateResourcesForAllHelper(
            SlotProvider resourceProvider,
            boolean queued,
            boolean onlyAllocateBaseOnInputs) {
        final List<ExecutionAndSlot> slots = new ArrayList<>(taskVertices.length);
        final List<ExecutionVertex> verticeList = Lists.newArrayList(this.taskVertices);
        final SlotSharingGroup sharingGroup = this.getSlotSharingGroup();

        if (sharingGroup == null) {
            if (onlyAllocateBaseOnInputs) {
                // Without a sharing group there is nothing to allocate in step 1.
                return slots;
            }
        } else {
            if (queued) {
                throw new IllegalArgumentException(
                    "A task with a vertex sharing group was scheduled in a queued fashion.");
            }
        }

        for (int i = 0; i < verticeList.size(); ++i) {
            boolean successful = false;
            try {
                final ExecutionVertex vertex = verticeList.get(i);
                final Execution exec = vertex.getCurrentExecutionAttempt();
                if (exec.hasPreAllocateSlot()) {
                    // Already allocated in step 1.
                    successful = true;
                    continue;
                }
                Future<SimpleSlot> future = exec.allocateSlotForExecution(
                    resourceProvider, queued, onlyAllocateBaseOnInputs);
                if (future != null) {
                    slots.add(new ExecutionAndSlot(exec, future));
                }
                successful = true;
            } finally {
                if (!successful) {
                    // This is the case if an exception was thrown.
                    for (ExecutionAndSlot slot : slots) {
                        ExecutionGraphUtils.releaseSlotFuture(slot.slotFuture);
                    }
                }
            }
        }
        return slots;
    }
}
{code}
# *Scheduler.class*
{code:java}
class Scheduler {

    // In scheduleTask(), we need to return null when the attempt to allocate
    // based on inputs fails.
    private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource)
            throws NoResourceAvailableException {
        synchronized (globalLock) {
            /* ... */
            SlotSharingGroup sharingUnit = task.getSlotSharingGroup();
            if (sharingUnit != null) {
                /* ... */
                boolean isOnlyAllocateByPreferInputs = task.getOnlyAllocateByPreferInputs();
                if (isOnlyAllocateByPreferInputs && slotFromGroup == null) {
                    return null;
                }
                /* ... */
            }
            /* ... */
        }
    }
}
{code}
# *SlotSharingGroupAssignment.class*
{code:java}
class SlotSharingGroupAssignment {

    private Tuple2<SharedSlot, Locality> getSlotForTaskInternal(
            AbstractID groupId,
            Iterable<TaskManagerLocation> preferredLocations,
            boolean localOnly,
            boolean onlyAllocateBaseOnInputs) {
        /* ... */
        if (preferredLocations != null) {
            for (TaskManagerLocation location : preferredLocations) {
                // Set the flag that we failed a preferred location. If one will be found,
                // we return early anyways and skip the flag evaluation.
                didNotGetPreferred = true;
                SharedSlot slot = removeFromMultiMap(slotsForGroup, location.getResourceID());
                if (slot != null && slot.isAlive()) {
                    return new Tuple2<>(slot, Locality.LOCAL);
                }
            }
        }
        if (onlyAllocateBaseOnInputs) {
            return null;
        }
    }
    /* ... */
}
{code}
Thank you very much for taking the time to review my solution. Could you assign
this issue to me? I would very much like to contribute to Flink.
Thanks,
Sihua Zhou
> Eager Scheduling can't allocate source for ExecutionGraph correctly
> -------------------------------------------------------------------
>
> Key: FLINK-7153
> URL: https://issues.apache.org/jira/browse/FLINK-7153
> Project: Flink
> Issue Type: Bug
> Components: JobManager
> Affects Versions: 1.3.1
> Reporter: Sihua Zhou
> Assignee: Stephan Ewen
> Priority: Blocker
> Fix For: 1.3.2
>
>
> The ExecutionGraph.scheduleEager() function allocates slots for each
> ExecutionJobVertex one by one by calling
> ExecutionJobVertex.allocateResourcesForAll(). There are two problems with this:
> 1. ExecutionVertex.getPreferredLocationsBasedOnInputs() always returns an
> empty result, because `sourceSlot` is always null until the `ExecutionVertex`
> has been deployed via `Execution.deployToSlot()`. So allocating resources based
> on preferred locations can't work correctly. Do we need to set the slot info
> for `Execution` as soon as Execution.allocateSlotForExecution() is called
> successfully?
> 2. The current allocation strategy can't allocate slots optimally. Here is the
> test case:
> {code}
> JobVertex v1 = new JobVertex("v1", jid1);
> JobVertex v2 = new JobVertex("v2", jid2);
> SlotSharingGroup group = new SlotSharingGroup();
> v1.setSlotSharingGroup(group);
> v2.setSlotSharingGroup(group);
> v1.setParallelism(2);
> v2.setParallelism(4);
> v1.setInvokableClass(BatchTask.class);
> v2.setInvokableClass(BatchTask.class);
> v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE,
> ResultPartitionType.PIPELINED_BOUNDED);
> {code}
> Currently, after allocating for v1 and v2, we get one local partition and
> three remote partitions. But it should be 2 local partitions and 2 remote
> partitions.
> The cause of the above problem is that the current allocation strategy
> allocates the resource for each execution one by one (if the execution can
> allocate from the SlotGroup it takes that slot, otherwise it asks for a new one).
> Changing the allocation strategy to two steps will solve this problem. Below
> is the pseudocode:
> {code}
> for (ExecutionJobVertex ejv : getVerticesTopologically()) {
> // step 1: try to allocate from the SlotGroup based on inputs, one by one
> // (which only allocates resources based on location).
> // step 2: allocate for the remaining executions.
> }
> {code}