[ 
https://issues.apache.org/jira/browse/FLINK-7153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16086915#comment-16086915
 ] 

Sihua Zhou edited comment on FLINK-7153 at 7/14/17 6:08 AM:
------------------------------------------------------------

Hi [~StephanEwen],

Here is my solution.

For problem 1, we need to record the `Future<SimpleSlot>` in `Execution` as 
soon as `Execution.allocateSlotForExecution()` has obtained a SimpleSlot. 
So I added a private field `Future<SimpleSlot> assignedFutureResource` to the 
`Execution` class, together with a getter that is called by 
`ExecutionVertex.getPreferredLocationsBasedOnInputs()`. That solves problem 1. 
This requires modifying `Execution.class` and 
`ExecutionVertex.class`.
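
To illustrate the idea behind problem 1 outside of Flink, here is a minimal, self-contained sketch. It is a toy model, not Flink code: `ProducerExecution`, `locationIfKnown()` and the string task manager names are hypothetical, and it uses the JDK's `CompletableFuture` rather than Flink's own `Future` type. It only shows that a consumer can learn the producer's location before deployment once the pending slot future is recorded. Unlike the `futureSlot.get()` call in my proposal, the sketch uses the non-blocking `getNow(null)`, which is one possible choice and not part of the proposal.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class FutureSlotLookupSketch {

    /** Hypothetical stand-in for an Execution that remembers its pending slot. */
    static class ProducerExecution {
        // Recorded when a slot is requested, i.e. before deployment.
        private volatile CompletableFuture<String> assignedFutureLocation;
        // Only set on deployment (never set in this sketch).
        private volatile String deployedLocation;

        void allocate(CompletableFuture<String> slotFuture) {
            this.assignedFutureLocation = slotFuture;
        }

        /** Returns the location if deployed or already allocated, otherwise null. */
        String locationIfKnown() {
            if (deployedLocation != null) {
                return deployedLocation;
            }
            CompletableFuture<String> future = assignedFutureLocation;
            // getNow() does not block; it yields null while the future is incomplete.
            return future == null ? null : future.getNow(null);
        }
    }

    /** Collects the known locations of all producers, skipping unknown ones. */
    static List<String> preferredLocations(List<ProducerExecution> producers) {
        List<String> result = new ArrayList<>();
        for (ProducerExecution producer : producers) {
            String location = producer.locationIfKnown();
            if (location != null) {
                result.add(location);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        ProducerExecution producer = new ProducerExecution();
        List<ProducerExecution> inputs = Collections.singletonList(producer);

        System.out.println(preferredLocations(inputs)); // prints []

        producer.allocate(CompletableFuture.completedFuture("tm-1"));
        System.out.println(preferredLocations(inputs)); // prints [tm-1]
    }
}
```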

For problem 2, I think we need to change the allocation strategy into two steps:
*  Step 1: allocate only from the SlotGroup based on inputs. In this step, every 
ExecutionVertex that can be allocated with a local partition is allocated 
successfully.  
*  Step 2: do the normal allocation for the remaining ExecutionVertex instances.
With these two steps, problem 2 can be solved. This requires modifying 
`ExecutionJobVertex.class`, `Scheduler.class`, `SlotSharingGroupAssignment`, 
and `ScheduledUnit.class`, which needs to carry the 
`onlyAllocateBasePreferLocation` flag. 
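
The effect of the two steps can be reproduced with a small toy simulation of the test case from the issue description (v1 with parallelism 2, v2 with parallelism 4, POINTWISE). This is only a sketch under simplifying assumptions, not Flink code: `SharedSlot`, `takeLocal`, `takeAny` and `countLocal` are hypothetical names, each shared slot accepts at most one v2 subtask, and any newly requested slot is assumed to land on a different task manager than the producer.

```java
import java.util.ArrayList;
import java.util.List;

public class TwoStepAllocationSketch {

    /** Toy shared slot: lives on one task manager, holds at most one consumer subtask. */
    static class SharedSlot {
        final String taskManager;
        boolean hasConsumer;

        SharedSlot(String taskManager) {
            this.taskManager = taskManager;
        }
    }

    /** Takes a free shared slot on the preferred task manager, or returns null. */
    static SharedSlot takeLocal(List<SharedSlot> group, String preferred) {
        for (SharedSlot slot : group) {
            if (!slot.hasConsumer && slot.taskManager.equals(preferred)) {
                slot.hasConsumer = true;
                return slot;
            }
        }
        return null;
    }

    /** Takes any free shared slot, or else a new slot on a fresh task manager. */
    static SharedSlot takeAny(List<SharedSlot> group) {
        for (SharedSlot slot : group) {
            if (!slot.hasConsumer) {
                slot.hasConsumer = true;
                return slot;
            }
        }
        SharedSlot fresh = new SharedSlot("tm-new-" + group.size());
        fresh.hasConsumer = true;
        group.add(fresh);
        return fresh;
    }

    /** Counts local placements; preferred[i] is the producer location of consumer i. */
    static int countLocal(String[] preferred, boolean twoStep) {
        List<SharedSlot> group = new ArrayList<>();
        group.add(new SharedSlot("tm-A")); // already holds v1 subtask 0
        group.add(new SharedSlot("tm-B")); // already holds v1 subtask 1
        boolean[] placed = new boolean[preferred.length];
        int local = 0;

        if (twoStep) {
            // Step 1: place only the consumers that get a slot at their preferred location.
            for (int i = 0; i < preferred.length; i++) {
                if (takeLocal(group, preferred[i]) != null) {
                    placed[i] = true;
                    local++;
                }
            }
        }
        // Step 2 (or the only step): normal allocation for everything not yet placed.
        for (int i = 0; i < preferred.length; i++) {
            if (placed[i]) {
                continue;
            }
            if (takeLocal(group, preferred[i]) != null) {
                local++;
            } else {
                takeAny(group);
            }
        }
        return local;
    }

    public static void main(String[] args) {
        // v2 subtasks 0 and 1 read from v1 subtask 0, subtasks 2 and 3 from v1 subtask 1.
        String[] preferred = {"tm-A", "tm-A", "tm-B", "tm-B"};
        System.out.println(countLocal(preferred, false)); // prints 1
        System.out.println(countLocal(preferred, true));  // prints 2
    }
}
```

In the one-by-one run, the second v2 subtask falls back to the free shared slot on `tm-B`, which takes that slot away from the subtasks that would have been local there. Running step 1 first keeps that slot for a local consumer.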

In total, there are 7 classes that need to be modified.

The brief code looks like this:

# *Execution.class*
{code:java}
class Execution {

    // Records the slot future as soon as a slot has been requested (problem 1).
    private volatile Future<SimpleSlot> assignedFutureResource;

    // The onlyAllocateBasePreferLocation parameter is used for problem 2.
    public Future<SimpleSlot> allocateSlotForExecution(
            SlotProvider slotProvider, boolean queued, boolean onlyAllocateBasePreferLocation)
            throws IllegalExecutionStateException {
        /*
        */
        if (transitionState(CREATED, SCHEDULED)) {

            ScheduledUnit toSchedule = locationConstraint == null ?
                    new ScheduledUnit(this, sharingGroup) :
                    new ScheduledUnit(this, sharingGroup, locationConstraint);

            toSchedule.setOnlyAllocateBasePreferInputs(onlyAllocateBasePreferLocation);

            // Record the assignment info by updating assignedFutureResource.
            assignedFutureResource = slotProvider.allocateSlot(toSchedule, queued);
            if (assignedFutureResource == null) {
                // No slot in this pass: revert to CREATED so the execution can be scheduled again.
                transitionState(SCHEDULED, CREATED);
            }
            return assignedFutureResource;
        }
        /*
        */
    }

    public Future<SimpleSlot> getAllocateFutureSlot() {
        return assignedFutureResource;
    }
}
{code}


# *ExecutionVertex.class*
{code:java}
class ExecutionVertex {
    public Iterable<TaskManagerLocation> getPreferredLocationsBasedOnInputs() {
        /*
        */
        // Try to look up the future slot if getCurrentAssignedResource() returns null.
        SimpleSlot sourceSlot = sources[k].getSource().getProducer().getCurrentAssignedResource();

        if (sourceSlot == null) {
            Future<SimpleSlot> futureSlot =
                    sources[k].getSource().getProducer().getCurrentExecutionAttempt().getAllocateFutureSlot();
            if (futureSlot != null) {
                sourceSlot = futureSlot.get();
            }
        }
        /*
        */
    }
}
{code}

# *ExecutionJobVertex.class*
{code:java}
class ExecutionJobVertex {
    public ExecutionAndSlot[] allocateResourcesForAll(SlotProvider resourceProvider, boolean queued) {
        final List<ExecutionAndSlot> slots = new ArrayList<>(this.taskVertices.length);
        // Step 1: allocate only for vertices that can get a slot based on their inputs.
        slots.addAll(allocateResourcesForAllHelper(resourceProvider, queued, true));
        // Step 2: normal allocation for the remaining vertices.
        slots.addAll(allocateResourcesForAllHelper(resourceProvider, queued, false));
        return slots.toArray(new ExecutionAndSlot[this.taskVertices.length]);
    }

    // Apart from the added onlyAllocateBaseOnInputs parameter, this is similar to
    // the old allocateResourcesForAll function.
    public List<ExecutionAndSlot> allocateResourcesForAllHelper(
            SlotProvider resourceProvider, boolean queued, boolean onlyAllocateBaseOnInputs) {

        final List<ExecutionAndSlot> slots = new ArrayList<>(taskVertices.length);
        final List<ExecutionVertex> vertexList = Lists.newArrayList(this.taskVertices);

        final SlotSharingGroup sharingGroup = this.getSlotSharingGroup();

        if (sharingGroup == null) {
            if (onlyAllocateBaseOnInputs) {
                return slots;
            }
        } else {
            if (queued) {
                throw new IllegalArgumentException(
                        "A task with a vertex sharing group was scheduled in a queued fashion.");
            }
        }

        for (int i = 0; i < vertexList.size(); ++i) {
            boolean successful = false;
            try {
                final ExecutionVertex vertex = vertexList.get(i);
                final Execution exec = vertex.getCurrentExecutionAttempt();
                if (exec.hasPreAllocateSlot()) {
                    successful = true;
                    continue;
                }
                Future<SimpleSlot> future =
                        exec.allocateSlotForExecution(resourceProvider, queued, onlyAllocateBaseOnInputs);
                if (future != null) {
                    slots.add(new ExecutionAndSlot(exec, future));
                }
                successful = true;
            } finally {
                if (!successful) {
                    // this is the case if an exception was thrown
                    for (ExecutionAndSlot slot : slots) {
                        ExecutionGraphUtils.releaseSlotFuture(slot.slotFuture);
                    }
                }
            }
        }

        return slots;
    }
}
{code}

# *Scheduler.class*
{code:java}
class Scheduler {
    // In scheduleTask(), we need to return null when the attempt to allocate
    // based on inputs fails.
    private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource)
            throws NoResourceAvailableException {

        synchronized (globalLock) {
            .................
            SlotSharingGroup sharingUnit = task.getSlotSharingGroup();
            if (sharingUnit != null) {
                //
                // Getter name matches the setter used in Execution.
                boolean isOnlyAllocateByPreferInputs = task.getOnlyAllocateBasePreferInputs();
                if (isOnlyAllocateByPreferInputs && slotFromGroup == null) {
                    return null;
                }
                //
            }
            .......
        }
    }
}
{code}

# *SlotSharingGroupAssignment.class*
{code:java}
class SlotSharingGroupAssignment {

    private Tuple2<SharedSlot, Locality> getSlotForTaskInternal(
            AbstractID groupId, Iterable<TaskManagerLocation> preferredLocations,
            boolean localOnly, boolean onlyAllocateBaseOnInputs) {
        /*
        */
        if (preferredLocations != null) {
            for (TaskManagerLocation location : preferredLocations) {
                // set the flag that we failed a preferred location. If one will be found,
                // we return early anyways and skip the flag evaluation
                didNotGetPreferred = true;
                SharedSlot slot = removeFromMultiMap(slotsForGroup, location.getResourceID());
                if (slot != null && slot.isAlive()) {
                    return new Tuple2<>(slot, Locality.LOCAL);
                }
            }
        }

        if (onlyAllocateBaseOnInputs) {
            return null;
        }
    }
    /*
    */
}
{code}

Thank you very much for taking the time to review my solution. Could you assign 
this issue to me? I would very much like to contribute to Flink.

Thanks,
Sihua ZHou


> Eager Scheduling can't allocate source for ExecutionGraph correctly
> -------------------------------------------------------------------
>
>                 Key: FLINK-7153
>                 URL: https://issues.apache.org/jira/browse/FLINK-7153
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager
>    Affects Versions: 1.3.1
>            Reporter: Sihua Zhou
>            Assignee: Stephan Ewen
>            Priority: Blocker
>             Fix For: 1.3.2
>
>
> The ExecutionGraph.scheduleEager() function allocates resources for each 
> ExecutionJobVertex one by one by calling 
> ExecutionJobVertex.allocateResourcesForAll(). There are two problems with this:
> 1. ExecutionVertex.getPreferredLocationsBasedOnInputs() always returns an 
> empty result, because `sourceSlot` is always null until the `ExecutionVertex` 
> has been deployed via 'Execution.deployToSlot()'. So allocating resources based 
> on preferred locations can't work correctly. Should we set the slot info for 
> `Execution` as soon as Execution.allocateSlotForExecution() has been called 
> successfully?
> 2. The current allocation strategy can't allocate slots optimally. Here is the 
> test case:
> {code}
> JobVertex v1 = new JobVertex("v1", jid1);
> JobVertex v2 = new JobVertex("v2", jid2);
> SlotSharingGroup group = new SlotSharingGroup();
> v1.setSlotSharingGroup(group);
> v2.setSlotSharingGroup(group);
> v1.setParallelism(2);
> v2.setParallelism(4);
> v1.setInvokableClass(BatchTask.class);
> v2.setInvokableClass(BatchTask.class);
> v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, 
> ResultPartitionType.PIPELINED_BOUNDED);
> {code}
> Currently, after allocating for v1 and v2, we get one local partition and three 
> remote partitions. But it should actually be 2 local partitions and 2 remote 
> partitions. 
> The cause of the above problem is that the current allocation strategy 
> allocates the resource for each execution one by one (if the execution can 
> allocate from the SlotGroup then it gets it, otherwise it asks for a new one). 
> Changing the allocation strategy to two steps will solve this problem. Below 
> is the pseudocode:
> {code}
> for (ExecutionJobVertex ejv: getVerticesTopologically) {
> //step 1: try to allocate from the SlotGroup based on inputs, one by one (which 
> only allocates resources based on location).
> //step 2: allocate for the remaining executions.
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
