[ https://issues.apache.org/jira/browse/FLINK-7153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16086915#comment-16086915 ]

Sihua Zhou commented on FLINK-7153:
-----------------------------------

Hi [~StephanEwen],

Here is my solution.

For problem 1, we need to record the `Future<SimpleSlot>` in `Execution` as 
soon as `Execution.allocateSlotForExecution()` has obtained a `SimpleSlot` 
successfully. So I added a private field `Future<SimpleSlot> assignedFutureResource` 
with a getter method to the `Execution` class; the getter is called by 
`ExecutionVertex.getPreferredLocationsBasedOnInputs()`. This solves problem 1 
and requires modifying `Execution.class` and `ExecutionVertex.class`.

For problem 2, I think we need to change the allocation strategy into two steps:
*  Step 1: allocate only from the slot sharing group based on inputs; in this 
step, every ExecutionVertex that can be allocated with a local partition is 
allocated successfully.
*  Step 2: do the normal allocation for the remaining ExecutionVertex instances.

With the above two steps, problem 2 is solved. This requires modifying 
`ExecutionJobVertex.class`, `Scheduler.class`, `SlotSharingGroupAssignment.class`, 
and `ScheduledUnit.class`, which needs to carry the 
`onlyAllocateBasePreferInputs` flag.

In total, there are 7 classes that need to be modified.

A brief sketch of the code looks like this:

# *Execution.class*
{code:java}
class Execution {

    // volatile: read by other threads via getAllocateFutureSlot()
    private volatile Future<SimpleSlot> assignedFutureResource;

    // the onlyAllocateBasePreferLocation parameter is used for problem 2
    public Future<SimpleSlot> allocateSlotForExecution(
            SlotProvider slotProvider,
            boolean queued,
            boolean onlyAllocateBasePreferLocation) throws IllegalExecutionStateException {
        /* ... */
        if (transitionState(CREATED, SCHEDULED)) {

            ScheduledUnit toSchedule = locationConstraint == null ?
                    new ScheduledUnit(this, sharingGroup) :
                    new ScheduledUnit(this, sharingGroup, locationConstraint);

            toSchedule.setOnlyAllocateBasePreferInputs(onlyAllocateBasePreferLocation);

            // record the assignment by updating assignedFutureResource
            assignedFutureResource = slotProvider.allocateSlot(toSchedule, queued);
            if (assignedFutureResource == null) {
                // inputs-based allocation failed; roll the state back so that
                // step 2 can schedule this execution again
                transitionState(SCHEDULED, CREATED);
            }
            return assignedFutureResource;
        }
        /* ... */
    }

    public Future<SimpleSlot> getAllocateFutureSlot() {
        return assignedFutureResource;
    }
}
{code}


# *ExecutionVertex.class*
{code:java}
class ExecutionVertex {

    public Iterable<TaskManagerLocation> getPreferredLocationsBasedOnInputs() {
        /* ... */
        // fall back to the slot future when getCurrentAssignedResource() returns null
        SimpleSlot sourceSlot = sources[k].getSource().getProducer().getCurrentAssignedResource();

        if (sourceSlot == null) {
            Future<SimpleSlot> futureSlot =
                    sources[k].getSource().getProducer().getCurrentExecutionAttempt().getAllocateFutureSlot();
            if (futureSlot != null) {
                // note: get() blocks until the producer's slot future is complete
                sourceSlot = futureSlot.get();
            }
        }
        /* ... */
    }
}
{code}

# *ExecutionJobVertex.class*
{code:java}
class ExecutionJobVertex {

    public ExecutionAndSlot[] allocateResourcesForAll(SlotProvider resourceProvider, boolean queued) {
        final List<ExecutionAndSlot> slots = new ArrayList<>(this.taskVertices.length);
        // step 1: allocate only based on the preferred input locations
        slots.addAll(allocateResourcesForAllHelper(resourceProvider, queued, true));
        // step 2: normal allocation for the remaining executions
        // (note: if step 2 throws, the futures already collected in step 1
        // also need to be released)
        slots.addAll(allocateResourcesForAllHelper(resourceProvider, queued, false));
        return slots.toArray(new ExecutionAndSlot[this.taskVertices.length]);
    }

    // apart from the additional onlyAllocateBaseOnInputs parameter, this is
    // similar to the old allocateResourcesForAll() function
    public List<ExecutionAndSlot> allocateResourcesForAllHelper(
            SlotProvider resourceProvider, boolean queued, boolean onlyAllocateBaseOnInputs) {

        final List<ExecutionAndSlot> slots = new ArrayList<>(taskVertices.length);
        final List<ExecutionVertex> verticeList = Lists.newArrayList(this.taskVertices);

        final SlotSharingGroup sharingGroup = this.getSlotSharingGroup();

        if (sharingGroup == null) {
            if (onlyAllocateBaseOnInputs) {
                // without a sharing group there is nothing to do in step 1
                return slots;
            }
        } else {
            if (queued) {
                throw new IllegalArgumentException(
                        "A task with a vertex sharing group was scheduled in a queued fashion.");
            }
        }

        for (int i = 0; i < verticeList.size(); ++i) {
            boolean successful = false;
            try {
                final ExecutionVertex vertex = verticeList.get(i);
                final Execution exec = vertex.getCurrentExecutionAttempt();
                if (exec.hasPreAllocateSlot()) {
                    // already allocated in step 1, skip in step 2
                    successful = true;
                    continue;
                }
                Future<SimpleSlot> future =
                        exec.allocateSlotForExecution(resourceProvider, queued, onlyAllocateBaseOnInputs);
                if (future != null) {
                    slots.add(new ExecutionAndSlot(exec, future));
                }
                successful = true;
            } finally {
                if (!successful) {
                    // this is the case if an exception was thrown
                    for (ExecutionAndSlot slot : slots) {
                        ExecutionGraphUtils.releaseSlotFuture(slot.slotFuture);
                    }
                }
            }
        }

        return slots;
    }
}
{code}
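
The `hasPreAllocateSlot()` call above is a new helper on `Execution` that is 
not shown in the block for problem 1; a minimal sketch of what I mean, based 
on the `assignedFutureResource` field introduced above:

{code:java}
class Execution {

    // true if step 1 already obtained a slot future for this execution,
    // so that step 2 skips it
    public boolean hasPreAllocateSlot() {
        return assignedFutureResource != null;
    }
}
{code}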

# *Scheduler.class*
{code:java}
class Scheduler {

    // in scheduleTask(), we need to return null when the allocation based
    // on the preferred inputs fails
    private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource)
            throws NoResourceAvailableException {

        synchronized (globalLock) {
            // ...
            SlotSharingGroup sharingUnit = task.getSlotSharingGroup();
            if (sharingUnit != null) {
                // ...
                boolean isOnlyAllocateByPreferInputs = task.getOnlyAllocateBasePreferInputs();
                if (isOnlyAllocateByPreferInputs && slotFromGroup == null) {
                    // step 1 could not find a slot local to the inputs
                    return null;
                }
                // ...
            }
            // ...
        }
    }
}
{code}
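
For completeness, the null then has to be passed through the public 
`Scheduler.allocateSlot()` entry point unchanged, so that 
`Execution.allocateSlotForExecution()` can roll its state back. A hypothetical 
sketch (the `wrapAsFuture()` call is only a placeholder for the existing 
result-wrapping logic, not a real method):

{code:java}
class Scheduler {

    public Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued)
            throws NoResourceAvailableException {

        final Object ret = scheduleTask(task, allowQueued);
        if (ret == null) {
            // inputs-only allocation failed; the caller retries in step 2
            return null;
        }
        // placeholder for the existing logic that wraps the returned
        // SimpleSlot / SlotAllocationFuture into a Future<SimpleSlot>
        return wrapAsFuture(ret);
    }
}
{code}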

# *SlotSharingGroupAssignment.class*
{code:java}
class SlotSharingGroupAssignment {

    private Tuple2<SharedSlot, Locality> getSlotForTaskInternal(
            AbstractID groupId,
            Iterable<TaskManagerLocation> preferredLocations,
            boolean localOnly,
            boolean onlyAllocateBaseOnInputs) {
        /* ... */
        if (preferredLocations != null) {
            for (TaskManagerLocation location : preferredLocations) {

                // set the flag that we failed a preferred location.
                // If one is found, we return early anyway and skip the flag evaluation.
                didNotGetPreferred = true;

                SharedSlot slot = removeFromMultiMap(slotsForGroup, location.getResourceID());
                if (slot != null && slot.isAlive()) {
                    return new Tuple2<>(slot, Locality.LOCAL);
                }
            }
        }

        if (onlyAllocateBaseOnInputs) {
            // no slot local to the inputs is available; in step 1 we give up here
            return null;
        }
        /* ... */
    }
}
{code}
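
# *ScheduledUnit.class*
The change here is only the new flag plus its accessors; a minimal sketch 
(the accessor names must match the setter used in 
`Execution.allocateSlotForExecution()` above):

{code:java}
class ScheduledUnit {

    // carried to the Scheduler: when true, only allocate a slot that is
    // local to the preferred input locations (step 1)
    private boolean onlyAllocateBasePreferInputs = false;

    public void setOnlyAllocateBasePreferInputs(boolean onlyAllocateBasePreferInputs) {
        this.onlyAllocateBasePreferInputs = onlyAllocateBasePreferInputs;
    }

    public boolean getOnlyAllocateBasePreferInputs() {
        return onlyAllocateBasePreferInputs;
    }
}
{code}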

Thank you very much for taking the time to review my solution. Could you 
assign this issue to me? I would very much like to contribute to Flink.

Thanks,
Sihua Zhou

> Eager Scheduling can't allocate source for ExecutionGraph correctly
> -------------------------------------------------------------------
>
>                 Key: FLINK-7153
>                 URL: https://issues.apache.org/jira/browse/FLINK-7153
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager
>    Affects Versions: 1.3.1
>            Reporter: Sihua Zhou
>            Assignee: Stephan Ewen
>            Priority: Blocker
>             Fix For: 1.3.2
>
>
> The ExecutionGraph.scheduleEager() function allocates slots for the 
> ExecutionJobVertex instances one by one via 
> ExecutionJobVertex.allocateResourcesForAll(). There are two problems with it:
> 1. ExecutionVertex.getPreferredLocationsBasedOnInputs will always return 
> empty, because `sourceSlot` is always null until the `ExecutionVertex` has 
> been deployed via `Execution.deployToSlot()`. So allocating resources based 
> on preferred locations can't work correctly; we need to set the slot info 
> for the `Execution` as soon as Execution.allocateSlotForExecution() has been 
> called successfully.
> 2. The current allocation strategy can't allocate slots optimally. Here is a 
> test case:
> {code}
> JobVertex v1 = new JobVertex("v1", jid1);
> JobVertex v2 = new JobVertex("v2", jid2);
> SlotSharingGroup group = new SlotSharingGroup();
> v1.setSlotSharingGroup(group);
> v2.setSlotSharingGroup(group);
> v1.setParallelism(2);
> v2.setParallelism(4);
> v1.setInvokableClass(BatchTask.class);
> v2.setInvokableClass(BatchTask.class);
> v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, 
> ResultPartitionType.PIPELINED_BOUNDED);
> {code}
> Currently, after allocating for v1 and v2, we get one local partition and 
> three remote partitions. But actually, it should be 2 local partitions and 
> 2 remote partitions. 
> The cause of the above problems is that the current strategy allocates 
> resources for the executions one by one (if an execution can be allocated 
> from the slot sharing group, it takes that slot; otherwise it asks for a new 
> one). Changing the allocation strategy to two steps will solve this problem; 
> below is the pseudo code:
> {code}
> for (ExecutionJobVertex ejv: getVerticesTopologically) {
> // step 1: try to allocate from the slot sharing group based on inputs, one 
> by one (which only allocates resources based on location).
> // step 2: allocate for the remaining executions.
> }
> {code}


