[
https://issues.apache.org/jira/browse/DRILL-4706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15623642#comment-15623642
]
Padma Penumarthy edited comment on DRILL-4706 at 12/3/16 12:44 AM:
-------------------------------------------------------------------
Notes about how the current parallelization algorithms (SoftAffinity and
HardAffinity based) work, why remote reads are happening, and details of the
new algorithm (LocalAffinity based) implemented.
Most operators do not have any affinity, i.e. they can be scheduled on any node.
GroupScan and Store are the operators which have affinity, i.e. they implement
the HasAffinity interface. Operators which have affinity convey their affinity
preference through the getDistributionAffinity() interface. The affinity for a
fragment, which decides how the fragment is going to be parallelized, is
determined by the operator with the most restrictive affinity in the fragment.
Restrictiveness increases in the following order: NONE, SOFT, HARD. We have
two parallelization algorithms: NONE and SOFT use
SoftAffinityFragmentParallelizer, and HARD uses
HardAffinityFragmentParallelizer.
Screen (which implements Store) and the distributed system tables (memory,
threads) use hard affinity.
GroupScan and the other system tables (boot, drillbits and version) use soft
affinity.
If we have a screen (hard affinity) and a scan (soft affinity) in the same
fragment, the affinity for the fragment will be hard, since that is the more
restrictive of the two.
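The "most restrictive affinity wins" rule can be sketched as follows. This is a hypothetical Python illustration, not Drill's actual Java code; the names `DistributionAffinity` and `fragment_affinity` are used here only for the sketch:

```python
from enum import IntEnum

# Restrictiveness increases NONE < SOFT < HARD, so "most restrictive"
# is simply the maximum over the fragment's operators.
class DistributionAffinity(IntEnum):
    NONE = 0
    SOFT = 1
    HARD = 2

def fragment_affinity(operator_affinities):
    """A fragment is parallelized per its most restrictive operator."""
    return max(operator_affinities, default=DistributionAffinity.NONE)

# A fragment containing Screen (HARD) and a parquet scan (SOFT) is
# handled by the hard affinity parallelizer:
assert fragment_affinity(
    [DistributionAffinity.SOFT, DistributionAffinity.HARD]
) == DistributionAffinity.HARD
```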
SoftAffinity (current algorithm for scheduling parquet scan fragments):
Initialization (getPlan -> GetGroupScan -> ParquetGroupScan.init):
1. When parquet metadata is read, for each rowGroup, the HostAffinity for each
host (ratio of number of bytes of the rowGroup present on that host / total
bytes for the rowGroup) is calculated.
2. The EndpointAffinity for each host (ratio of number of bytes on the host /
total bytes for the whole scan) is calculated.
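The two ratios above can be sketched as follows (a hypothetical Python illustration; the function name `compute_affinities` and the input shape are mine, not Drill's). Note that the sketch deliberately reproduces the replica double-counting flaw discussed later:

```python
from collections import defaultdict

def compute_affinities(row_groups):
    """row_groups: list of dicts mapping host -> bytes of that rowGroup
    stored on the host. Returns (per-rowGroup host affinities,
    per-host endpoint affinities)."""
    host_affinities = []
    bytes_per_host = defaultdict(int)
    total_scan_bytes = 0
    for rg in row_groups:
        rg_total = sum(rg.values())  # replicas counted once per host
        host_affinities.append({h: b / rg_total for h, b in rg.items()})
        for h, b in rg.items():
            bytes_per_host[h] += b
            total_scan_bytes += b    # replication factor is NOT divided out
    endpoint_affinity = {h: b / total_scan_bytes
                         for h, b in bytes_per_host.items()}
    return host_affinities, endpoint_affinity
```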
Parallelize the scan (SoftAffinityFragmentParallelizer.parallelizeFragment):
1. Compute how many total fragments to schedule (width), based on cost, slice
target, min and max width for the operator, maxGlobalWidth, maxWidthPerNode, etc.
2. Divide by the number of nodes - this is the average number of fragments we
want to run on each node.
3. To favor nodes with affinity > 0, i.e. nodes that have some local data (it
does not matter how much), multiply the value from 2 above by the affinity
factor - this is the number of fragments we want to schedule on each node with
affinity.
4. Schedule up to the number of fragments calculated in 3 above on each of the
nodes with affinity, in round robin fashion.
5. If we have scheduled the required number of fragments (i.e. width from 1
above), we are done.
6. Else, we schedule the rest of the fragments on nodes which do not have any
local data, i.e. nodes with no affinity, in round robin fashion.
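Steps 2-6 above can be sketched roughly as follows (hypothetical Python; the width computation of step 1 is assumed already done, and the function and parameter names are illustrative, not Drill's actual code):

```python
import math

def soft_affinity_parallelize(width, nodes, affinity,
                              affinity_factor=1.2, max_width_per_node=100):
    """nodes: all drillbit endpoints; affinity: host -> endpoint affinity."""
    avg_per_node = math.ceil(width / len(nodes))                    # step 2
    per_affinity_node = min(math.ceil(avg_per_node * affinity_factor),
                            max_width_per_node)                     # step 3
    # any node with affinity > 0 qualifies, regardless of the value
    affinity_nodes = [n for n in nodes if affinity.get(n, 0) > 0]
    assignments = []
    # step 4: round robin up to per_affinity_node fragments per affinity node
    for _ in range(per_affinity_node):
        for n in affinity_nodes:
            if len(assignments) < width:
                assignments.append(n)
    # step 6: leftovers go to nodes with no local data, round robin
    others = [n for n in nodes if n not in affinity_nodes]
    i = 0
    while len(assignments) < width and others:
        assignments.append(others[i % len(others)])
        i += 1
    return assignments
```

Note how nodes with any nonzero affinity are treated identically, which is exactly the skew problem described below.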
Assignment(AssignmentCreator.getMappings):
1. To distribute rowGroups uniformly, calculate the maximum work (maxWork) each
fragment should do: total number of rowGroups / total number of fragments.
2. For each endpoint, calculate the maxCount (maxWork * number of fragments on
the endpoint) and minCount (at least 1 per fragment, or (maxWork - 1) * number
of fragments) of rowGroups to assign.
3. Assign up to minCount rowGroups per endpoint in round robin fashion,
selecting from each rowGroup's list of hosts, sorted by host affinity.
4. If there are any leftovers, assign them to the endpoints which do not have
the minimum (i.e. minCount) assigned yet.
5. If there are still leftovers, assign them to the endpoints which do not have
the maximum (i.e. maxCount) assigned yet.
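The minCount/maxCount bookkeeping of steps 1-2 can be sketched like this (hypothetical Python; `assignment_counts` is an illustrative name, not the actual AssignmentCreator code):

```python
import math

def assignment_counts(total_row_groups, fragments_per_endpoint):
    """fragments_per_endpoint: endpoint -> number of fragments on that node.
    Returns endpoint -> (minCount, maxCount) of rowGroups to assign."""
    total_fragments = sum(fragments_per_endpoint.values())
    max_work = math.ceil(total_row_groups / total_fragments)     # step 1
    counts = {}
    for ep, nfrag in fragments_per_endpoint.items():             # step 2
        max_count = max_work * nfrag
        # at least one rowGroup per fragment, or (maxWork - 1) per fragment
        min_count = max(nfrag, (max_work - 1) * nfrag)
        counts[ep] = (min_count, max_count)
    return counts
```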
Why is this causing remote reads, and why does increasing the affinity factor
not help?
When the data is skewed, i.e. not distributed equally, all nodes with affinity
still get an equal number of fragments assigned (because they each have some
data). We are not assigning fragments proportional to the affinity value, i.e.
the amount of data available on the node. So, some of them have to do remote
reads, since the data they are assigned is not available locally. Since all
nodes with affinity are treated equally, increasing the affinity factor does
not help; the affinity factor only helps distinguish nodes which do not have
any data from nodes which have some data.
Another problem is the calculation of endpoint affinity values. We do not take
the replication factor into account and end up counting the bytes of a rowGroup
multiple times, once on each host that holds a replica. Depending on the data
distribution, this results in skewed affinity values which do not reflect how
those values are being/should be used.
HardAffinity Algorithm:
HardAffinity assigns fragments only to nodes which are marked as mandatory in
the endpointAffinity. It works as follows:
1. Add nodes whose endpointAffinities are marked as assignmentRequired (i.e.
mandatory) to the endpointPool.
2. Calculate how many fragments to schedule (width), based on cost, slice
target, min and max width for the fragment, maxGlobalWidth, maxWidthPerNode,
and the maxWidth specified in the endpointAffinities.
3. Note that width is constrained by maxWidthPerNode * endpointPool.size()
(from 1 above) and by the sum of the maxWidths specified by each
endpointAffinity entry.
4. Schedule at least 1 fragment on each endpoint in the endpointPool (since
they are all marked as mandatory).
5. For the remaining slots, i.e. (width - endpointPool.size()), assign
fragments to endpoints proportional to their affinity, constrained by
maxWidthPerNode and the endpointAffinity maxWidth.
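The steps above can be sketched as follows (hypothetical Python; the dict-based endpoint representation and the rounding of the proportional share are simplifications of mine, not the actual HardAffinityFragmentParallelizer logic):

```python
def hard_affinity_parallelize(width, endpoint_affinities, max_width_per_node):
    """endpoint_affinities: host -> {"affinity", "required", "max_width"}.
    Returns host -> number of fragments."""
    # step 1: only mandatory endpoints enter the pool
    pool = {h: v for h, v in endpoint_affinities.items() if v["required"]}
    # step 3: width is constrained by per-node and per-endpoint maximums
    width = min(width,
                max_width_per_node * len(pool),
                sum(v["max_width"] for v in pool.values()))
    allocation = {h: 1 for h in pool}   # step 4: 1 per mandatory endpoint
    remaining = width - len(pool)
    total_aff = sum(v["affinity"] for v in pool.values()) or 1
    # step 5: remaining slots proportional to affinity, honoring the caps
    for h, v in pool.items():
        extra = round(remaining * v["affinity"] / total_aff)
        cap = min(max_width_per_node, v["max_width"])
        allocation[h] = min(allocation[h] + extra, cap)
    return allocation
```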
LocalAffinity (new algorithm based on locality of data):
This is not enabled by default. To use the new algorithm, we need to set the
system option `parquet.use_local_affinity` = true. Every effort is made to keep
the new code under the new option so no regressions are introduced.
This invokes a new local affinity fragment parallelizer, which is less
restrictive than the soft affinity fragment parallelizer and is enabled only
for parquet group scans.
Initialization(getPlan -> GetGroupScan -> ParquetGroupScan.init):
1. When parquet metadata is read, for each rowGroup, compute the best possible
host to scan it on (computeRowGroupAssignment):
2. For each rowGroup, get the list of hosts which have the maximum amount of
data available locally for the rowGroup (topEndpoints).
3. From that list, pick the node which has the minimum amount of work assigned
so far (based on the number of bytes already assigned to be scanned on that
node).
4. Repeat 2 and 3 in a second pass, so we make adjustments after one round of
allocations is done, i.e. after the first iteration.
5. Once we compute the best possible node on which to scan the rowGroup, save
that information (preferredEndpoint). Note: preferredEndpoint will be null if
there is no drillbit running on any of the nodes which have the data, or if it
is a local file system.
6. Update the endpointAffinity of each node with the number of rowGroups
(numLocalWorkUnits) assigned to be scanned on that endpoint.
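Steps 1-5 can be sketched like this (hypothetical Python, not the actual computeRowGroupAssignment code; the input shape and names are illustrative):

```python
def compute_row_group_assignment(row_groups, active_drillbits, passes=2):
    """row_groups: list of dicts mapping host -> bytes of that rowGroup
    stored locally on the host. Returns the preferred endpoint per rowGroup,
    or None when no drillbit runs on a host that has the data."""
    assigned_bytes = {d: 0 for d in active_drillbits}  # scan work per node
    preferred = [None] * len(row_groups)
    for _ in range(passes):              # step 4: second pass rebalances
        for i, rg in enumerate(row_groups):
            if preferred[i] is not None:  # undo the previous pass's choice
                assigned_bytes[preferred[i]] -= rg[preferred[i]]
                preferred[i] = None
            # hosts that both hold data and run a drillbit
            candidates = {h: b for h, b in rg.items() if h in assigned_bytes}
            if not candidates:
                continue                  # preferredEndpoint stays None
            top = max(candidates.values())          # step 2: most local data
            top_hosts = [h for h, b in candidates.items() if b == top]
            # step 3: among those, pick the least-loaded node
            best = min(top_hosts, key=lambda h: assigned_bytes[h])
            preferred[i] = best
            assigned_bytes[best] += candidates[best]
    return preferred
```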
Parallelize the Scan(LocalAffinityFragmentParallelizer.parallelizeFragment):
1. Decide how many total fragments to run (width), based on cost, slice target,
min and max width for the operator, maxGlobalWidth, maxWidthPerNode, etc.
2. Include each endpoint which has affinity with numLocalWorkUnits > 0 in the
list of endpoints on which we want to schedule the fragments
(localEndpointPool).
3. Since we want to assign only to nodes with data locality, constrain the
number calculated in 1 above to the maximum that can be assigned to nodes with
locality: localWidth = maxWidthPerNode * localEndpointPool.size().
4. Sort the endpoints in the localEndpointPool based on the work they have to
do, i.e. numLocalWorkUnits.
5. Calculate how many fragments to assign to each of the nodes in the
localEndpointPool based on how much work they have to do, i.e. the
targetAllocation (proportional to the numLocalWorkUnits assigned to the node).
6. Assign one fragment to each of the nodes in the localEndpointPool, to make
sure a minimum of one is assigned to each of them.
7. Go through the sorted localEndpointPool in a round robin way and keep
assigning fragments to individual nodes till their targetAllocation or
maxWidthPerNode is reached.
8. For 6 and 7, stop when the overall allocation reaches the total target
(localWidth).
9. At this point, we have taken care of allocating fragments for
totalLocalWorkUnits, i.e. work units which have data locality information.
10. If we have assigned fragments for all work units, we are done.
11. It is possible that some work units have a null preferredEndpoint (because
there is no drillbit running on the hosts which have data for the work unit).
In that case, we will have unassigned work items.
12. Allocate the fragments for unassigned work items to active endpoints,
making sure the maxWidthPerNode constraint is honored.
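Steps 2-8 can be sketched as follows (hypothetical Python, simplified from the description above; step 1's width computation is assumed done, and steps 9-12 for null-preferredEndpoint work units are omitted):

```python
def local_affinity_parallelize(width, num_local_work_units,
                               max_width_per_node):
    """num_local_work_units: endpoint -> rowGroups preferred on it.
    Returns endpoint -> number of fragments."""
    # step 2: only endpoints with local work enter the pool
    pool = {e: n for e, n in num_local_work_units.items() if n > 0}
    local_width = min(width, max_width_per_node * len(pool))      # step 3
    total_units = sum(pool.values())
    # step 5: target allocation proportional to local work units
    target = {e: max(1, round(local_width * n / total_units))
              for e, n in pool.items()}
    allocation = {e: 1 for e in pool}                             # step 6
    assigned = len(pool)
    # step 7: round robin (most-loaded first) up to target / maxWidthPerNode
    progress = True
    while assigned < local_width and progress:                    # step 8
        progress = False
        for e in sorted(pool, key=pool.get, reverse=True):        # step 4
            if assigned >= local_width:
                break
            if allocation[e] < min(target[e], max_width_per_node):
                allocation[e] += 1
                assigned += 1
                progress = True
    return allocation
```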
Assignment(AssignmentCreator.getMappings):
1. When the system option parquet.use_local_affinity is set to true, assign
each rowGroup to a fragment (round robin) on its preferredEndpoint
(assignLocal).
2. If the preferredEndpoint is null, or no fragment is available on that node,
add the rowGroup to the unassigned list.
3. If we have unassigned work items, first assign at least one work item to
each fragment which has nothing assigned, so we meet the minimum requirement.
4. Assign any remaining unassigned work items to fragments in a round robin way.
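The assignment steps above can be sketched like this (hypothetical Python; the input shape and the name `assign_local` are illustrative, not the actual AssignmentCreator.getMappings code):

```python
from collections import deque

def assign_local(preferred_endpoints, fragments_per_endpoint):
    """preferred_endpoints: preferred endpoint (or None) per rowGroup index.
    fragments_per_endpoint: endpoint -> list of fragment ids on that node.
    Returns fragment id -> list of rowGroup indexes."""
    rr = {e: deque(frags) for e, frags in fragments_per_endpoint.items()}
    all_frags = [f for frags in fragments_per_endpoint.values() for f in frags]
    mapping = {f: [] for f in all_frags}
    unassigned = []
    # step 1: round robin among the fragments on the preferred endpoint
    for i, ep in enumerate(preferred_endpoints):
        if ep in rr and rr[ep]:
            frag = rr[ep][0]
            rr[ep].rotate(-1)
            mapping[frag].append(i)
        else:
            unassigned.append(i)      # step 2: null or no local fragment
    # step 3: empty fragments get one work item first; step 4: round robin
    empties = deque(f for f in all_frags if not mapping[f])
    rest = deque(all_frags)
    for i in unassigned:
        if empties:
            mapping[empties.popleft()].append(i)
        else:
            f = rest[0]
            rest.rotate(-1)
            mapping[f].append(i)
    return mapping
```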
> Fragment planning causes Drillbits to read remote chunks when local copies
> are available
> ----------------------------------------------------------------------------------------
>
> Key: DRILL-4706
> URL: https://issues.apache.org/jira/browse/DRILL-4706
> Project: Apache Drill
> Issue Type: Bug
> Components: Query Planning & Optimization
> Affects Versions: 1.6.0
> Environment: CentOS, RHEL
> Reporter: Kunal Khatua
> Assignee: Padma Penumarthy
> Labels: performance, planning
>
> When a table (datasize=70GB) of 160 parquet files (each having a single
> rowgroup and fitting within one chunk) is available on a 10-node setup with
> replication=3 ; a pure data scan query causes about 2% of the data to be read
> remotely.
> Even with the creation of metadata cache, the planner is selecting a
> sub-optimal plan of executing the SCAN fragments such that some of the data
> is served from a remote server.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)