http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3be0f122/docs/topics/impala_perf_joins.xml
----------------------------------------------------------------------
diff --git a/docs/topics/impala_perf_joins.xml 
b/docs/topics/impala_perf_joins.xml
new file mode 100644
index 0000000..4fcd896
--- /dev/null
+++ b/docs/topics/impala_perf_joins.xml
@@ -0,0 +1,506 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE concept PUBLIC "-//OASIS//DTD DITA Concept//EN" "concept.dtd">
+<concept id="perf_joins">
+
+  <title>Performance Considerations for Join Queries</title>
+  <titlealts audience="PDF"><navtitle>Join Performance</navtitle></titlealts>
+  <prolog>
+    <metadata>
+      <data name="Category" value="Impala"/>
+      <data name="Category" value="Performance"/>
+      <data name="Category" value="Querying"/>
+      <data name="Category" value="SQL"/>
+      <data name="Category" value="Developers"/>
+      <data name="Category" value="Data Analysts"/>
+    </metadata>
+  </prolog>
+
+  <conbody>
+
+    <p>
+      Queries involving join operations often require more tuning than queries 
that refer to only one table. The
+      maximum size of the result set from a join query is the product of the 
number of rows in all the joined
+      tables. When joining several tables with millions or billions of rows, 
any missed opportunity to filter the
+      result set, or other inefficiency in the query, could lead to an 
operation that does not finish in a
+      practical time and has to be cancelled.
+    </p>
+
+    <p rev="1.2.2">
+      The simplest technique for tuning an Impala join query is to collect 
statistics on each table involved in the
+      join using the <codeph><xref 
href="impala_compute_stats.xml#compute_stats">COMPUTE STATS</xref></codeph>
+      statement, and then let Impala automatically optimize the query based on 
the size of each table, number of
+      distinct values of each column, and so on. The <codeph>COMPUTE 
STATS</codeph> statement and the join
+      optimization are new features introduced in Impala 1.2.2. For accurate 
statistics about each table, issue the
+      <codeph>COMPUTE STATS</codeph> statement after loading the data into 
that table, and again if the amount of
+      data changes substantially due to an <codeph>INSERT</codeph>, 
<codeph>LOAD DATA</codeph>, adding a partition,
+      and so on.
+    </p>
+
+    <p>
+      If statistics are not available for all the tables in the join query, or 
if Impala chooses a join order that
+      is not the most efficient, you can override the automatic join order 
optimization by specifying the
+      <codeph>STRAIGHT_JOIN</codeph> keyword immediately after the 
<codeph>SELECT</codeph> keyword. In this case,
+      Impala uses the order the tables appear in the query to guide how the 
joins are processed.
+    </p>
+
+    <p>
+      When you use the <codeph>STRAIGHT_JOIN</codeph> technique, you must 
order the tables in the join query
+      manually instead of relying on the Impala optimizer. The optimizer uses 
sophisticated techniques to estimate
+      the size of the result set at each stage of the join. For manual 
ordering, use this heuristic approach to
+      start with, and then experiment to fine-tune the order:
+    </p>
+
+    <ul>
+      <li>
+        Specify the largest table first. This table is read from disk by each 
Impala node and so its size is not
+        significant in terms of memory usage during the query.
+      </li>
+
+      <li>
+        Next, specify the smallest table. The contents of the second, third, 
and so on tables are all transmitted
+        across the network. You want to minimize the size of the result set 
from each subsequent stage of the join
+        query. The most likely approach involves joining a small table first, 
so that the result set remains small
+        even as subsequent larger tables are processed.
+      </li>
+
+      <li>
+        Join the next smallest table, then the next smallest, and so on.
+      </li>
+
+      <li>
+        For example, if you had tables <codeph>BIG</codeph>, 
<codeph>MEDIUM</codeph>, <codeph>SMALL</codeph>, and
+        <codeph>TINY</codeph>, the logical join order to try would be 
<codeph>BIG</codeph>, <codeph>TINY</codeph>,
+        <codeph>SMALL</codeph>, <codeph>MEDIUM</codeph>.
+      </li>
+    </ul>
+
+    <p>
+      The terms <q>largest</q> and <q>smallest</q> refers to the size of the 
intermediate result set based on the
+      number of rows and columns from each table that are part of the result 
set. For example, if you join one
+      table <codeph>sales</codeph> with another table 
<codeph>customers</codeph>, a query might find results from
+      100 different customers who made a total of 5000 purchases. In that 
case, you would specify <codeph>SELECT
+      ... FROM sales JOIN customers ...</codeph>, putting 
<codeph>customers</codeph> on the right side because it
+      is smaller in the context of this query.
+    </p>
+
+    <p>
+      The Impala query planner chooses between different techniques for 
performing join queries, depending on the
+      absolute and relative sizes of the tables. <b>Broadcast joins</b> are 
the default, where the right-hand table
+      is considered to be smaller than the left-hand table, and its contents 
are sent to all the other nodes
+      involved in the query. The alternative technique is known as a 
<b>partitioned join</b> (not related to a
+      partitioned table), which is more suitable for large tables of roughly 
equal size. With this technique,
+      portions of each table are sent to appropriate other nodes where those 
subsets of rows can be processed in
+      parallel. The choice of broadcast or partitioned join also depends on 
statistics being available for all
+      tables in the join, gathered by the <codeph>COMPUTE STATS</codeph> 
statement.
+    </p>
+
+    <p>
+      To see which join strategy is used for a particular query, issue an 
<codeph>EXPLAIN</codeph> statement for
+      the query. If you find that a query uses a broadcast join when you know 
through benchmarking that a
+      partitioned join would be more efficient, or vice versa, add a hint to 
the query to specify the precise join
+      mechanism to use. See <xref href="impala_hints.xml#hints"/> for details.
+    </p>
+  </conbody>
+
+  <concept rev="1.2.2" id="joins_no_stats">
+
+    <title>How Joins Are Processed when Statistics Are Unavailable</title>
+  <prolog>
+    <metadata>
+      <data name="Category" value="Concepts"/>
+    </metadata>
+  </prolog>
+
+    <conbody>
+
+      <p>
+        If table or column statistics are not available for some tables in a 
join, Impala still reorders the tables
+        using the information that is available. Tables with statistics are 
placed on the left side of the join
+        order, in descending order of cost based on overall size and 
cardinality. Tables without statistics are
+        treated as zero-size, that is, they are always placed on the right 
side of the join order.
+      </p>
+    </conbody>
+  </concept>
+
+  <concept rev="1.2.2" id="straight_join">
+
+    <title>Overriding Join Reordering with STRAIGHT_JOIN</title>
+
+    <conbody>
+
+      <p>
+        If an Impala join query is inefficient because of outdated statistics 
or unexpected data distribution, you
+        can keep Impala from reordering the joined tables by using the 
<codeph>STRAIGHT_JOIN</codeph> keyword
+        immediately after the <codeph>SELECT</codeph> keyword. The 
<codeph>STRAIGHT_JOIN</codeph> keyword turns off
+        the reordering of join clauses that Impala does internally, and 
produces a plan that relies on the join
+        clauses being ordered optimally in the query text. In this case, 
rewrite the query so that the largest
+        table is on the left, followed by the next largest, and so on until 
the smallest table is on the right.
+      </p>
+
+      <p>
+        In this example, the subselect from the <codeph>BIG</codeph> table 
produces a very small result set, but
+        the table might still be treated as if it were the biggest and placed 
first in the join order. Using
+        <codeph>STRAIGHT_JOIN</codeph> for the last join clause prevents the 
final table from being reordered,
+        keeping it as the rightmost table in the join order.
+      </p>
+
+<codeblock>select straight_join x from medium join small join (select * from 
big where c1 &lt; 10) as big
+  where medium.id = small.id and small.id = big.id;</codeblock>
+    </conbody>
+  </concept>
+
+  <concept id="perf_joins_examples">
+
+    <title>Examples of Join Order Optimization</title>
+
+    <conbody>
+
+      <p>
+        Here are examples showing joins between tables with 1 billion, 200 
million, and 1 million rows. (In this
+        case, the tables are unpartitioned and using Parquet format.) The 
smaller tables contain subsets of data
+        from the largest one, for convenience of joining on the unique 
<codeph>ID</codeph> column. The smallest
+        table only contains a subset of columns from the others.
+      </p>
+
+      <p></p>
+
+<codeblock>[localhost:21000] &gt; create table big stored as parquet as select 
* from raw_data;
++----------------------------+
+| summary                    |
++----------------------------+
+| Inserted 1000000000 row(s) |
++----------------------------+
+Returned 1 row(s) in 671.56s
+[localhost:21000] &gt; desc big;
++-----------+---------+---------+
+| name      | type    | comment |
++-----------+---------+---------+
+| id        | int     |         |
+| val       | int     |         |
+| zfill     | string  |         |
+| name      | string  |         |
+| assertion | boolean |         |
++-----------+---------+---------+
+Returned 5 row(s) in 0.01s
+[localhost:21000] &gt; create table medium stored as parquet as select * from 
big limit 200 * floor(1e6);
++---------------------------+
+| summary                   |
++---------------------------+
+| Inserted 200000000 row(s) |
++---------------------------+
+Returned 1 row(s) in 138.31s
+[localhost:21000] &gt; create table small stored as parquet as select 
id,val,name from big where assertion = true limit 1 * floor(1e6);
++-------------------------+
+| summary                 |
++-------------------------+
+| Inserted 1000000 row(s) |
++-------------------------+
+Returned 1 row(s) in 6.32s</codeblock>
+
+      <p>
+        For any kind of performance experimentation, use the 
<codeph>EXPLAIN</codeph> statement to see how any
+        expensive query will be performed without actually running it, and 
enable verbose <codeph>EXPLAIN</codeph>
+        plans containing more performance-oriented detail: The most 
interesting plan lines are highlighted in bold,
+        showing that without statistics for the joined tables, Impala cannot 
make a good estimate of the number of
+        rows involved at each stage of processing, and is likely to stick with 
the <codeph>BROADCAST</codeph> join
+        mechanism that sends a complete copy of one of the tables to each node.
+      </p>
+
+<codeblock>[localhost:21000] &gt; set explain_level=verbose;
+EXPLAIN_LEVEL set to verbose
+[localhost:21000] &gt; explain select count(*) from big join medium where 
big.id = medium.id;
++----------------------------------------------------------+
+| Explain String                                           |
++----------------------------------------------------------+
+| Estimated Per-Host Requirements: Memory=2.10GB VCores=2  |
+|                                                          |
+| PLAN FRAGMENT 0                                          |
+|   PARTITION: UNPARTITIONED                               |
+|                                                          |
+|   6:AGGREGATE (merge finalize)                           |
+|   |  output: SUM(COUNT(*))                               |
+|   |  cardinality: 1                                      |
+|   |  per-host memory: unavailable                        |
+|   |  tuple ids: 2                                        |
+|   |                                                      |
+|   5:EXCHANGE                                             |
+|      cardinality: 1                                      |
+|      per-host memory: unavailable                        |
+|      tuple ids: 2                                        |
+|                                                          |
+| PLAN FRAGMENT 1                                          |
+|   PARTITION: RANDOM                                      |
+|                                                          |
+|   STREAM DATA SINK                                       |
+|     EXCHANGE ID: 5                                       |
+|     UNPARTITIONED                                        |
+|                                                          |
+|   3:AGGREGATE                                            |
+|   |  output: COUNT(*)                                    |
+|   |  cardinality: 1                                      |
+|   |  per-host memory: 10.00MB                            |
+|   |  tuple ids: 2                                        |
+|   |                                                      |
+|   2:HASH JOIN                                            |
+<b>|   |  join op: INNER JOIN (BROADCAST)                     |</b>
+|   |  hash predicates:                                    |
+|   |    big.id = medium.id                                |
+<b>|   |  cardinality: unavailable                            |</b>
+|   |  per-host memory: 2.00GB                             |
+|   |  tuple ids: 0 1                                      |
+|   |                                                      |
+|   |----4:EXCHANGE                                        |
+|   |       cardinality: unavailable                       |
+|   |       per-host memory: 0B                            |
+|   |       tuple ids: 1                                   |
+|   |                                                      |
+|   0:SCAN HDFS                                            |
+<b>|      table=join_order.big #partitions=1/1 size=23.12GB   |
+|      table stats: unavailable                            |
+|      column stats: unavailable                           |
+|      cardinality: unavailable                            |</b>
+|      per-host memory: 88.00MB                            |
+|      tuple ids: 0                                        |
+|                                                          |
+| PLAN FRAGMENT 2                                          |
+|   PARTITION: RANDOM                                      |
+|                                                          |
+|   STREAM DATA SINK                                       |
+|     EXCHANGE ID: 4                                       |
+|     UNPARTITIONED                                        |
+|                                                          |
+|   1:SCAN HDFS                                            |
+<b>|      table=join_order.medium #partitions=1/1 size=4.62GB |
+|      table stats: unavailable                            |
+|      column stats: unavailable                           |
+|      cardinality: unavailable                            |</b>
+|      per-host memory: 88.00MB                            |
+|      tuple ids: 1                                        |
++----------------------------------------------------------+
+Returned 64 row(s) in 0.04s</codeblock>
+
+      <p>
+        Gathering statistics for all the tables is straightforward, one 
<codeph>COMPUTE STATS</codeph> statement
+        per table:
+      </p>
+
+<codeblock>[localhost:21000] &gt; compute stats small;
++-----------------------------------------+
+| summary                                 |
++-----------------------------------------+
+| Updated 1 partition(s) and 3 column(s). |
++-----------------------------------------+
+Returned 1 row(s) in 4.26s
+[localhost:21000] &gt; compute stats medium;
++-----------------------------------------+
+| summary                                 |
++-----------------------------------------+
+| Updated 1 partition(s) and 5 column(s). |
++-----------------------------------------+
+Returned 1 row(s) in 42.11s
+[localhost:21000] &gt; compute stats big;
++-----------------------------------------+
+| summary                                 |
++-----------------------------------------+
+| Updated 1 partition(s) and 5 column(s). |
++-----------------------------------------+
+Returned 1 row(s) in 165.44s</codeblock>
+
+      <p>
+        With statistics in place, Impala can choose a more effective join 
order rather than following the
+        left-to-right sequence of tables in the query, and can choose 
<codeph>BROADCAST</codeph> or
+        <codeph>PARTITIONED</codeph> join strategies based on the overall 
sizes and number of rows in the table:
+      </p>
+
+<codeblock>[localhost:21000] &gt; explain select count(*) from medium join big 
where big.id = medium.id;
+Query: explain select count(*) from medium join big where big.id = medium.id
++-----------------------------------------------------------+
+| Explain String                                            |
++-----------------------------------------------------------+
+| Estimated Per-Host Requirements: Memory=937.23MB VCores=2 |
+|                                                           |
+| PLAN FRAGMENT 0                                           |
+|   PARTITION: UNPARTITIONED                                |
+|                                                           |
+|   6:AGGREGATE (merge finalize)                            |
+|   |  output: SUM(COUNT(*))                                |
+|   |  cardinality: 1                                       |
+|   |  per-host memory: unavailable                         |
+|   |  tuple ids: 2                                         |
+|   |                                                       |
+|   5:EXCHANGE                                              |
+|      cardinality: 1                                       |
+|      per-host memory: unavailable                         |
+|      tuple ids: 2                                         |
+|                                                           |
+| PLAN FRAGMENT 1                                           |
+|   PARTITION: RANDOM                                       |
+|                                                           |
+|   STREAM DATA SINK                                        |
+|     EXCHANGE ID: 5                                        |
+|     UNPARTITIONED                                         |
+|                                                           |
+|   3:AGGREGATE                                             |
+|   |  output: COUNT(*)                                     |
+|   |  cardinality: 1                                       |
+|   |  per-host memory: 10.00MB                             |
+|   |  tuple ids: 2                                         |
+|   |                                                       |
+|   2:HASH JOIN                                             |
+|   |  join op: INNER JOIN (BROADCAST)                      |
+|   |  hash predicates:                                     |
+|   |    big.id = medium.id                                 |
+|   |  cardinality: 1443004441                              |
+|   |  per-host memory: 839.23MB                            |
+|   |  tuple ids: 1 0                                       |
+|   |                                                       |
+|   |----4:EXCHANGE                                         |
+|   |       cardinality: 200000000                          |
+|   |       per-host memory: 0B                             |
+|   |       tuple ids: 0                                    |
+|   |                                                       |
+|   1:SCAN HDFS                                             |
+|      table=join_order.big #partitions=1/1 size=23.12GB    |
+|      table stats: 1000000000 rows total                   |
+|      column stats: all                                    |
+|      cardinality: 1000000000                              |
+|      per-host memory: 88.00MB                             |
+|      tuple ids: 1                                         |
+|                                                           |
+| PLAN FRAGMENT 2                                           |
+|   PARTITION: RANDOM                                       |
+|                                                           |
+|   STREAM DATA SINK                                        |
+|     EXCHANGE ID: 4                                        |
+|     UNPARTITIONED                                         |
+|                                                           |
+|   0:SCAN HDFS                                             |
+|      table=join_order.medium #partitions=1/1 size=4.62GB  |
+|      table stats: 200000000 rows total                    |
+|      column stats: all                                    |
+|      cardinality: 200000000                               |
+|      per-host memory: 88.00MB                             |
+|      tuple ids: 0                                         |
++-----------------------------------------------------------+
+Returned 64 row(s) in 0.04s
+
+[localhost:21000] &gt; explain select count(*) from small join big where 
big.id = small.id;
+Query: explain select count(*) from small join big where big.id = small.id
++-----------------------------------------------------------+
+| Explain String                                            |
++-----------------------------------------------------------+
+| Estimated Per-Host Requirements: Memory=101.15MB VCores=2 |
+|                                                           |
+| PLAN FRAGMENT 0                                           |
+|   PARTITION: UNPARTITIONED                                |
+|                                                           |
+|   6:AGGREGATE (merge finalize)                            |
+|   |  output: SUM(COUNT(*))                                |
+|   |  cardinality: 1                                       |
+|   |  per-host memory: unavailable                         |
+|   |  tuple ids: 2                                         |
+|   |                                                       |
+|   5:EXCHANGE                                              |
+|      cardinality: 1                                       |
+|      per-host memory: unavailable                         |
+|      tuple ids: 2                                         |
+|                                                           |
+| PLAN FRAGMENT 1                                           |
+|   PARTITION: RANDOM                                       |
+|                                                           |
+|   STREAM DATA SINK                                        |
+|     EXCHANGE ID: 5                                        |
+|     UNPARTITIONED                                         |
+|                                                           |
+|   3:AGGREGATE                                             |
+|   |  output: COUNT(*)                                     |
+|   |  cardinality: 1                                       |
+|   |  per-host memory: 10.00MB                             |
+|   |  tuple ids: 2                                         |
+|   |                                                       |
+|   2:HASH JOIN                                             |
+|   |  join op: INNER JOIN (BROADCAST)                      |
+|   |  hash predicates:                                     |
+|   |    big.id = small.id                                  |
+|   |  cardinality: 1000000000                              |
+|   |  per-host memory: 3.15MB                              |
+|   |  tuple ids: 1 0                                       |
+|   |                                                       |
+|   |----4:EXCHANGE                                         |
+|   |       cardinality: 1000000                            |
+|   |       per-host memory: 0B                             |
+|   |       tuple ids: 0                                    |
+|   |                                                       |
+|   1:SCAN HDFS                                             |
+|      table=join_order.big #partitions=1/1 size=23.12GB    |
+|      table stats: 1000000000 rows total                   |
+|      column stats: all                                    |
+|      cardinality: 1000000000                              |
+|      per-host memory: 88.00MB                             |
+|      tuple ids: 1                                         |
+|                                                           |
+| PLAN FRAGMENT 2                                           |
+|   PARTITION: RANDOM                                       |
+|                                                           |
+|   STREAM DATA SINK                                        |
+|     EXCHANGE ID: 4                                        |
+|     UNPARTITIONED                                         |
+|                                                           |
+|   0:SCAN HDFS                                             |
+|      table=join_order.small #partitions=1/1 size=17.93MB  |
+|      table stats: 1000000 rows total                      |
+|      column stats: all                                    |
+|      cardinality: 1000000                                 |
+|      per-host memory: 32.00MB                             |
+|      tuple ids: 0                                         |
++-----------------------------------------------------------+
+Returned 64 row(s) in 0.03s</codeblock>
+
+      <p>
+        When queries like these are actually run, the execution times are 
relatively consistent regardless of the
+        table order in the query text. Here are examples using both the unique 
<codeph>ID</codeph> column and the
+        <codeph>VAL</codeph> column containing duplicate values:
+      </p>
+
+<codeblock>[localhost:21000] &gt; select count(*) from big join small on 
(big.id = small.id);
+Query: select count(*) from big join small on (big.id = small.id)
++----------+
+| count(*) |
++----------+
+| 1000000  |
++----------+
+Returned 1 row(s) in 21.68s
+[localhost:21000] &gt; select count(*) from small join big on (big.id = 
small.id);
+Query: select count(*) from small join big on (big.id = small.id)
++----------+
+| count(*) |
++----------+
+| 1000000  |
++----------+
+Returned 1 row(s) in 20.45s
+
+[localhost:21000] &gt; select count(*) from big join small on (big.val = 
small.val);
++------------+
+| count(*)   |
++------------+
+| 2000948962 |
++------------+
+Returned 1 row(s) in 108.85s
+[localhost:21000] &gt; select count(*) from small join big on (big.val = 
small.val);
++------------+
+| count(*)   |
++------------+
+| 2000948962 |
++------------+
+Returned 1 row(s) in 100.76s</codeblock>
+
+      <note>
+        When examining the performance of join queries and the effectiveness 
of the join order optimization, make
+        sure the query involves enough data and cluster resources to see a 
difference depending on the query plan.
+        For example, a single data file of just a few megabytes will reside in 
a single HDFS block and be processed
+        on a single node. Likewise, if you use a single-node or two-node 
cluster, there might not be much
+        difference in efficiency for the broadcast or partitioned join 
strategies.
+      </note>
+    </conbody>
+  </concept>
+</concept>

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3be0f122/docs/topics/impala_perf_resources.xml
----------------------------------------------------------------------
diff --git a/docs/topics/impala_perf_resources.xml 
b/docs/topics/impala_perf_resources.xml
new file mode 100644
index 0000000..c538893
--- /dev/null
+++ b/docs/topics/impala_perf_resources.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE concept PUBLIC "-//OASIS//DTD DITA Concept//EN" "concept.dtd">
+<concept id="mem_limits">
+
+  <title>Controlling Impala Resource Usage</title>
+  <titlealts audience="PDF"><navtitle>Controlling Resource 
Usage</navtitle></titlealts>
+  <prolog>
+    <metadata>
+      <data name="Category" value="Impala"/>
+      <data name="Category" value="Performance"/>
+      <data name="Category" value="Memory"/>
+      <data name="Category" value="Scalability"/>
+      <data name="Category" value="Resource Management"/>
+      <data name="Category" value="Administrators"/>
+      <data name="Category" value="Developers"/>
+      <data name="Category" value="Data Analysts"/>
+    </metadata>
+  </prolog>
+
+  <conbody>
+
+    <p>
+      Sometimes, balancing raw query performance against scalability requires 
limiting the amount of resources,
+      such as memory or CPU, used by a single query or group of queries. 
Impala can use several mechanisms that
+      help to smooth out the load during heavy concurrent usage, resulting in 
faster overall query times and
+      sharing of resources across Impala queries, MapReduce jobs, and other 
kinds of workloads across a CDH
+      cluster:
+    </p>
+
+    <ul>
+      <li>
+        The Impala admission control feature uses a fast, distributed 
mechanism to hold back queries that exceed
+        limits on the number of concurrent queries or the amount of memory 
used. The queries are queued, and
+        executed as other queries finish and resources become available. You 
can control the concurrency limits,
+        and specify different limits for different groups of users to divide 
cluster resources according to the
+        priorities of different classes of users. This feature is new in 
Impala 1.3, and works with both CDH 4 and
+        CDH 5. See <xref href="impala_admission.xml#admission_control"/> for 
details.
+      </li>
+
+      <li>
+        <p>
+          You can restrict the amount of memory Impala reserves during query 
execution by specifying the
+          <codeph>-mem_limit</codeph> option for the <codeph>impalad</codeph> 
daemon. See
+          <xref href="impala_config_options.xml#config_options"/> for details. 
This limit applies only to the
+          memory that is directly consumed by queries; Impala reserves 
additional memory at startup, for example to
+          hold cached metadata.
+        </p>
+      </li>
+
+      <li>
+        <p>
+          For production deployment, <ph rev="upstream">Cloudera</ph> 
recommends that you implement resource isolation using mechanisms
+          such as cgroups, which you can configure using Cloudera Manager. For 
details, see the
+          <xref 
href="http://www.cloudera.com/documentation/enterprise/latest/topics/cm_mc_service_pools.html";
 scope="external" format="html">Static
