[ https://issues.apache.org/jira/browse/IMPALA-10287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722013#comment-17722013 ]

ASF subversion and git services commented on IMPALA-10287:
----------------------------------------------------------

Commit 1d0b111bcf090efa521e6bf09c0c2eebec4b8976 in impala's branch 
refs/heads/master from Riza Suminto
[ https://gitbox.apache.org/repos/asf?p=impala.git;h=1d0b111bc ]

IMPALA-12091: Control scan parallelism by its processing cost

Before this patch, Impala still relied on the MT_DOP option to decide the
degree of parallelism of the scan fragment when a query ran with
COMPUTE_PROCESSING_COST=1. This patch adds the scan node's processing
cost as another consideration to raise scan parallelism beyond MT_DOP.

Scan node cost is now adjusted to also consider the number of effective
scan ranges. Each scan range is given a weight of (0.5% *
min_processing_per_thread), which roughly means that one scan node
instance can handle at most 200 scan ranges.
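The 200-range figure follows from the 0.5% weight: 200 ranges at 0.5% each add up to exactly one min_processing_per_thread worth of cost. The sketch below only illustrates that arithmetic; the class and method names are hypothetical and are not Impala's actual planner code.

```java
/** Hypothetical sketch of the per-scan-range cost weight; not Impala's code. */
public final class ScanRangeCostSketch {
    // Each scan range is weighted at 0.5% of min_processing_per_thread.
    static final double SCAN_RANGE_WEIGHT_FRACTION = 0.005;

    /** Extra processing cost contributed by numScanRanges scan ranges. */
    static double scanRangeCost(long numScanRanges, long minProcessingPerThread) {
        return numScanRanges * SCAN_RANGE_WEIGHT_FRACTION * minProcessingPerThread;
    }

    /** Scan ranges whose combined weight equals one thread's worth of cost. */
    static long maxRangesPerInstance() {
        return Math.round(1.0 / SCAN_RANGE_WEIGHT_FRACTION);
    }

    public static void main(String[] args) {
        long minProcessingPerThread = 10_000_000L; // hypothetical value
        System.out.println(maxRangesPerInstance()); // prints 200
        System.out.println(scanRangeCost(200, minProcessingPerThread));
    }
}
```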

A new query option, MAX_FRAGMENT_INSTANCES_PER_NODE, is added as an
upper bound on scan parallelism when COMPUTE_PROCESSING_COST=true. If the
number of scan ranges is fewer than the maximum parallelism allowed by the
scan node's processing cost, that processing cost will be clamped down
to (min_processing_per_thread / number of scan ranges). Lowering
MAX_FRAGMENT_INSTANCES_PER_NODE can also clamp down the scan processing
cost in a similar way. For interior fragments, a combination of
MAX_FRAGMENT_INSTANCES_PER_NODE, PROCESSING_COST_MIN_THREADS, and the
number of available cores per node is taken into account to determine the
maximum fragment parallelism per node. For scan fragments, only the first
two are considered, which encourages the Frontend to choose a larger
executor group when needed.
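The per-node caps described above can be sketched roughly as follows. This assumes a simple min/max combination of the inputs, which the commit message does not spell out, and it models a single node only; all names are hypothetical and this is not Impala's implementation.

```java
/**
 * Hypothetical sketch of per-node parallelism caps. ASSUMPTION: the
 * min/max combination below is one plausible reading of the description,
 * not the formula Impala actually uses.
 */
public final class ParallelismCapSketch {

    /** Interior fragment: query option, min threads, and cores per node. */
    static int interiorMaxPerNode(int maxInstancesPerNode, int minThreads, int coresPerNode) {
        // Do not exceed the query option or the cores available on the node...
        int cap = Math.min(maxInstancesPerNode, coresPerNode);
        // ...but always allow at least PROCESSING_COST_MIN_THREADS instances.
        return Math.max(minThreads, cap);
    }

    /** Scan fragment: cores per node are deliberately ignored. */
    static int scanMaxPerNode(int maxInstancesPerNode, int minThreads) {
        return Math.max(minThreads, maxInstancesPerNode);
    }

    /**
     * Cost-derived scan parallelism on one node: one instance per
     * min_processing_per_thread of cost (rounded up), clamped by the
     * number of scan ranges and by the per-node cap.
     */
    static int scanParallelism(long scanCost, long minProcessingPerThread,
                               int numScanRanges, int perNodeCap) {
        long byCost = Math.max(1L,
            (scanCost + minProcessingPerThread - 1) / minProcessingPerThread);
        return (int) Math.min(byCost, Math.min(numScanRanges, perNodeCap));
    }

    public static void main(String[] args) {
        System.out.println(interiorMaxPerNode(12, 1, 8)); // 8
        System.out.println(scanMaxPerNode(12, 1));        // 12
        // The cost asks for 5 instances, but only 3 scan ranges exist.
        System.out.println(scanParallelism(50_000_000L, 10_000_000L, 3, 12)); // 3
    }
}
```

Leaving cores out of the scan cap mirrors the stated intent: a scan cost that exceeds what one node's cores can serve is allowed to surface, so the Frontend can pick a larger executor group.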

Two new static states are added to exec-node.h: is_mt_fragment_ and
num_instances_per_node_. Backend code that referred to the MT_DOP
option now uses either is_mt_fragment_ or num_instances_per_node_
instead.

Two new criteria are added to the effective parallelism calculation in
PlanFragment.adjustToMaxParallelism():

- If a fragment has a UnionNode, its parallelism is the maximum of its
  input fragments' parallelism and its collocated ScanNode's expected
  parallelism.
- If a fragment has only a single ScanNode (and no UnionNode), its
  parallelism is calculated in the same fashion as for an interior
  fragment, but it is not lowered further since it has no child
  fragment to compare with.
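The two rules can be modeled on a toy fragment tree. The Fragment record is a hypothetical stand-in for PlanFragment, and the fallback for other interior fragments (lowering to the largest child's parallelism) is an assumption added only to contrast with the single-ScanNode rule; it is not taken from the patch.

```java
import java.util.List;

public final class EffectiveParallelismSketch {

    /** Hypothetical fragment model; not Impala's PlanFragment class. */
    record Fragment(boolean hasUnionNode, boolean hasScanNode,
                    int scanExpectedParallelism, int costBasedParallelism,
                    List<Fragment> children) {}

    static int effectiveParallelism(Fragment f) {
        if (f.hasUnionNode()) {
            // Rule 1: max over input fragments and the collocated ScanNode.
            int result = f.hasScanNode() ? f.scanExpectedParallelism() : 1;
            for (Fragment child : f.children()) {
                result = Math.max(result, effectiveParallelism(child));
            }
            return result;
        }
        if (f.hasScanNode() && f.children().isEmpty()) {
            // Rule 2: single-ScanNode fragment; no child to lower it against.
            return f.costBasedParallelism();
        }
        // ASSUMPTION: other interior fragments are lowered to the largest
        // parallelism among their children.
        int childMax = 1;
        for (Fragment child : f.children()) {
            childMax = Math.max(childMax, effectiveParallelism(child));
        }
        return Math.min(f.costBasedParallelism(), childMax);
    }

    public static void main(String[] args) {
        Fragment scan4 = new Fragment(false, true, 4, 4, List.of());
        Fragment scan8 = new Fragment(false, true, 8, 8, List.of());
        Fragment union = new Fragment(true, true, 6, 10, List.of(scan4, scan8));
        System.out.println(effectiveParallelism(union)); // 8
    }
}
```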

Admission control slots remain unchanged. This may cause a query to fail
admission if the Planner selects a scan parallelism that is higher than
the configured admission control slots value. Setting
MAX_FRAGMENT_INSTANCES_PER_NODE equal to or lower than the configured
admission control slots value lowers scan parallelism and can help the
query pass the admission controller.
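The workaround above can be pictured with a small model. It assumes that each scan instance on a node consumes one admission slot and that the planner simply caps its cost-based choice at MAX_FRAGMENT_INSTANCES_PER_NODE. Both are simplifications, and the names and values are hypothetical.

```java
/** Hypothetical model of scan parallelism vs. admission slots; not Impala's code. */
public final class AdmissionSlotsSketch {

    /** Instances per node chosen for a scan fragment under the query option. */
    static int scanInstancesPerNode(int costBasedPerNode, int maxFragmentInstancesPerNode) {
        return Math.min(costBasedPerNode, maxFragmentInstancesPerNode);
    }

    /** ASSUMPTION: one slot per instance; admitted only if all fit. */
    static boolean admitted(int instancesPerNode, int admissionControlSlots) {
        return instancesPerNode <= admissionControlSlots;
    }

    public static void main(String[] args) {
        int slots = 8;      // hypothetical slots configured on each executor
        int costBased = 12; // hypothetical per-node parallelism from scan cost
        int highCap = scanInstancesPerNode(costBased, 16);
        int loweredCap = scanInstancesPerNode(costBased, slots);
        System.out.println(admitted(highCap, slots));    // false (12 > 8)
        System.out.println(admitted(loweredCap, slots)); // true  (8 <= 8)
    }
}
```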

The previous workaround from IMPALA-12029 for controlling scan
parallelism is now removed. This patch also disables the IMPALA-10287
optimization if COMPUTE_PROCESSING_COST=true, because IMPALA-10287 relies
on a fixed number of fragment instances in DistributedPlanner.java.
However, the effective parallelism calculation is done much later and may
change the final number of instances of the hash join fragment, rendering
the DistributionMode selected by IMPALA-10287 inaccurate.

This patch was benchmarked using single_node_perf_run.py with the
following parameters:

args="-gen_experimental_profile=true -default_query_options="
args+="mt_dop=4,compute_processing_cost=1,processing_cost_min_threads=1 "
./bin/single_node_perf_run.py --num_impalads=3 --scale=10 \
    --workloads=tpcds --iterations=5 --table_formats=parquet/none/none \
    --impalad_args="$args" \
    --query_names=TPCDS-Q3,TPCDS-Q14-1,TPCDS-Q14-2,TPCDS-Q23-1,TPCDS-Q23-2,TPCDS-Q49,TPCDS-Q76,TPCDS-Q78,TPCDS-Q80A \
    "IMPALA-12091~1" IMPALA-12091

The benchmark result is as follows:
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
| Workload  | Query       | File Format           | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%)  | Base StdDev(%) | Iters | Median Diff(%) | MW Zval | Tval  |
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
| TPCDS(10) | TPCDS-Q23-1 | parquet / none / none | 4.62   | 4.54        |   +1.92%   |   0.23%    |   1.59%        | 5     |   +2.32%       | 1.15    | 2.67  |
| TPCDS(10) | TPCDS-Q14-1 | parquet / none / none | 5.82   | 5.76        |   +1.08%   |   5.27%    |   3.89%        | 5     |   +2.04%       | 0.00    | 0.37  |
| TPCDS(10) | TPCDS-Q23-2 | parquet / none / none | 4.65   | 4.58        |   +1.38%   |   1.97%    |   0.48%        | 5     |   +0.81%       | 0.87    | 1.51  |
| TPCDS(10) | TPCDS-Q49   | parquet / none / none | 1.49   | 1.48        |   +0.46%   | * 36.02% * | * 34.95% *     | 5     |   +1.26%       | 0.58    | 0.02  |
| TPCDS(10) | TPCDS-Q14-2 | parquet / none / none | 3.76   | 3.75        |   +0.39%   |   1.67%    |   0.58%        | 5     |   -0.03%       | -0.58   | 0.49  |
| TPCDS(10) | TPCDS-Q78   | parquet / none / none | 2.80   | 2.80        |   -0.04%   |   1.32%    |   1.33%        | 5     |   -0.42%       | -0.29   | -0.05 |
| TPCDS(10) | TPCDS-Q80A  | parquet / none / none | 2.87   | 2.89        |   -0.51%   |   1.33%    |   0.40%        | 5     |   -0.01%       | -0.29   | -0.82 |
| TPCDS(10) | TPCDS-Q3    | parquet / none / none | 0.18   | 0.19        |   -1.29%   | * 15.26% * | * 15.87% *     | 5     |   -0.54%       | -0.87   | -0.13 |
| TPCDS(10) | TPCDS-Q76   | parquet / none / none | 1.08   | 1.11        |   -2.98%   |   0.92%    |   1.70%        | 5     |   -3.99%       | -2.02   | -3.47 |
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+

Testing:
- Pass PlannerTest.testProcessingCost
- Pass test_executor_groups.py
- Re-enable test_tpcds_q51a in TestTpcdsQueryWithProcessingCost with
  MAX_FRAGMENT_INSTANCES_PER_NODE set to 5
- Pass test_tpcds_queries.py::TestTpcdsQueryWithProcessingCost
- Pass core tests

Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Reviewed-on: http://gerrit.cloudera.org:8080/19807
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>


> Distribution strategy is sub-optimal for certain queries
> --------------------------------------------------------
>
>                 Key: IMPALA-10287
>                 URL: https://issues.apache.org/jira/browse/IMPALA-10287
>             Project: IMPALA
>          Issue Type: Improvement
>          Components: Frontend
>    Affects Versions: Impala 3.4.0
>            Reporter: Aman Sinha
>            Assignee: Aman Sinha
>            Priority: Major
>             Fix For: Impala 4.0.0
>
>
> I ran a simplified query (extracted from q78 of TPC-DS) on a 600GB dataset on
> an 8-node cluster. I forced the distribution strategy for the left outer join
> and compared Broadcast vs Hash Partition for different values of mt_dop. The
> example query and results are shown below (elapsed times are in seconds):
> {noformat}
> Query (with shuffle or broadcast hint):
> select count(*)
>    from store_sales
>    left join [shuffle] store_returns on sr_ticket_number=ss_ticket_number 
>          and ss_item_sk=sr_item_sk
>    join date_dim on ss_sold_date_sk = d_date_sk
>    where sr_ticket_number is null
>    and d_year=2002;
> {noformat}
> ||mt_dop||Broadcast||Partition||
> |1|45|15|
> |2|37|9|
> |4|33|5|
> |8|31|4|
> |12|31|4|
> The nearly 7.5x speedup for partition distribution at mt_dop = 12
> (which is the default) indicates that the cost formula comparing
> broadcast vs partition needs to be modified to take mt_dop into account.
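Why mt_dop should enter the broadcast vs. partition comparison can be shown with a deliberately simplified cost model. ASSUMPTION: broadcast cost is counted as the build side shipped to every join instance (nodes x mt_dop), and partition cost as both sides shipped once. This is an illustration only, not the formula in DistributedPlanner, and the data sizes are hypothetical.

```java
/** Simplified, hypothetical join distribution cost model; not Impala's code. */
public final class JoinDistributionCostSketch {

    /** ASSUMPTION: every join instance receives the full build (right) side. */
    static long broadcastCost(long rhsGb, int numNodes, int mtDop) {
        return rhsGb * numNodes * Math.max(1, mtDop);
    }

    /** Both sides are hash-partitioned across the network once. */
    static long partitionCost(long lhsGb, long rhsGb) {
        return lhsGb + rhsGb;
    }

    static String choose(long lhsGb, long rhsGb, int numNodes, int mtDop) {
        return broadcastCost(rhsGb, numNodes, mtDop) <= partitionCost(lhsGb, rhsGb)
            ? "BROADCAST" : "PARTITIONED";
    }

    public static void main(String[] args) {
        // Hypothetical sizes: 100 GB probe side, 10 GB build side, 8 nodes.
        System.out.println(choose(100, 10, 8, 1));  // BROADCAST   (80 <= 110)
        System.out.println(choose(100, 10, 8, 12)); // PARTITIONED (960 > 110)
    }
}
```

Under this model, a plan that favors broadcast at mt_dop = 1 flips to partitioned at mt_dop = 12, which matches the direction of the measurements above.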



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
