[
https://issues.apache.org/jira/browse/FLINK-7153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16222717#comment-16222717
]
ASF GitHub Bot commented on FLINK-7153:
---------------------------------------
GitHub user tillrohrmann opened a pull request:
https://github.com/apache/flink/pull/4916
[FLINK-7153] Re-introduce preferred locations for scheduling
## What is the purpose of the change
This PR makes the `TaskManagerLocation` accessible for asynchronous
scheduling.
Due to the changes in Flink 1.3 that introduced asynchronous scheduling, it was
no longer guaranteed that the scheduler knew the scheduling locations of
producer tasks. The eager scheduling mode was especially affected since slot
allocation happened concurrently.
In order to fix this problem, this PR adds a `TaskManagerLocationFuture` to
each `Execution`. In eager scheduling mode, a slot will only be requested for a
task once all of its inputs have a slot assigned (i.e. their
`TaskManagerLocationFuture` is completed). In lazy scheduling mode, we don't
wait for the completion of all inputs but take those inputs whose locations are
already known.
In order to distinguish whether we want to wait for all task manager locations
or only take those currently available, we add a `LocationPreferenceConstraint`
with the values `ALL` and `ANY`. `ALL` means that we have to wait for all inputs
to have a location assigned, and `ANY` means that we take what is currently
known.
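
As an illustration only (not the PR's actual code; `TaskManagerLocation` is reduced to a placeholder and the helper method is hypothetical), the two constraint values could be evaluated over the inputs' location futures roughly like this:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;

enum LocationPreferenceConstraint { ALL, ANY }

class PreferredLocationsSketch {

    // Placeholder for org.apache.flink.runtime.taskmanager.TaskManagerLocation.
    static class TaskManagerLocation {
        final String host;
        TaskManagerLocation(String host) { this.host = host; }
    }

    static CompletableFuture<Collection<TaskManagerLocation>> calculatePreferredLocations(
            Collection<CompletableFuture<TaskManagerLocation>> inputLocations,
            LocationPreferenceConstraint constraint) {

        if (constraint == LocationPreferenceConstraint.ALL) {
            // ALL: the returned future completes only once every input location is known.
            return CompletableFuture
                .allOf(inputLocations.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> {
                    Collection<TaskManagerLocation> locations = new ArrayList<>();
                    for (CompletableFuture<TaskManagerLocation> location : inputLocations) {
                        locations.add(location.join()); // already completed at this point
                    }
                    return locations;
                });
        } else {
            // ANY: take only the locations that are known right now, without waiting.
            Collection<TaskManagerLocation> known = new ArrayList<>();
            for (CompletableFuture<TaskManagerLocation> location : inputLocations) {
                if (location.isDone() && !location.isCompletedExceptionally()) {
                    known.add(location.getNow(null));
                }
            }
            return CompletableFuture.completedFuture(known);
        }
    }
}
```

In eager scheduling the consumer's slot request would then be chained onto the `ALL` future, while `ANY` lets lazy (batch) scheduling proceed with whatever producer locations are known.
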
In order not to deploy tasks to slots prematurely in eager mode, the slot
assignment has been factored out into its own step. Before, one had to call
`Execution#deployToSlot(SimpleSlot)`, which assigned the given slot and started
the deployment. Now, one has to call `Execution#tryAssignResource` before one
can call `Execution#deploy`.
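
A minimal sketch of the resulting two-step call pattern, assuming a pending slot future; the interfaces below are placeholders with assumed signatures (e.g. `tryAssignResource` returning a boolean), not the exact Flink types:

```java
import java.util.concurrent.CompletableFuture;

// Self-contained sketch of the two-step pattern; the interfaces below are
// placeholders with assumed signatures, not the actual Flink types.
class AssignThenDeploySketch {

    interface SimpleSlot {
        void releaseSlot();
    }

    interface Execution {
        boolean tryAssignResource(SimpleSlot slot); // assumed to return false if assignment is no longer possible
        void deploy() throws Exception;             // deploys to the previously assigned slot
        void fail(Throwable cause);
    }

    static void assignAndDeploy(CompletableFuture<SimpleSlot> slotFuture, Execution execution) {
        slotFuture.thenAccept(slot -> {
            // Step 1: assign the slot; deployment is a separate step so that eager
            // scheduling can delay it until all producer locations are known.
            if (execution.tryAssignResource(slot)) {
                // Step 2: deploy the task to the assigned slot.
                try {
                    execution.deploy();
                } catch (Exception e) {
                    execution.fail(e);
                }
            } else {
                // The execution was cancelled or already holds a slot; return this one.
                slot.releaseSlot();
            }
        });
    }
}
```
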
Moreover, this PR ensures that the `FailoverRegions` are topologically sorted,
which is important for non-queued scheduling.
FYI @StephanEwen
## Brief change log
- Introduce `LocationPreferenceConstraint` to distinguish the waiting
behaviour for the preferred locations
- Split slot assignment and deployment into two separate steps
- Moved preferred location calculation into the Execution to reduce code
duplication between the `Scheduler` and the `SlotPool`
- Changed preferred location calculation to be blocking if
`LocationPreferenceConstraint#ALL` and not all input locations are known
## Verifying this change
This change added tests and can be verified as follows:
- Added `ExecutionTest` to check that an assigned slot is correctly released in
case of cancellation and that the preferred locations are calculated correctly
- Added
`ExecutionGraphDeploymentTest#testEagerSchedulingWaitsOnAllInputPreferredLocations`
to check that eager scheduling waits for all inputs to be assigned before
scheduling consumer tasks
- Moreover, the scheduler is being tested by existing tests such as
`SchedulerSlotSharingTest`, `ScheduleWithCoLocationHintTest` and many IT cases
for lazy scheduling (batch case)
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tillrohrmann/flink fixGroupScheduling2
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/4916.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #4916
----
commit 32eb1812583b84d80091d1a278d53ed663d8a065
Author: Till <[email protected]>
Date: 2017-10-16T12:04:13Z
[FLINK-7153] Re-introduce preferred locations for scheduling
commit 8c0c9aeaa7ca995247f2b9f9e63723e52d839a12
Author: Till Rohrmann <[email protected]>
Date: 2017-10-27T07:47:03Z
[FLINK-7153] Introduce LocationPreferenceConstraint for scheduling
The LocationPreferenceConstraint defines whether all or any preferred locations
have to be taken into consideration when scheduling tasks. Especially for batch
jobs, where we do lazy scheduling, not all input locations might be known for a
consumer task. Therefore, we set the location preference constraint to ANY,
which means that only those locations are taken into consideration which are
known at scheduling time.
commit c821e67529deaaed96f183fc22bc0a9fe246fa23
Author: Till Rohrmann <[email protected]>
Date: 2017-10-26T16:22:43Z
[hotfix] Make failover region topological sorted
commit 67baeade85e26758978bcdf7982576a2f4192aae
Author: Till Rohrmann <[email protected]>
Date: 2017-10-27T17:08:15Z
[FLINK-7153] Add test cases
----
> Eager Scheduling can't allocate source for ExecutionGraph correctly
> -------------------------------------------------------------------
>
> Key: FLINK-7153
> URL: https://issues.apache.org/jira/browse/FLINK-7153
> Project: Flink
> Issue Type: Bug
> Components: JobManager
> Affects Versions: 1.3.1
> Reporter: Sihua Zhou
> Assignee: Till Rohrmann
> Priority: Critical
> Fix For: 1.4.0, 1.3.3
>
>
> The ExecutionGraph.scheduleEager() function allocates resources for the
> ExecutionJobVertices one by one via ExecutionJobVertex.allocateResourcesForAll().
> There are two problems with it:
> 1. ExecutionVertex.getPreferredLocationsBasedOnInputs will always return an
> empty collection, because `sourceSlot` is always null until the `ExecutionVertex`
> has been deployed via `Execution.deployToSlot()`. So allocating resources based
> on preferred locations can't work correctly; we need to set the slot info on the
> `Execution` as soon as Execution.allocateSlotForExecution() has completed
> successfully.
> 2. The current allocation strategy can't allocate slots optimally. Here is the
> test case:
> {code}
> JobVertexID jid1 = new JobVertexID();
> JobVertexID jid2 = new JobVertexID();
>
> JobVertex v1 = new JobVertex("v1", jid1);
> JobVertex v2 = new JobVertex("v2", jid2);
>
> SlotSharingGroup group = new SlotSharingGroup();
> v1.setSlotSharingGroup(group);
> v2.setSlotSharingGroup(group);
>
> v1.setParallelism(2);
> v2.setParallelism(4);
>
> v1.setInvokableClass(BatchTask.class);
> v2.setInvokableClass(BatchTask.class);
>
> v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED);
> {code}
> Currently, after allocating slots for v1 and v2, we get one local partition and
> three remote partitions. But actually, it should be two local partitions and two
> remote partitions.
> The cause of the above problems is that the current strategy allocates the
> resource for each execution one by one (if the execution can get a slot from its
> slot sharing group it takes it, otherwise it asks for a new one). Changing the
> allocation strategy to two steps solves this problem; below is the pseudo code:
> {code}
> for (ExecutionJobVertex ejv : getVerticesTopologically()) {
>     // step 1: try to allocate from the slot sharing group based on inputs, one by
>     // one (allocating resources based on location only).
>     // step 2: allocate slots for the remaining executions.
> }
> {code}
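> To make the proposal concrete, here is a minimal, self-contained sketch of the
> two-phase allocation; all types and helpers are placeholders for illustration,
> not the actual Flink scheduler API:
> {code}
> import java.util.ArrayList;
> import java.util.List;
>
> // Placeholder types; not the real Flink classes.
> class TwoPhaseAllocationSketch {
>
>     interface ExecutionVertexHandle {
>         boolean hasKnownInputLocations();   // are all producer locations known?
>         void allocateSlotNextToInputs();    // phase 1: locality-driven allocation
>         void allocateNewSlot();             // phase 2: plain allocation
>     }
>
>     // Vertices are grouped per ExecutionJobVertex and iterated topologically.
>     static void allocateTwoPhase(List<List<ExecutionVertexHandle>> verticesTopologically) {
>         for (List<ExecutionVertexHandle> jobVertex : verticesTopologically) {
>             List<ExecutionVertexHandle> remaining = new ArrayList<>();
>
>             // Phase 1: place vertices whose input locations are already known into
>             // slots close to (ideally shared with) their producers.
>             for (ExecutionVertexHandle vertex : jobVertex) {
>                 if (vertex.hasKnownInputLocations()) {
>                     vertex.allocateSlotNextToInputs();
>                 } else {
>                     remaining.add(vertex);
>                 }
>             }
>
>             // Phase 2: allocate slots for everything that is left, without
>             // location preferences.
>             for (ExecutionVertexHandle vertex : remaining) {
>                 vertex.allocateNewSlot();
>             }
>         }
>     }
> }
> {code}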
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)