+          Resource Pools</xref> in the Cloudera Manager documentation.
+        </p>
+      </li>
+    </ul>
+  </conbody>
+</concept>

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3be0f122/docs/topics/impala_perf_skew.xml
----------------------------------------------------------------------
diff --git a/docs/topics/impala_perf_skew.xml b/docs/topics/impala_perf_skew.xml
new file mode 100644
index 0000000..b3a7cec
--- /dev/null
+++ b/docs/topics/impala_perf_skew.xml
@@ -0,0 +1,150 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE concept PUBLIC "-//OASIS//DTD DITA Concept//EN" "concept.dtd">
+<concept id="perf_skew">
+
+  <title>Detecting and Correcting HDFS Block Skew Conditions</title>
+  <titlealts audience="PDF"><navtitle>HDFS Block Skew</navtitle></titlealts>
+  <prolog>
+    <metadata>
+      <data name="Category" value="Impala"/>
+      <data name="Category" value="Performance"/>
+      <data name="Category" value="HDFS"/>
+      <data name="Category" value="Proof of Concept"/>
+      <data name="Category" value="Administrators"/>
+      <data name="Category" value="Developers"/>
+      <data name="Category" value="Data Analysts"/>
+    </metadata>
+  </prolog>
+
+  <conbody>
+
+    <p>
+      For best performance of Impala parallel queries, the work is divided 
equally across hosts in the cluster, and
+      all hosts take approximately equal time to finish their work. If one 
host takes substantially longer than
+      others, the extra time needed for the slow host can become the dominant 
factor in query performance.
+      Therefore, one of the first steps in performance tuning for Impala is to 
detect and correct such conditions.
+    </p>
+
+    <p>
+      The main cause of uneven performance that you can correct within Impala 
is <term>skew</term> in the number of
+      HDFS data blocks processed by each host, where some hosts process 
substantially more data blocks than others.
+      This condition can occur because of uneven distribution of the data 
values themselves, for example causing
+      certain data files or partitions to be large while others are very 
small. (Although it is possible to have
+      unevenly distributed data without any problems with the distribution of 
HDFS blocks.) Block skew could also
+      be due to the underlying block allocation policies within HDFS, the 
replication factor of the data files, and
+      the way that Impala chooses the host to process each data block.
+    </p>
+
+    <p>
+      The most convenient way to detect block skew, or slow-host issues in 
general, is to examine the <q>executive
+      summary</q> information from the query profile after running a query:
+    </p>
+
+    <ul>
+      <li>
+        <p>
+          In <cmdname>impala-shell</cmdname>, issue the 
<codeph>SUMMARY</codeph> command immediately after the
+          query is complete, to see just the summary information. If you 
detect issues involving skew, you might
+          switch to issuing the <codeph>PROFILE</codeph> command, which 
displays the summary information followed
+          by a detailed performance analysis.
+        </p>
+      </li>
+
+      <li>
+        <p>
+          In the Cloudera Manager interface or the Impala debug web UI, click 
on the <uicontrol>Profile</uicontrol>
+          link associated with the query after it is complete. The executive 
summary information is displayed early
+          in the profile output.
+        </p>
+      </li>
+    </ul>
+
+    <p>
+      For each phase of the query, you see an <uicontrol>Avg Time</uicontrol> 
and a <uicontrol>Max Time</uicontrol>
+      value, along with <uicontrol>#Hosts</uicontrol> indicating how many 
hosts are involved in that query phase.
+      For all the phases with <uicontrol>#Hosts</uicontrol> greater than one, 
look for cases where the maximum time
+      is substantially greater than the average time. Focus on the phases that 
took the longest, for example, those
+      taking multiple seconds rather than milliseconds or microseconds.
+    </p>
+
+    <p>
+      If you detect that some hosts take longer than others, first rule out 
non-Impala causes. One reason that some
+      hosts could be slower than others is if those hosts have less capacity 
than the others, or if they are
+      substantially busier due to unevenly distributed non-Impala workloads:
+    </p>
+
+    <ul>
+      <li>
+        <p>
+          For clusters running Impala, keep the relative capacities of all 
hosts roughly equal. Any cost savings
+          from including some underpowered hosts in the cluster will likely be 
outweighed by poor or uneven
+          performance, and the time spent diagnosing performance issues.
+        </p>
+      </li>
+
+      <li>
+        <p>
+          If non-Impala workloads cause slowdowns on some hosts but not 
others, use the appropriate load-balancing
+          techniques for the non-Impala components to smooth out the load 
across the cluster.
+        </p>
+      </li>
+    </ul>
+
+    <p>
+      If the hosts on your cluster are evenly powered and evenly loaded, 
examine the detailed profile output to
+      determine which host is taking longer than others for the query phase in 
question. Examine how many bytes are
+      processed during that phase on that host, how much memory is used, and 
how many bytes are transmitted across
+      the network.
+    </p>
+
+    <p>
+      The most common symptom is a higher number of bytes read on one host 
than others, due to one host being
+      requested to process a higher number of HDFS data blocks. This condition 
is more likely to occur when the
+      number of blocks accessed by the query is relatively small. For example, 
if you have a 10-node cluster and
+      the query processes 10 HDFS blocks, each node might not process exactly 
one block. If one node sits idle
+      while another node processes two blocks, the query could take twice as 
long as if the data was perfectly
+      distributed.
+    </p>
+
+    <p>
+      Possible solutions in this case include:
+    </p>
+
+    <ul>
+      <li>
+        <p>
+          If the query is artificially small, perhaps for benchmarking 
purposes, scale it up to process a larger
+          data set. For example, if some nodes read 10 HDFS data blocks while 
others read 11, the overall effect of
+          the uneven distribution is much lower than when some nodes did twice 
as much work as others. As a
+          guideline, aim for a <q>sweet spot</q> where each node reads 2 GB or 
more from HDFS per query. Queries
+          that process lower volumes than that could experience inconsistent 
performance that smooths out as
+          queries become more data-intensive.
+        </p>
+      </li>
+
+      <li>
+        <p>
+          If the query processes only a few large blocks, so that many nodes 
sit idle and cannot help to
+          parallelize the query, consider reducing the overall block size. For 
example, you might adjust the
+          <codeph>PARQUET_FILE_SIZE</codeph> query option before copying or 
converting data into a Parquet table.
+          Or you might adjust the granularity of data files produced earlier 
in the ETL pipeline by non-Impala
+          components. In Impala 2.0 and later, the default Parquet block size 
is 256 MB, reduced from 1 GB, to
+          improve parallelism for common cluster sizes and data volumes.
+        </p>
+      </li>
+
+      <li>
+        <p>
+          Reduce the amount of compression applied to the data. For text data 
files, the highest degree of
+          compression (gzip) produces unsplittable files that are more 
difficult for Impala to process in parallel,
+          and require extra memory during processing to hold the compressed 
and uncompressed data simultaneously.
+          For binary formats such as Parquet and Avro, compression can result 
in fewer data blocks overall, but
+          remember that when queries process relatively few blocks, there is 
less opportunity for parallel
+          execution and many nodes in the cluster might sit idle. Note that 
when Impala writes Parquet data with
+          the query option <codeph>COMPRESSION_CODEC=NONE</codeph> enabled, 
the data is still typically compact due
+          to the encoding schemes used by Parquet, independent of the final 
compression step.
+        </p>
+      </li>
+    </ul>
+  </conbody>
+</concept>


Reply via email to