http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3be0f122/docs/topics/impala_perf_joins.xml ---------------------------------------------------------------------- diff --git a/docs/topics/impala_perf_joins.xml b/docs/topics/impala_perf_joins.xml new file mode 100644 index 0000000..4fcd896 --- /dev/null +++ b/docs/topics/impala_perf_joins.xml @@ -0,0 +1,506 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!DOCTYPE concept PUBLIC "-//OASIS//DTD DITA Concept//EN" "concept.dtd"> +<concept id="perf_joins"> + + <title>Performance Considerations for Join Queries</title> + <titlealts audience="PDF"><navtitle>Join Performance</navtitle></titlealts> + <prolog> + <metadata> + <data name="Category" value="Impala"/> + <data name="Category" value="Performance"/> + <data name="Category" value="Querying"/> + <data name="Category" value="SQL"/> + <data name="Category" value="Developers"/> + <data name="Category" value="Data Analysts"/> + </metadata> + </prolog> + + <conbody> + + <p> + Queries involving join operations often require more tuning than queries that refer to only one table. The + maximum size of the result set from a join query is the product of the number of rows in all the joined + tables. When joining several tables with millions or billions of rows, any missed opportunity to filter the + result set, or other inefficiency in the query, could lead to an operation that does not finish in a + practical time and has to be cancelled. + </p> + + <p rev="1.2.2"> + The simplest technique for tuning an Impala join query is to collect statistics on each table involved in the + join using the <codeph><xref href="impala_compute_stats.xml#compute_stats">COMPUTE STATS</xref></codeph> + statement, and then let Impala automatically optimize the query based on the size of each table, number of + distinct values of each column, and so on. The <codeph>COMPUTE STATS</codeph> statement and the join + optimization are new features introduced in Impala 1.2.2. 
For accurate statistics about each table, issue the + <codeph>COMPUTE STATS</codeph> statement after loading the data into that table, and again if the amount of + data changes substantially because of an <codeph>INSERT</codeph> or <codeph>LOAD DATA</codeph> statement, a newly added partition, + and so on. + </p> + + <p> + If statistics are not available for all the tables in the join query, or if Impala chooses a join order that + is not the most efficient, you can override the automatic join order optimization by specifying the + <codeph>STRAIGHT_JOIN</codeph> keyword immediately after the <codeph>SELECT</codeph> keyword. In this case, + Impala uses the order in which the tables appear in the query to guide how the joins are processed. + </p> + + <p> + When you use the <codeph>STRAIGHT_JOIN</codeph> technique, you must order the tables in the join query + manually instead of relying on the Impala optimizer. The optimizer uses sophisticated techniques to estimate + the size of the result set at each stage of the join. For manual ordering, use this heuristic approach to + start with, and then experiment to fine-tune the order: + </p> + + <ul> + <li> + Specify the largest table first. This table is read from disk by each Impala node and so its size is not + significant in terms of memory usage during the query. + </li> + + <li> + Next, specify the smallest table. The contents of the second, third, and subsequent tables are all transmitted + across the network. You want to minimize the size of the result set from each subsequent stage of the join + query. The most likely approach involves joining a small table first, so that the result set remains small + even as subsequent larger tables are processed. + </li> + + <li> + Join the next smallest table, then the next smallest, and so on. 
+ </li> + + <li> + For example, if you had tables <codeph>BIG</codeph>, <codeph>MEDIUM</codeph>, <codeph>SMALL</codeph>, and + <codeph>TINY</codeph>, the logical join order to try would be <codeph>BIG</codeph>, <codeph>TINY</codeph>, + <codeph>SMALL</codeph>, <codeph>MEDIUM</codeph>. + </li> + </ul> + + <p> + The terms <q>largest</q> and <q>smallest</q> refer to the size of the intermediate result set based on the + number of rows and columns from each table that are part of the result set. For example, if you join one + table <codeph>sales</codeph> with another table <codeph>customers</codeph>, a query might find results from + 100 different customers who made a total of 5000 purchases. In that case, you would specify <codeph>SELECT + ... FROM sales JOIN customers ...</codeph>, putting <codeph>customers</codeph> on the right side because it + is smaller in the context of this query. + </p> + + <p> + The Impala query planner chooses between different techniques for performing join queries, depending on the + absolute and relative sizes of the tables. <b>Broadcast joins</b> are the default, where the right-hand table + is considered to be smaller than the left-hand table, and its contents are sent to all the other nodes + involved in the query. The alternative technique is known as a <b>partitioned join</b> (not related to a + partitioned table), which is more suitable for large tables of roughly equal size. With this technique, + portions of each table are sent to appropriate other nodes where those subsets of rows can be processed in + parallel. The choice of broadcast or partitioned join also depends on statistics being available for all + tables in the join, gathered by the <codeph>COMPUTE STATS</codeph> statement. + </p> + + <p> + To see which join strategy is used for a particular query, issue an <codeph>EXPLAIN</codeph> statement for + the query. 
If you find that a query uses a broadcast join when you know through benchmarking that a + partitioned join would be more efficient, or vice versa, add a hint to the query to specify the precise join + mechanism to use. See <xref href="impala_hints.xml#hints"/> for details. + </p> + </conbody> + + <concept rev="1.2.2" id="joins_no_stats"> + + <title>How Joins Are Processed when Statistics Are Unavailable</title> + <prolog> + <metadata> + <data name="Category" value="Concepts"/> + </metadata> + </prolog> + + <conbody> + + <p> + If table or column statistics are not available for some tables in a join, Impala still reorders the tables + using the information that is available. Tables with statistics are placed on the left side of the join + order, in descending order of cost based on overall size and cardinality. Tables without statistics are + treated as zero-size, that is, they are always placed on the right side of the join order. + </p> + </conbody> + </concept> + + <concept rev="1.2.2" id="straight_join"> + + <title>Overriding Join Reordering with STRAIGHT_JOIN</title> + + <conbody> + + <p> + If an Impala join query is inefficient because of outdated statistics or unexpected data distribution, you + can keep Impala from reordering the joined tables by using the <codeph>STRAIGHT_JOIN</codeph> keyword + immediately after the <codeph>SELECT</codeph> keyword. The <codeph>STRAIGHT_JOIN</codeph> keyword turns off + the reordering of join clauses that Impala does internally, and produces a plan that relies on the join + clauses being ordered optimally in the query text. In this case, rewrite the query so that the largest + table is on the left, followed by the next largest, and so on until the smallest table is on the right. + </p> + + <p> + In this example, the subselect from the <codeph>BIG</codeph> table produces a very small result set, but + the table might still be treated as if it were the biggest and placed first in the join order. 
Using + <codeph>STRAIGHT_JOIN</codeph> for the last join clause prevents the final table from being reordered, + keeping it as the rightmost table in the join order. + </p> + +<codeblock>select straight_join x from medium join small join (select * from big where c1 &lt; 10) as big + where medium.id = small.id and small.id = big.id;</codeblock> + </conbody> + </concept> + + <concept id="perf_joins_examples"> + + <title>Examples of Join Order Optimization</title> + + <conbody> + + <p> + Here are examples showing joins between tables with 1 billion, 200 million, and 1 million rows. (In this + case, the tables are unpartitioned and use the Parquet format.) The smaller tables contain subsets of data + from the largest one, for convenience of joining on the unique <codeph>ID</codeph> column. The smallest + table contains only a subset of columns from the others. + </p> + + <p></p> + +<codeblock>[localhost:21000] > create table big stored as parquet as select * from raw_data; ++----------------------------+ +| summary | ++----------------------------+ +| Inserted 1000000000 row(s) | ++----------------------------+ +Returned 1 row(s) in 671.56s +[localhost:21000] > desc big; ++-----------+---------+---------+ +| name | type | comment | ++-----------+---------+---------+ +| id | int | | +| val | int | | +| zfill | string | | +| name | string | | +| assertion | boolean | | ++-----------+---------+---------+ +Returned 5 row(s) in 0.01s +[localhost:21000] > create table medium stored as parquet as select * from big limit 200 * floor(1e6); ++---------------------------+ +| summary | ++---------------------------+ +| Inserted 200000000 row(s) | ++---------------------------+ +Returned 1 row(s) in 138.31s +[localhost:21000] > create table small stored as parquet as select id,val,name from big where assertion = true limit 1 * floor(1e6); ++-------------------------+ +| summary | ++-------------------------+ +| Inserted 1000000 row(s) | ++-------------------------+ +Returned 1 row(s) in 
6.32s</codeblock> + + <p> + For any kind of performance experimentation, use the <codeph>EXPLAIN</codeph> statement to see how any + expensive query will be performed without actually running it, and enable verbose <codeph>EXPLAIN</codeph> + plans containing more performance-oriented detail. In the following output, the most interesting plan lines are highlighted in bold. + They show that without statistics for the joined tables, Impala cannot make a good estimate of the number of + rows involved at each stage of processing, and is likely to stick with the <codeph>BROADCAST</codeph> join + mechanism that sends a complete copy of one of the tables to each node. + </p> + +<codeblock>[localhost:21000] > set explain_level=verbose; +EXPLAIN_LEVEL set to verbose +[localhost:21000] > explain select count(*) from big join medium where big.id = medium.id; ++----------------------------------------------------------+ +| Explain String | ++----------------------------------------------------------+ +| Estimated Per-Host Requirements: Memory=2.10GB VCores=2 | +| | +| PLAN FRAGMENT 0 | +| PARTITION: UNPARTITIONED | +| | +| 6:AGGREGATE (merge finalize) | +| | output: SUM(COUNT(*)) | +| | cardinality: 1 | +| | per-host memory: unavailable | +| | tuple ids: 2 | +| | | +| 5:EXCHANGE | +| cardinality: 1 | +| per-host memory: unavailable | +| tuple ids: 2 | +| | +| PLAN FRAGMENT 1 | +| PARTITION: RANDOM | +| | +| STREAM DATA SINK | +| EXCHANGE ID: 5 | +| UNPARTITIONED | +| | +| 3:AGGREGATE | +| | output: COUNT(*) | +| | cardinality: 1 | +| | per-host memory: 10.00MB | +| | tuple ids: 2 | +| | | +| 2:HASH JOIN | +<b>| | join op: INNER JOIN (BROADCAST) |</b> +| | hash predicates: | +| | big.id = medium.id | +<b>| | cardinality: unavailable |</b> +| | per-host memory: 2.00GB | +| | tuple ids: 0 1 | +| | | +| |----4:EXCHANGE | +| | cardinality: unavailable | +| | per-host memory: 0B | +| | tuple ids: 1 | +| | | +| 0:SCAN HDFS | +<b>| table=join_order.big #partitions=1/1 size=23.12GB | +| table stats: unavailable | 
+| column stats: unavailable | +| cardinality: unavailable |</b> +| per-host memory: 88.00MB | +| tuple ids: 0 | +| | +| PLAN FRAGMENT 2 | +| PARTITION: RANDOM | +| | +| STREAM DATA SINK | +| EXCHANGE ID: 4 | +| UNPARTITIONED | +| | +| 1:SCAN HDFS | +<b>| table=join_order.medium #partitions=1/1 size=4.62GB | +| table stats: unavailable | +| column stats: unavailable | +| cardinality: unavailable |</b> +| per-host memory: 88.00MB | +| tuple ids: 1 | ++----------------------------------------------------------+ +Returned 64 row(s) in 0.04s</codeblock> + + <p> + Gathering statistics for all the tables is straightforward, one <codeph>COMPUTE STATS</codeph> statement + per table: + </p> + +<codeblock>[localhost:21000] > compute stats small; ++-----------------------------------------+ +| summary | ++-----------------------------------------+ +| Updated 1 partition(s) and 3 column(s). | ++-----------------------------------------+ +Returned 1 row(s) in 4.26s +[localhost:21000] > compute stats medium; ++-----------------------------------------+ +| summary | ++-----------------------------------------+ +| Updated 1 partition(s) and 5 column(s). | ++-----------------------------------------+ +Returned 1 row(s) in 42.11s +[localhost:21000] > compute stats big; ++-----------------------------------------+ +| summary | ++-----------------------------------------+ +| Updated 1 partition(s) and 5 column(s). 
| ++-----------------------------------------+ +Returned 1 row(s) in 165.44s</codeblock> + + <p> + With statistics in place, Impala can choose a more effective join order rather than following the + left-to-right sequence of tables in the query, and can choose <codeph>BROADCAST</codeph> or + <codeph>PARTITIONED</codeph> join strategies based on the overall sizes and row counts of the tables: + </p> + +<codeblock>[localhost:21000] > explain select count(*) from medium join big where big.id = medium.id; +Query: explain select count(*) from medium join big where big.id = medium.id ++-----------------------------------------------------------+ +| Explain String | ++-----------------------------------------------------------+ +| Estimated Per-Host Requirements: Memory=937.23MB VCores=2 | +| | +| PLAN FRAGMENT 0 | +| PARTITION: UNPARTITIONED | +| | +| 6:AGGREGATE (merge finalize) | +| | output: SUM(COUNT(*)) | +| | cardinality: 1 | +| | per-host memory: unavailable | +| | tuple ids: 2 | +| | | +| 5:EXCHANGE | +| cardinality: 1 | +| per-host memory: unavailable | +| tuple ids: 2 | +| | +| PLAN FRAGMENT 1 | +| PARTITION: RANDOM | +| | +| STREAM DATA SINK | +| EXCHANGE ID: 5 | +| UNPARTITIONED | +| | +| 3:AGGREGATE | +| | output: COUNT(*) | +| | cardinality: 1 | +| | per-host memory: 10.00MB | +| | tuple ids: 2 | +| | | +| 2:HASH JOIN | +| | join op: INNER JOIN (BROADCAST) | +| | hash predicates: | +| | big.id = medium.id | +| | cardinality: 1443004441 | +| | per-host memory: 839.23MB | +| | tuple ids: 1 0 | +| | | +| |----4:EXCHANGE | +| | cardinality: 200000000 | +| | per-host memory: 0B | +| | tuple ids: 0 | +| | | +| 1:SCAN HDFS | +| table=join_order.big #partitions=1/1 size=23.12GB | +| table stats: 1000000000 rows total | +| column stats: all | +| cardinality: 1000000000 | +| per-host memory: 88.00MB | +| tuple ids: 1 | +| | +| PLAN FRAGMENT 2 | +| PARTITION: RANDOM | +| | +| STREAM DATA SINK | +| EXCHANGE ID: 4 | +| UNPARTITIONED | +| | +| 0:SCAN HDFS | +| 
table=join_order.medium #partitions=1/1 size=4.62GB | +| table stats: 200000000 rows total | +| column stats: all | +| cardinality: 200000000 | +| per-host memory: 88.00MB | +| tuple ids: 0 | ++-----------------------------------------------------------+ +Returned 64 row(s) in 0.04s + +[localhost:21000] > explain select count(*) from small join big where big.id = small.id; +Query: explain select count(*) from small join big where big.id = small.id ++-----------------------------------------------------------+ +| Explain String | ++-----------------------------------------------------------+ +| Estimated Per-Host Requirements: Memory=101.15MB VCores=2 | +| | +| PLAN FRAGMENT 0 | +| PARTITION: UNPARTITIONED | +| | +| 6:AGGREGATE (merge finalize) | +| | output: SUM(COUNT(*)) | +| | cardinality: 1 | +| | per-host memory: unavailable | +| | tuple ids: 2 | +| | | +| 5:EXCHANGE | +| cardinality: 1 | +| per-host memory: unavailable | +| tuple ids: 2 | +| | +| PLAN FRAGMENT 1 | +| PARTITION: RANDOM | +| | +| STREAM DATA SINK | +| EXCHANGE ID: 5 | +| UNPARTITIONED | +| | +| 3:AGGREGATE | +| | output: COUNT(*) | +| | cardinality: 1 | +| | per-host memory: 10.00MB | +| | tuple ids: 2 | +| | | +| 2:HASH JOIN | +| | join op: INNER JOIN (BROADCAST) | +| | hash predicates: | +| | big.id = small.id | +| | cardinality: 1000000000 | +| | per-host memory: 3.15MB | +| | tuple ids: 1 0 | +| | | +| |----4:EXCHANGE | +| | cardinality: 1000000 | +| | per-host memory: 0B | +| | tuple ids: 0 | +| | | +| 1:SCAN HDFS | +| table=join_order.big #partitions=1/1 size=23.12GB | +| table stats: 1000000000 rows total | +| column stats: all | +| cardinality: 1000000000 | +| per-host memory: 88.00MB | +| tuple ids: 1 | +| | +| PLAN FRAGMENT 2 | +| PARTITION: RANDOM | +| | +| STREAM DATA SINK | +| EXCHANGE ID: 4 | +| UNPARTITIONED | +| | +| 0:SCAN HDFS | +| table=join_order.small #partitions=1/1 size=17.93MB | +| table stats: 1000000 rows total | +| column stats: all | +| cardinality: 1000000 | +| 
per-host memory: 32.00MB | +| tuple ids: 0 | ++-----------------------------------------------------------+ +Returned 64 row(s) in 0.03s</codeblock> + + <p> + When queries like these are actually run, the execution times are relatively consistent regardless of the + table order in the query text. Here are examples using both the unique <codeph>ID</codeph> column and the + <codeph>VAL</codeph> column containing duplicate values: + </p> + +<codeblock>[localhost:21000] > select count(*) from big join small on (big.id = small.id); +Query: select count(*) from big join small on (big.id = small.id) ++----------+ +| count(*) | ++----------+ +| 1000000 | ++----------+ +Returned 1 row(s) in 21.68s +[localhost:21000] > select count(*) from small join big on (big.id = small.id); +Query: select count(*) from small join big on (big.id = small.id) ++----------+ +| count(*) | ++----------+ +| 1000000 | ++----------+ +Returned 1 row(s) in 20.45s + +[localhost:21000] > select count(*) from big join small on (big.val = small.val); ++------------+ +| count(*) | ++------------+ +| 2000948962 | ++------------+ +Returned 1 row(s) in 108.85s +[localhost:21000] > select count(*) from small join big on (big.val = small.val); ++------------+ +| count(*) | ++------------+ +| 2000948962 | ++------------+ +Returned 1 row(s) in 100.76s</codeblock> + + <note> + When examining the performance of join queries and the effectiveness of the join order optimization, make + sure the query involves enough data and cluster resources to see a difference depending on the query plan. + For example, a single data file of just a few megabytes will reside in a single HDFS block and be processed + on a single node. Likewise, if you use a single-node or two-node cluster, there might not be much + difference in efficiency for the broadcast or partitioned join strategies. + </note> + </conbody> + </concept> +</concept>
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3be0f122/docs/topics/impala_perf_resources.xml ---------------------------------------------------------------------- diff --git a/docs/topics/impala_perf_resources.xml b/docs/topics/impala_perf_resources.xml new file mode 100644 index 0000000..c538893 --- /dev/null +++ b/docs/topics/impala_perf_resources.xml @@ -0,0 +1,60 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!DOCTYPE concept PUBLIC "-//OASIS//DTD DITA Concept//EN" "concept.dtd"> +<concept id="mem_limits"> + + <title>Controlling Impala Resource Usage</title> + <titlealts audience="PDF"><navtitle>Controlling Resource Usage</navtitle></titlealts> + <prolog> + <metadata> + <data name="Category" value="Impala"/> + <data name="Category" value="Performance"/> + <data name="Category" value="Memory"/> + <data name="Category" value="Scalability"/> + <data name="Category" value="Resource Management"/> + <data name="Category" value="Administrators"/> + <data name="Category" value="Developers"/> + <data name="Category" value="Data Analysts"/> + </metadata> + </prolog> + + <conbody> + + <p> + Sometimes, balancing raw query performance against scalability requires limiting the amount of resources, + such as memory or CPU, used by a single query or group of queries. Impala can use several mechanisms that + help to smooth out the load during heavy concurrent usage, resulting in faster overall query times and + sharing of resources across Impala queries, MapReduce jobs, and other kinds of workloads across a CDH + cluster: + </p> + + <ul> + <li> + The Impala admission control feature uses a fast, distributed mechanism to hold back queries that exceed + limits on the number of concurrent queries or the amount of memory used. The queries are queued, and + executed as other queries finish and resources become available. 
You can control the concurrency limits, + and specify different limits for different groups of users to divide cluster resources according to the + priorities of different classes of users. This feature is new in Impala 1.3, and works with both CDH 4 and + CDH 5. See <xref href="impala_admission.xml#admission_control"/> for details. + </li> + + <li> + <p> + You can restrict the amount of memory Impala reserves during query execution by specifying the + <codeph>-mem_limit</codeph> option for the <codeph>impalad</codeph> daemon. See + <xref href="impala_config_options.xml#config_options"/> for details. This limit applies only to the + memory that is directly consumed by queries; Impala reserves additional memory at startup, for example to + hold cached metadata. + </p> + </li> + + <li> + <p> + For production deployment, <ph rev="upstream">Cloudera</ph> recommends that you implement resource isolation using mechanisms + such as cgroups, which you can configure using Cloudera Manager. For details, see the + <xref href="http://www.cloudera.com/documentation/enterprise/latest/topics/cm_mc_service_pools.html" scope="external" format="html">Static + Resource Pools</xref> in the Cloudera Manager documentation. 
+ </p> + </li> + </ul> + </conbody> +</concept> http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3be0f122/docs/topics/impala_perf_skew.xml ---------------------------------------------------------------------- diff --git a/docs/topics/impala_perf_skew.xml b/docs/topics/impala_perf_skew.xml new file mode 100644 index 0000000..b3a7cec --- /dev/null +++ b/docs/topics/impala_perf_skew.xml @@ -0,0 +1,150 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!DOCTYPE concept PUBLIC "-//OASIS//DTD DITA Concept//EN" "concept.dtd"> +<concept id="perf_skew"> + + <title>Detecting and Correcting HDFS Block Skew Conditions</title> + <titlealts audience="PDF"><navtitle>HDFS Block Skew</navtitle></titlealts> + <prolog> + <metadata> + <data name="Category" value="Impala"/> + <data name="Category" value="Performance"/> + <data name="Category" value="HDFS"/> + <data name="Category" value="Proof of Concept"/> + <data name="Category" value="Administrators"/> + <data name="Category" value="Developers"/> + <data name="Category" value="Data Analysts"/> + </metadata> + </prolog> + + <conbody> + + <p> + For best performance of Impala parallel queries, the work is divided equally across hosts in the cluster, and + all hosts take approximately equal time to finish their work. If one host takes substantially longer than + others, the extra time needed for the slow host can become the dominant factor in query performance. + Therefore, one of the first steps in performance tuning for Impala is to detect and correct such conditions. + </p> + + <p> + The main cause of uneven performance that you can correct within Impala is <term>skew</term> in the number of + HDFS data blocks processed by each host, where some hosts process substantially more data blocks than others. + This condition can occur because of uneven distribution of the data values themselves, for example causing + certain data files or partitions to be large while others are very small. 
(It is possible, however, to have + unevenly distributed data without any problems with the distribution of HDFS blocks.) Block skew could also + be due to the underlying block allocation policies within HDFS, the replication factor of the data files, and + the way that Impala chooses the host to process each data block. + </p> + + <p> + The most convenient way to detect block skew, or slow-host issues in general, is to examine the <q>executive + summary</q> information from the query profile after running a query: + </p> + + <ul> + <li> + <p> + In <cmdname>impala-shell</cmdname>, issue the <codeph>SUMMARY</codeph> command immediately after the + query is complete, to see just the summary information. If you detect issues involving skew, you might + switch to issuing the <codeph>PROFILE</codeph> command, which displays the summary information followed + by a detailed performance analysis. + </p> + </li> + + <li> + <p> + In the Cloudera Manager interface or the Impala debug web UI, click the <uicontrol>Profile</uicontrol> + link associated with the query after it is complete. The executive summary information is displayed early + in the profile output. + </p> + </li> + </ul> + + <p> + For each phase of the query, you see an <uicontrol>Avg Time</uicontrol> and a <uicontrol>Max Time</uicontrol> + value, along with <uicontrol>#Hosts</uicontrol> indicating how many hosts are involved in that query phase. + For all the phases with <uicontrol>#Hosts</uicontrol> greater than one, look for cases where the maximum time + is substantially greater than the average time. Focus on the phases that took the longest, for example, those + taking multiple seconds rather than milliseconds or microseconds. + </p> + + <p> + If you detect that some hosts take longer than others, first rule out non-Impala causes. 
Some + hosts could be slower than others because they have less capacity than the others, or because they are + substantially busier due to unevenly distributed non-Impala workloads: + </p> + + <ul> + <li> + <p> + For clusters running Impala, keep the relative capacities of all hosts roughly equal. Any cost savings + from including some underpowered hosts in the cluster will likely be outweighed by poor or uneven + performance, and the time spent diagnosing performance issues. + </p> + </li> + + <li> + <p> + If non-Impala workloads cause slowdowns on some hosts but not others, use the appropriate load-balancing + techniques for the non-Impala components to smooth out the load across the cluster. + </p> + </li> + </ul> + + <p> + If the hosts on your cluster are evenly powered and evenly loaded, examine the detailed profile output to + determine which host is taking longer than others for the query phase in question. Examine how many bytes are + processed during that phase on that host, how much memory is used, and how many bytes are transmitted across + the network. + </p> + + <p> + The most common symptom is a higher number of bytes read on one host than on others, because that host is + asked to process a higher number of HDFS data blocks. This condition is more likely to occur when the + number of blocks accessed by the query is relatively small. For example, if you have a 10-node cluster and + the query processes 10 HDFS blocks, each node might not process exactly one block. If one node sits idle + while another node processes two blocks, the query could take twice as long as if the data were perfectly + distributed. + </p> + + <p> + Possible solutions in this case include: + </p> + + <ul> + <li> + <p> + If the query is artificially small, perhaps for benchmarking purposes, scale it up to process a larger + data set. 
For example, if some nodes read 10 HDFS data blocks while others read 11, the overall effect of + the uneven distribution is much smaller than when some nodes do twice as much work as others. As a + guideline, aim for a <q>sweet spot</q> where each node reads 2 GB or more from HDFS per query. Queries + that process lower volumes than that could experience inconsistent performance that smooths out as + queries become more data-intensive. + </p> + </li> + + <li> + <p> + If the query processes only a few large blocks, so that many nodes sit idle and cannot help to + parallelize the query, consider reducing the overall block size. For example, you might adjust the + <codeph>PARQUET_FILE_SIZE</codeph> query option before copying or converting data into a Parquet table. + Or you might adjust the granularity of data files produced earlier in the ETL pipeline by non-Impala + components. In Impala 2.0 and later, the default Parquet block size is 256 MB, reduced from 1 GB, to + improve parallelism for common cluster sizes and data volumes. + </p> + </li> + + <li> + <p> + Reduce the amount of compression applied to the data. For text data files, the highest degree of + compression (gzip) produces unsplittable files that are more difficult for Impala to process in parallel, + and require extra memory during processing to hold the compressed and uncompressed data simultaneously. + For binary formats such as Parquet and Avro, compression can result in fewer data blocks overall, but + remember that when queries process relatively few blocks, there is less opportunity for parallel + execution and many nodes in the cluster might sit idle. Note that when Impala writes Parquet data with + the query option <codeph>COMPRESSION_CODEC=NONE</codeph> enabled, the data is still typically compact due + to the encoding schemes used by Parquet, independent of the final compression step. + </p> + </li> + </ul> + </conbody> +</concept>
