http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3c2c8f12/docs/topics/impala_perf_hdfs_caching.xml
----------------------------------------------------------------------
diff --git a/docs/topics/impala_perf_hdfs_caching.xml 
b/docs/topics/impala_perf_hdfs_caching.xml
index b00b804..e7ffeec 100644
--- a/docs/topics/impala_perf_hdfs_caching.xml
+++ b/docs/topics/impala_perf_hdfs_caching.xml
@@ -4,19 +4,606 @@
 
   <title>Using HDFS Caching with Impala (CDH 5.1 or higher only)</title>
   <titlealts audience="PDF"><navtitle>HDFS Caching</navtitle></titlealts>
-  
+  <prolog>
+    <metadata>
+      <data name="Category" value="Impala"/>
+      <data name="Category" value="Performance"/>
+      <data name="Category" value="Scalability"/>
+      <data name="Category" value="HDFS"/>
+      <data name="Category" value="HDFS Caching"/>
+      <data name="Category" value="Memory"/>
+      <data name="Category" value="Administrators"/>
+      <data name="Category" value="Developers"/>
+      <data name="Category" value="Data Analysts"/>
+    </metadata>
+  </prolog>
 
   <conbody>
 
     <p>
       HDFS caching provides performance and scalability benefits in production 
environments where Impala queries
-      and other Hadoop jobs operate on quantities of data much larger than the 
physical RAM on the data nodes,
+      and other Hadoop jobs operate on quantities of data much larger than the 
physical RAM on the DataNodes,
       making it impractical to rely on the Linux OS cache, which only keeps 
the most recently used data in memory.
       Data read from the HDFS cache avoids the overhead of checksumming and 
memory-to-memory copying involved when
       using data from the Linux OS cache.
     </p>
 
-    
+    <note>
+      <p>
+        On a small or lightly loaded cluster, HDFS caching might not produce 
any speedup. It might even lead to
+        slower queries, if I/O read operations that were performed in parallel across the entire cluster
+        are replaced by in-memory operations on a smaller number of hosts. The hosts where the 
HDFS blocks are cached can become
+        bottlenecks because they experience high CPU load while processing the 
cached data blocks, while other hosts remain idle.
+        Therefore, always compare performance with and without this feature 
enabled, using a realistic workload.
+      </p>
+      <p rev="2.2.0">
+        In CDH 5.4 / Impala 2.2 and higher, you can spread the CPU load more 
evenly by specifying the <codeph>WITH REPLICATION</codeph>
+        clause of the <codeph>CREATE TABLE</codeph> and <codeph>ALTER 
TABLE</codeph> statements.
+        This clause lets you control the replication factor for
+        HDFS caching for a specific table or partition. By default, each 
cached block is
+        only present on a single host, which can lead to CPU contention if the 
same host
+        processes each cached block. Increasing the replication factor lets 
Impala choose
+        different hosts to process different cached blocks, to better 
distribute the CPU load.
+        Always use a <codeph>WITH REPLICATION</codeph> setting of at least 3, 
and adjust upward
+        if necessary to match the replication factor for the underlying HDFS 
data files.
+      </p>
+      <p rev="2.5.0">
+        In CDH 5.7 / Impala 2.5 and higher, Impala automatically randomizes 
which host processes
+        a cached HDFS block, to avoid CPU hotspots. For tables where HDFS 
caching is not applied,
+        Impala designates which host to process a data block using an 
algorithm that estimates
+        the load on each host. If CPU hotspots still arise during queries,
+        you can enable additional randomization for the scheduling algorithm 
for non-HDFS cached data
+        by setting the <codeph>SCHEDULE_RANDOM_REPLICA</codeph> query option.
+      </p>
+    </note>
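+
+    <p>
+      As a minimal sketch of the option mentioned above (the table name is a placeholder), you
+      might enable the extra randomization for a session in <cmdname>impala-shell</cmdname> before
+      rerunning a query that shows CPU hotspots on non-HDFS-cached data:
+    </p>
+
+<codeblock>-- Hypothetical example; substitute your own table.
+set schedule_random_replica=true;
+select count(*) from census where year = 1970;
+</codeblock>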
+
+    <p outputclass="toc inpage"/>
+
+<!-- Could conref this background link; haven't decided yet the best place or 
if it's needed twice. -->
+
+    <p>
+      For background information about how to set up and manage HDFS caching 
for a CDH cluster, see
+<!-- Original URL: 
http://www.cloudera.com/content/cloudera-content/cloudera-docs/CDH5/latest/CDH5-Installation-Guide/cdh_ig_hdfs_caching.html
 -->
+      <xref href="http://www.cloudera.com/documentation/enterprise/latest/topics/cdh_ig_hdfs_caching.html" scope="external" format="html">the
+      CDH documentation</xref>.
+    </p>
+  </conbody>
+
+  <concept id="hdfs_caching_overview">
+
+    <title>Overview of HDFS Caching for Impala</title>
+  <prolog>
+    <metadata>
+      <data name="Category" value="Concepts"/>
+    </metadata>
+  </prolog>
+
+    <conbody>
+
+      <p>
+        On CDH 5.1 and higher, Impala can use the HDFS caching feature to make 
more effective use of RAM, so that
+        repeated queries can take advantage of data <q>pinned</q> in memory 
regardless of how much data is
+        processed overall. The HDFS caching feature lets you designate a 
subset of frequently accessed data to be
+        pinned permanently in memory, remaining in the cache across multiple 
queries and never being evicted. This
+        technique is suitable for tables or partitions that are frequently 
accessed and are small enough to fit
+        entirely within the HDFS memory cache. For example, you might 
designate several dimension tables to be
+        pinned in the cache, to speed up many different join queries that 
reference them. Or in a partitioned
+        table, you might pin a partition holding data from the most recent 
time period because that data will be
+        queried intensively; then when the next set of data arrives, you could 
unpin the previous partition and pin
+        the partition holding the new data.
+      </p>
+
+      <p>
+        Because this Impala performance feature relies on HDFS infrastructure, 
it only applies to Impala tables
+        that use HDFS data files. HDFS caching for Impala does not apply to 
HBase tables, S3 tables,
+        Kudu tables,
+        or Isilon tables.
+      </p>
+
+    </conbody>
+  </concept>
+
+  <concept id="hdfs_caching_prereqs">
+
+    <title>Setting Up HDFS Caching for Impala</title>
+
+    <conbody>
+
+      <p>
+        To use HDFS caching with Impala, first set up that feature for your 
CDH cluster:
+      </p>
+
+      <ul>
+        <li>
+          <p>
+          Decide how much memory to devote to the HDFS cache on each host. 
Remember that the total memory available
+          for cached data is the sum of the cache sizes on all the hosts. By 
default, any data block is only cached on one
+          host, although you can cache a block across multiple hosts by 
increasing the replication factor.
+          <!-- Obsoleted in Impala 2.2 and higher by IMPALA-1587.
+          Once a data block is cached on one host, all requests to process 
that block are routed to that same host.)
+          -->
+          </p>
+        </li>
+
+        <li>
+          <p>
+          Issue <cmdname>hdfs cacheadmin</cmdname> commands to set up one or 
more cache pools, owned by the same
+          user as the <cmdname>impalad</cmdname> daemon (typically 
<codeph>impala</codeph>). For example:
+<codeblock>hdfs cacheadmin -addPool four_gig_pool -owner impala -limit 4000000000
+</codeblock>
+          For details about the <cmdname>hdfs cacheadmin</cmdname> command, see
+<!-- Original URL: 
http://www.cloudera.com/content/cloudera-content/cloudera-docs/CDH5/latest/CDH5-Installation-Guide/cdh_ig_hdfs_caching.html
 -->
+          <xref href="http://www.cloudera.com/documentation/enterprise/latest/topics/cdh_ig_hdfs_caching.html" scope="external" format="html">the
+          CDH documentation</xref>.
+          </p>
+        </li>
+      </ul>
+
+      <p>
+        Once HDFS caching is enabled and one or more pools are available, see
+        <xref href="impala_perf_hdfs_caching.xml#hdfs_caching_ddl"/> for how 
to choose which Impala data to load
+        into the HDFS cache. On the Impala side, you specify the cache pool 
name defined by the <codeph>hdfs
+        cacheadmin</codeph> command in the Impala DDL statements that enable 
HDFS caching for a table or partition,
+        such as <codeph>CREATE TABLE ... CACHED IN 
<varname>pool</varname></codeph> or <codeph>ALTER TABLE ... SET
+        CACHED IN <varname>pool</varname></codeph>.
+      </p>
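+
+      <p>
+        For example, a minimal sketch tying the <codeph>four_gig_pool</codeph> pool created above to
+        the Impala DDL (the table name is hypothetical):
+      </p>
+
+<codeblock>-- Cache a new table in the pool created with hdfs cacheadmin.
+create table lookup_table (id int, name string)
+  cached in 'four_gig_pool';
+
+-- Or enable caching for an existing table.
+alter table lookup_table set cached in 'four_gig_pool';
+</codeblock>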
+    </conbody>
+  </concept>
+
+  <concept id="hdfs_caching_ddl">
+
+    <title>Enabling HDFS Caching for Impala Tables and Partitions</title>
+
+    <conbody>
+
+      <p>
+        Begin by choosing which tables or partitions to cache. For example, 
these might be lookup tables that are
+        accessed by many different join queries, or partitions corresponding 
to the most recent time period that
+        are analyzed by different reports or ad hoc queries.
+      </p>
+
+      <p>
+        In your SQL statements, you specify logical divisions such as tables 
and partitions to be cached. Impala
+        translates these requests into HDFS-level directives that apply to 
particular directories and files. For
+        example, given a partitioned table <codeph>CENSUS</codeph> with a 
partition key column
+        <codeph>YEAR</codeph>, you could choose to cache all or part of the 
data as follows:
+      </p>
+
+      <p conref="../shared/impala_common.xml#common/impala_cache_replication_factor"/>
+
+<codeblock>-- Cache the entire table (all partitions).
+alter table census set cached in '<varname>pool_name</varname>';
+
+-- Remove the entire table from the cache.
+alter table census set uncached;
+
+-- Cache a portion of the table (a single partition).
+-- If the table is partitioned by multiple columns (such as year, month, day),
+-- the ALTER TABLE command must specify values for all those columns.
+alter table census partition (year=1960) set cached in '<varname>pool_name</varname>';
+
+<ph rev="2.2.0">-- Cache the data from one partition on up to 4 hosts, to minimize CPU load on any
+-- single host when the same data block is processed multiple times.
+alter table census partition (year=1970)
+  set cached in '<varname>pool_name</varname>' with replication = 4;</ph>
+
+-- At each stage, check the volume of cached data.
+-- For large tables or partitions, the background loading might take some time,
+-- so you might have to wait and reissue the statement until all the data
+-- has finished being loaded into the cache.
+show table stats census;
++-------+-------+--------+------+--------------+--------+
+| year  | #Rows | #Files | Size | Bytes Cached | Format |
++-------+-------+--------+------+--------------+--------+
+| 1900  | -1    | 1      | 11B  | NOT CACHED   | TEXT   |
+| 1940  | -1    | 1      | 11B  | NOT CACHED   | TEXT   |
+| 1960  | -1    | 1      | 11B  | 11B          | TEXT   |
+| 1970  | -1    | 1      | 11B  | NOT CACHED   | TEXT   |
+| Total | -1    | 4      | 44B  | 11B          |        |
++-------+-------+--------+------+--------------+--------+
+</codeblock>
+
+      <p>
+        <b>CREATE TABLE considerations:</b>
+      </p>
+
+      <p>
+        The HDFS caching feature affects the Impala <codeph>CREATE 
TABLE</codeph> statement as follows:
+      </p>
+
+      <ul>
+        <li>
+        <p>
+          You can put a <codeph>CACHED IN 
'<varname>pool_name</varname>'</codeph> clause
+          <ph rev="2.2.0">and optionally a <codeph>WITH REPLICATION = 
<varname>number_of_hosts</varname></codeph> clause</ph>
+          at the end of a
+          <codeph>CREATE TABLE</codeph> statement to automatically cache the 
entire contents of the table,
+          including any partitions added later. The 
<varname>pool_name</varname> is a pool that you previously set
+          up with the <cmdname>hdfs cacheadmin</cmdname> command.
+        </p>
+        </li>
+
+        <li>
+        <p>
+          Once a table is designated for HDFS caching through the 
<codeph>CREATE TABLE</codeph> statement, if new
+          partitions are added later through <codeph>ALTER TABLE ... ADD 
PARTITION</codeph> statements, the data in
+          those new partitions is automatically cached in the same pool.
+        </p>
+        </li>
+
+        <li>
+        <p>
+          If you want to perform repetitive queries on a subset of data from a 
large table, and it is not practical
+          to designate the entire table or specific partitions for HDFS 
caching, you can create a new cached table
+          with just a subset of the data by using <codeph>CREATE TABLE ... 
CACHED IN '<varname>pool_name</varname>'
+          AS SELECT ... WHERE ...</codeph>. When you are finished with 
generating reports from this subset of data,
+          drop the table and both the data files and the data cached in RAM 
are automatically deleted.
+        </p>
+        </li>
+      </ul>
+
+      <p>
+        See <xref href="impala_create_table.xml#create_table"/> for the full 
syntax.
+      </p>
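+
+      <p>
+        The following sketch illustrates these <codeph>CREATE TABLE</codeph> clauses together;
+        the table and column names are hypothetical:
+      </p>
+
+<codeblock>-- Cache the whole table, including any partitions added later.
+create table t1 (x int) partitioned by (part string)
+  cached in '<varname>pool_name</varname>' with replication = 3;
+
+-- Cache only a frequently queried subset of a large table.
+create table t2_subset
+  cached in '<varname>pool_name</varname>'
+  as select * from t2 where c1 &lt; 10;
+</codeblock>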
+
+      <p>
+        <b>Other memory considerations:</b>
+      </p>
+
+      <p>
+        Certain DDL operations, such as <codeph>ALTER TABLE ... SET 
LOCATION</codeph>, are blocked while the
+        underlying HDFS directories contain cached files. You must uncache the 
files first, before changing the
+        location, dropping the table, and so on.
+      </p>
+
+      <p>
+        When data is requested to be pinned in memory, that process happens in 
the background without blocking
+        access to the data while the caching is in progress. Loading the data 
from disk could take some time.
+        Impala reads each HDFS data block from memory if it has been pinned 
already, or from disk if it has not
+        been pinned yet. When files are added to a table or partition whose 
contents are cached, Impala
+        automatically detects those changes and performs a 
<codeph>REFRESH</codeph> automatically once the relevant
+        data is cached.
+      </p>
+
+      <p>
+        The amount of data that you can pin on each node through the HDFS 
caching mechanism is subject to a quota
+        that is enforced by the underlying HDFS service. Before requesting to 
pin an Impala table or partition in
+        memory, check that its size does not exceed this quota.
+      </p>
+
+      <note>
+        Because the HDFS cache consists of combined memory from all the 
DataNodes in the cluster, cached tables or
+        partitions can be bigger than the amount of HDFS cache memory on any 
single host.
+      </note>
     </conbody>
   </concept>
 
+  <concept id="hdfs_caching_etl">
+
+    <title>Loading and Removing Data with HDFS Caching Enabled</title>
+  <prolog>
+    <metadata>
+      <data name="Category" value="ETL"/>
+    </metadata>
+  </prolog>
+
+    <conbody>
+
+      <p>
+        When HDFS caching is enabled, extra processing happens in the 
background when you add or remove data
+        through statements such as <codeph>INSERT</codeph> and <codeph>DROP 
TABLE</codeph>.
+      </p>
+
+      <p>
+        <b>Inserting or loading data:</b>
+      </p>
+
+      <ul>
+        <li>
+          When Impala performs an <codeph><xref 
href="impala_insert.xml#insert">INSERT</xref></codeph> or
+          <codeph><xref href="impala_load_data.xml#load_data">LOAD 
DATA</xref></codeph> statement for a table or
+          partition that is cached, the new data files are automatically 
cached and Impala recognizes that fact
+          automatically.
+        </li>
+
+        <li>
+          If you perform an <codeph>INSERT</codeph> or <codeph>LOAD 
DATA</codeph> through Hive, as always, Impala
+          only recognizes the new data files after a <codeph>REFRESH 
<varname>table_name</varname></codeph>
+          statement in Impala.
+        </li>
+
+        <li>
+          If the cache pool is entirely full, or becomes full before all the 
requested data can be cached, the
+          Impala DDL statement returns an error. This is to avoid situations 
where only some of the requested data
+          could be cached.
+        </li>
+
+        <li>
+          When HDFS caching is enabled for a table or partition, new data 
files are cached automatically when they
+          are added to the appropriate directory in HDFS, without the need for 
a <codeph>REFRESH</codeph> statement
+          in Impala. Impala automatically performs a <codeph>REFRESH</codeph> 
once the new data is loaded into the
+          HDFS cache.
+        </li>
+      </ul>
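+
+      <p>
+        As a sketch of the behavior described above (the staging table name is hypothetical), only
+        the Hive-side load requires an explicit <codeph>REFRESH</codeph>:
+      </p>
+
+<codeblock>-- Data added through Impala to a cached table is cached and
+-- recognized automatically.
+insert into census partition (year=1980) select * from staging_1980;
+
+-- After adding data through Hive, make the new files visible to Impala;
+-- the new files are then cached in the background.
+refresh census;
+</codeblock>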
+
+      <p>
+        <b>Dropping tables, partitions, or cache pools:</b>
+      </p>
+
+      <p>
+        The HDFS caching feature interacts with the Impala
+        <codeph><xref href="impala_drop_table.xml#drop_table">DROP 
TABLE</xref></codeph> and
+        <codeph><xref href="impala_alter_table.xml#alter_table">ALTER TABLE 
... DROP PARTITION</xref></codeph>
+        statements as follows:
+      </p>
+
+      <ul>
+        <li>
+          When you issue a <codeph>DROP TABLE</codeph> for a table that is 
entirely cached, or has some partitions
+          cached, the <codeph>DROP TABLE</codeph> succeeds and all the cache 
directives Impala submitted for that
+          table are removed from the HDFS cache system.
+        </li>
+
+        <li>
+          The same applies to <codeph>ALTER TABLE ... DROP PARTITION</codeph>. 
The operation succeeds and any cache
+          directives are removed.
+        </li>
+
+        <li>
+          As always, the underlying data files are removed if the dropped 
table is an internal table, or the
+          dropped partition is in its default location underneath an internal 
table. The data files are left alone
+          if the dropped table is an external table, or if the dropped 
partition is in a non-default location.
+        </li>
+
+        <li>
+          If you designated the data files as cached through the <cmdname>hdfs 
cacheadmin</cmdname> command, and
+          the data files are left behind as described in the previous item, 
the data files remain cached. Impala
+          only removes the cache directives submitted by Impala through the 
<codeph>CREATE TABLE</codeph> or
+          <codeph>ALTER TABLE</codeph> statements. It is OK to have multiple 
redundant cache directives pertaining
+          to the same files; the directives all have unique IDs and owners so 
that the system can tell them apart.
+        </li>
+
+        <li>
+          If you drop an HDFS cache pool through the <cmdname>hdfs 
cacheadmin</cmdname> command, all the Impala
+          data files are preserved, just no longer cached. After a subsequent 
<codeph>REFRESH</codeph>,
+          <codeph>SHOW TABLE STATS</codeph> reports 0 bytes cached for each 
associated Impala table or partition.
+        </li>
+      </ul>
+
+      <p>
+        <b>Relocating a table or partition:</b>
+      </p>
+
+      <p>
+        The HDFS caching feature interacts with the Impala
+        <codeph><xref href="impala_alter_table.xml#alter_table">ALTER TABLE 
... SET LOCATION</xref></codeph>
+        statement as follows:
+      </p>
+
+      <ul>
+        <li>
+          If you have designated a table or partition as cached through the 
<codeph>CREATE TABLE</codeph> or
+          <codeph>ALTER TABLE</codeph> statements, subsequent attempts to 
relocate the table or partition through
+          an <codeph>ALTER TABLE ... SET LOCATION</codeph> statement will 
fail. You must issue an <codeph>ALTER
+          TABLE ... SET UNCACHED</codeph> statement for the table or partition 
first. Otherwise, Impala would lose
+          track of some cached data files and have no way to uncache them 
later.
+        </li>
+      </ul>
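+
+      <p>
+        A minimal sketch of the required sequence, using hypothetical partition and path names:
+      </p>
+
+<codeblock>-- Uncache first, then relocate.
+alter table census partition (year=1960) set uncached;
+alter table census partition (year=1960) set location '/user/impala/census_archive/1960';
+</codeblock>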
+    </conbody>
+  </concept>
+
+  <concept id="hdfs_caching_admin">
+
+    <title>Administration for HDFS Caching with Impala</title>
+
+    <conbody>
+
+      <p>
+        Here are the guidelines and steps to check or change the status of 
HDFS caching for Impala data:
+      </p>
+
+      <p>
+        <b>hdfs cacheadmin command:</b>
+      </p>
+
+      <ul>
+        <li>
+          If you drop a cache pool with the <cmdname>hdfs cacheadmin</cmdname> 
command, Impala queries against the
+          associated data files will still work, by falling back to reading 
the files from disk. After performing a
+          <codeph>REFRESH</codeph> on the table, Impala reports the number of 
bytes cached as 0 for all associated
+          tables and partitions.
+        </li>
+
+        <li>
+          You might use <cmdname>hdfs cacheadmin</cmdname> to get a list of 
existing cache pools, or detailed
+          information about the pools, as follows:
+<codeblock scale="60">hdfs cacheadmin -listDirectives         # Basic info
+Found 122 entries
+  ID POOL       REPL EXPIRY  PATH
+ 123 testPool      1 never   /user/hive/warehouse/tpcds.store_sales
+ 124 testPool      1 never   
/user/hive/warehouse/tpcds.store_sales/ss_date=1998-01-15
+ 125 testPool      1 never   
/user/hive/warehouse/tpcds.store_sales/ss_date=1998-02-01
+...
+
+hdfs cacheadmin -listDirectives -stats  # More details
+Found 122 entries
+  ID POOL       REPL EXPIRY  PATH                                              
          BYTES_NEEDED  BYTES_CACHED  FILES_NEEDED  FILES_CACHED
+ 123 testPool      1 never   /user/hive/warehouse/tpcds.store_sales            
                     0             0             0             0
+ 124 testPool      1 never   
/user/hive/warehouse/tpcds.store_sales/ss_date=1998-01-15         143169        
143169             1             1
+ 125 testPool      1 never   
/user/hive/warehouse/tpcds.store_sales/ss_date=1998-02-01         112447        
112447             1             1
+...
+</codeblock>
+        </li>
+      </ul>
+
+      <p>
+        <b>Impala SHOW statement:</b>
+      </p>
+
+      <ul>
+        <li>
+          For each table or partition, the <codeph>SHOW TABLE STATS</codeph> 
or <codeph>SHOW PARTITIONS</codeph>
+          statement displays the number of bytes currently cached by the HDFS 
caching feature. If there are no
+          cache directives in place for that table or partition, the result 
set displays <codeph>NOT
+          CACHED</codeph>. A value of 0, or a smaller number than the overall 
size of the table or partition,
+          indicates that the cache request has been submitted but the data has 
not been entirely loaded into memory
+          yet. See <xref href="impala_show.xml#show"/> for details.
+        </li>
+      </ul>
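+
+      <p>
+        For example, for the partitioned <codeph>census</codeph> table shown earlier, either of the
+        following statements reports the cached bytes for each partition:
+      </p>
+
+<codeblock>show table stats census;
+show partitions census;
+</codeblock>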
+
+      <p>
+        <b>Cloudera Manager:</b>
+      </p>
+
+      <ul>
+        <li>
+          You can enable or disable HDFS caching through Cloudera Manager, 
using the configuration setting
+          <uicontrol>Maximum Memory Used for Caching</uicontrol> for the HDFS 
service. This control sets the HDFS
+          configuration parameter 
<codeph>dfs_datanode_max_locked_memory</codeph>, which specifies the upper limit
+          of HDFS cache size on each node.
+        </li>
+
+        <li>
+          All the other manipulation of the HDFS caching settings, such as 
what files are cached, is done through
+          the command line, either Impala DDL statements or the Linux 
<cmdname>hdfs cacheadmin</cmdname> command.
+        </li>
+      </ul>
+
+      <p>
+        <b>Impala memory limits:</b>
+      </p>
+
+      <p>
+        The Impala HDFS caching feature interacts with the Impala memory 
limits as follows:
+      </p>
+
+      <ul>
+        <li>
+          The maximum size of each HDFS cache pool is specified externally to 
Impala, through the <cmdname>hdfs
+          cacheadmin</cmdname> command.
+        </li>
+
+        <li>
+          All the memory used for HDFS caching is separate from the 
<cmdname>impalad</cmdname> daemon address space
+          and does not count towards the limits of the 
<codeph>--mem_limit</codeph> startup option,
+          <codeph>MEM_LIMIT</codeph> query option, or further limits imposed 
through YARN resource management or
+          the Linux <codeph>cgroups</codeph> mechanism.
+        </li>
+
+        <li>
+          Because accessing HDFS cached data avoids a memory-to-memory copy 
operation, queries involving cached
+          data require less memory on the Impala side than the equivalent 
queries on uncached data. In addition to
+          any performance benefits in a single-user environment, the reduced 
memory helps to improve scalability
+          under high-concurrency workloads.
+        </li>
+      </ul>
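+
+      <p>
+        For example, in a hypothetical session you might cap the memory used by a query inside the
+        <cmdname>impalad</cmdname> daemons, while the HDFS cache pools are sized separately through
+        <cmdname>hdfs cacheadmin</cmdname>:
+      </p>
+
+<codeblock>-- Memory pinned by HDFS caching does not count against this limit.
+set mem_limit=2gb;
+select count(*) from census where year = 1960;
+</codeblock>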
+    </conbody>
+  </concept>
+
+  <concept id="hdfs_caching_performance">
+
+    <title>Performance Considerations for HDFS Caching with Impala</title>
+
+    <conbody>
+
+      <p>
+        In Impala 1.4.0 and higher, Impala supports efficient reads from data 
that is pinned in memory through HDFS
+        caching. Impala takes advantage of the HDFS API and reads the data 
from memory rather than from disk
+        whether the data files are pinned using Impala DDL statements or through the
+        command-line mechanism where you specify HDFS paths.
+      </p>
+
+      <p>
+        When you examine the output of the <cmdname>impala-shell</cmdname> 
<cmdname>SUMMARY</cmdname> command, or
+        look in the metrics report for the <cmdname>impalad</cmdname> daemon, 
you see how many bytes are read from
+        the HDFS cache. For example, this excerpt from a query profile 
illustrates that all the data read during a
+        particular phase of the query came from the HDFS cache, because the 
<codeph>BytesRead</codeph> and
+        <codeph>BytesReadDataNodeCache</codeph> values are identical.
+      </p>
+
+<codeblock>HDFS_SCAN_NODE (id=0):(Total: 11s114ms, non-child: 11s114ms, % non-child: 100.00%)
+        - AverageHdfsReadThreadConcurrency: 0.00
+        - AverageScannerThreadConcurrency: 32.75
+<b>        - BytesRead: 10.47 GB (11240756479)
+        - BytesReadDataNodeCache: 10.47 GB (11240756479)</b>
+        - BytesReadLocal: 10.47 GB (11240756479)
+        - BytesReadShortCircuit: 10.47 GB (11240756479)
+        - DecompressionTime: 27s572ms
+</codeblock>
+
+      <p>
+        For queries involving smaller amounts of data, or in single-user 
workloads, you might not notice a
+        significant difference in query response time with or without HDFS 
caching. Even with HDFS caching turned
+        off, the data for the query might still be in the Linux OS buffer 
cache. The benefits become clearer as
+        data volume increases, and especially as the system processes more 
concurrent queries. HDFS caching
+        improves the scalability of the overall system. That is, it prevents 
query performance from declining when
+        the workload outstrips the capacity of the Linux OS cache.
+      </p>
+
+      <p conref="../shared/impala_common.xml#common/hdfs_caching_encryption_caveat"/>
+
+      <p>
+        <b>SELECT considerations:</b>
+      </p>
+
+      <p>
+        The Impala HDFS caching feature interacts with the
+        <codeph><xref href="impala_select.xml#select">SELECT</xref></codeph> 
statement and query performance as
+        follows:
+      </p>
+
+      <ul>
+        <li>
+          Impala automatically reads from memory any data that has been 
designated as cached and actually loaded
+          into the HDFS cache. (It could take some time after the initial 
request to fully populate the cache for a
+          table with large size or many partitions.) The speedup comes from 
two aspects: reading from RAM instead
+          of disk, and accessing the data straight from the cache area instead 
of copying from one RAM area to
+          another. This second aspect yields further performance improvement 
over the standard OS caching
+          mechanism, which still results in memory-to-memory copying of cached 
data.
+        </li>
+
+        <li>
+          For small amounts of data, the query speedup might not be noticeable 
in terms of wall clock time. The
+          performance might be roughly the same with HDFS caching turned on or 
off, due to recently used data being
+          held in the Linux OS cache. The difference is more pronounced with:
+          <ul>
+            <li>
+              Data volumes (for all queries running concurrently) that exceed 
the size of the Linux OS cache.
+            </li>
+
+            <li>
+              A busy cluster running many concurrent queries, where the 
reduction in memory-to-memory copying and
+              overall memory usage during queries results in greater 
scalability and throughput.
+            </li>
+
+            <li>
+              Thus, to really exercise and benchmark this feature in a 
development environment, you might need to
+              simulate realistic workloads and concurrent queries that match 
your production environment.
+            </li>
+
+            <li>
+              One way to simulate a heavy workload on a lightly loaded system 
is to flush the OS buffer cache (on
+              each DataNode) between iterations of queries against the same 
tables or partitions:
+<codeblock>$ sync
+$ echo 1 &gt; /proc/sys/vm/drop_caches
+</codeblock>
+            </li>
+          </ul>
+        </li>
+
+        <li>
+          Impala queries take advantage of HDFS cached data regardless of 
whether the cache directive was issued by
+          Impala or externally through the <cmdname>hdfs cacheadmin</cmdname> 
command, for example for an external
+          table where the cached data files might be accessed by several 
different Hadoop components.
+        </li>
+
+        <li>
+          If your query returns a large result set, the time reported for the 
query could be dominated by the time
+          needed to print the results on the screen. To measure the time for 
the underlying query processing, query
+          the <codeph>COUNT()</codeph> of the big result set, which does all 
the same processing but only prints a
+          single line to the screen.
+        </li>
+      </ul>
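+
+      <p>
+        As a sketch of the last technique (the table and columns are hypothetical), wrap the
+        expensive query in a <codeph>COUNT(*)</codeph> so that the measured time reflects the query
+        processing rather than printing the result set:
+      </p>
+
+<codeblock>-- Instead of timing this statement, which prints many rows...
+-- select c1, c2, c3 from big_cached_table where c1 &lt; 100;
+
+-- ...time this one, which does the same scan and filter work
+-- but prints a single line.
+select count(*) from (select c1, c2, c3 from big_cached_table where c1 &lt; 100) t;
+</codeblock>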
+    </conbody>
+  </concept>
+</concept>

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3c2c8f12/docs/topics/impala_perf_joins.xml
----------------------------------------------------------------------
diff --git a/docs/topics/impala_perf_joins.xml 
b/docs/topics/impala_perf_joins.xml
index b7c88b2..4fcd896 100644
--- a/docs/topics/impala_perf_joins.xml
+++ b/docs/topics/impala_perf_joins.xml
@@ -4,7 +4,16 @@
 
   <title>Performance Considerations for Join Queries</title>
   <titlealts audience="PDF"><navtitle>Join Performance</navtitle></titlealts>
-  
+  <prolog>
+    <metadata>
+      <data name="Category" value="Impala"/>
+      <data name="Category" value="Performance"/>
+      <data name="Category" value="Querying"/>
+      <data name="Category" value="SQL"/>
+      <data name="Category" value="Developers"/>
+      <data name="Category" value="Data Analysts"/>
+    </metadata>
+  </prolog>
 
   <conbody>
 
@@ -16,7 +25,482 @@
       practical time and has to be cancelled.
     </p>
 
-    
+    <p rev="1.2.2">
+      The simplest technique for tuning an Impala join query is to collect 
statistics on each table involved in the
+      join using the <codeph><xref 
href="impala_compute_stats.xml#compute_stats">COMPUTE STATS</xref></codeph>
+      statement, and then let Impala automatically optimize the query based on 
the size of each table, number of
+      distinct values of each column, and so on. The <codeph>COMPUTE 
STATS</codeph> statement and the join
+      optimization are new features introduced in Impala 1.2.2. For accurate 
statistics about each table, issue the
+      <codeph>COMPUTE STATS</codeph> statement after loading the data into 
that table, and again if the amount of
+      data changes substantially, for example due to an <codeph>INSERT</codeph> or
+      <codeph>LOAD DATA</codeph> statement, or after adding a partition.
+    </p>
+
+    <p>
+      If statistics are not available for all the tables in the join query, or 
if Impala chooses a join order that
+      is not the most efficient, you can override the automatic join order 
optimization by specifying the
+      <codeph>STRAIGHT_JOIN</codeph> keyword immediately after the 
<codeph>SELECT</codeph> keyword. In this case,
+      Impala uses the order the tables appear in the query to guide how the 
joins are processed.
+    </p>
+
+    <p>
+      When you use the <codeph>STRAIGHT_JOIN</codeph> technique, you must 
order the tables in the join query
+      manually instead of relying on the Impala optimizer. The optimizer uses 
sophisticated techniques to estimate
+      the size of the result set at each stage of the join. For manual ordering,
+      start with the following heuristic approach, and then experiment to fine-tune the order:
+    </p>
+
+    <ul>
+      <li>
+        Specify the largest table first. This table is read from disk by each 
Impala node and so its size is not
+        significant in terms of memory usage during the query.
+      </li>
+
+      <li>
+        Next, specify the smallest table. The contents of the second, third, 
and so on tables are all transmitted
+        across the network. You want to minimize the size of the result set 
from each subsequent stage of the join
+        query. The most likely approach involves joining a small table first, 
so that the result set remains small
+        even as subsequent larger tables are processed.
+      </li>
+
+      <li>
+        Join the next smallest table, then the next smallest, and so on.
+      </li>
+
+      <li>
+        For example, if you had tables <codeph>BIG</codeph>, 
<codeph>MEDIUM</codeph>, <codeph>SMALL</codeph>, and
+        <codeph>TINY</codeph>, the logical join order to try would be 
<codeph>BIG</codeph>, <codeph>TINY</codeph>,
+        <codeph>SMALL</codeph>, <codeph>MEDIUM</codeph>.
+      </li>
+    </ul>
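+
+    <p>
+      For example, a sketch of this manual ordering using the <codeph>STRAIGHT_JOIN</codeph>
+      keyword described above, so that Impala keeps the order as written (the join columns are
+      hypothetical):
+    </p>
+
+<codeblock>select straight_join count(*)
+  from big
+  join tiny   on big.id = tiny.id
+  join small  on big.id = small.id
+  join medium on big.id = medium.id;
+</codeblock>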
+
+    <p>
+      The terms <q>largest</q> and <q>smallest</q> refer to the size of the 
intermediate result set based on the
+      number of rows and columns from each table that are part of the result 
set. For example, if you join one
+      table <codeph>sales</codeph> with another table 
<codeph>customers</codeph>, a query might find results from
+      100 different customers who made a total of 5000 purchases. In that 
case, you would specify <codeph>SELECT
+      ... FROM sales JOIN customers ...</codeph>, putting 
<codeph>customers</codeph> on the right side because it
+      is smaller in the context of this query.
+    </p>
+
+    <p>
+      The Impala query planner chooses between different techniques for 
performing join queries, depending on the
+      absolute and relative sizes of the tables. <b>Broadcast joins</b> are 
the default, where the right-hand table
+      is considered to be smaller than the left-hand table, and its contents 
are sent to all the other nodes
+      involved in the query. The alternative technique is known as a 
<b>partitioned join</b> (not related to a
+      partitioned table), which is more suitable for large tables of roughly 
equal size. With this technique,
+      portions of each table are sent to appropriate other nodes where those 
subsets of rows can be processed in
+      parallel. The choice of broadcast or partitioned join also depends on 
statistics being available for all
+      tables in the join, gathered by the <codeph>COMPUTE STATS</codeph> 
statement.
+    </p>
+
+    <p>
+      To see which join strategy is used for a particular query, issue an 
<codeph>EXPLAIN</codeph> statement for
+      the query. If you find that a query uses a broadcast join when you know 
through benchmarking that a
+      partitioned join would be more efficient, or vice versa, add a hint to 
the query to specify the precise join
+      mechanism to use. See <xref href="impala_hints.xml#hints"/> for details.
+    </p>
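+
+    <p>
+      For example, a minimal sketch (with hypothetical tables) that forces a partitioned join
+      through a hint; see the hints topic referenced above for the full syntax and alternatives:
+    </p>
+
+<codeblock>select straight_join t1.x, t2.y
+  from t1
+  join [SHUFFLE] t2 on t1.id = t2.id;
+</codeblock>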
+  </conbody>
+
+  <concept rev="1.2.2" id="joins_no_stats">
+
+    <title>How Joins Are Processed when Statistics Are Unavailable</title>
+  <prolog>
+    <metadata>
+      <data name="Category" value="Concepts"/>
+    </metadata>
+  </prolog>
+
+    <conbody>
+
+      <p>
+        If table or column statistics are not available for some tables in a 
join, Impala still reorders the tables
+        using the information that is available. Tables with statistics are 
placed on the left side of the join
+        order, in descending order of cost based on overall size and 
cardinality. Tables without statistics are
+        treated as zero-size, that is, they are always placed on the right 
side of the join order.
+      </p>
+    </conbody>
+  </concept>
+
+  <concept rev="1.2.2" id="straight_join">
+
+    <title>Overriding Join Reordering with STRAIGHT_JOIN</title>
+
+    <conbody>
+
+      <p>
+        If an Impala join query is inefficient because of outdated statistics 
or unexpected data distribution, you
+        can keep Impala from reordering the joined tables by using the 
<codeph>STRAIGHT_JOIN</codeph> keyword
+        immediately after the <codeph>SELECT</codeph> keyword. The 
<codeph>STRAIGHT_JOIN</codeph> keyword turns off
+        the reordering of join clauses that Impala does internally, and 
produces a plan that relies on the join
+        clauses being ordered optimally in the query text. In this case, 
rewrite the query so that the largest
+        table is on the left, followed by the next largest, and so on until 
the smallest table is on the right.
+      </p>
+
+      <p>
+        In this example, the subselect from the <codeph>BIG</codeph> table 
produces a very small result set, but
+        the table might still be treated as if it were the biggest and placed 
first in the join order. Using
+        <codeph>STRAIGHT_JOIN</codeph> for the last join clause prevents the 
final table from being reordered,
+        keeping it as the rightmost table in the join order.
+      </p>
+
+<codeblock>select straight_join x from medium join small join (select * from big where c1 &lt; 10) as big
+  where medium.id = small.id and small.id = big.id;</codeblock>
     </conbody>
   </concept>
 
+  <concept id="perf_joins_examples">
+
+    <title>Examples of Join Order Optimization</title>
+
+    <conbody>
+
+      <p>
+        Here are examples showing joins between tables with 1 billion, 200 
million, and 1 million rows. (In this
+        case, the tables are unpartitioned and use Parquet format.) The 
smaller tables contain subsets of data
+        from the largest one, for convenience of joining on the unique 
<codeph>ID</codeph> column. The smallest
+        table only contains a subset of columns from the others.
+      </p>
+
+      <p></p>
+
+<codeblock>[localhost:21000] &gt; create table big stored as parquet as select * from raw_data;
++----------------------------+
+| summary                    |
++----------------------------+
+| Inserted 1000000000 row(s) |
++----------------------------+
+Returned 1 row(s) in 671.56s
+[localhost:21000] &gt; desc big;
++-----------+---------+---------+
+| name      | type    | comment |
++-----------+---------+---------+
+| id        | int     |         |
+| val       | int     |         |
+| zfill     | string  |         |
+| name      | string  |         |
+| assertion | boolean |         |
++-----------+---------+---------+
+Returned 5 row(s) in 0.01s
+[localhost:21000] &gt; create table medium stored as parquet as select * from big limit 200 * floor(1e6);
++---------------------------+
+| summary                   |
++---------------------------+
+| Inserted 200000000 row(s) |
++---------------------------+
+Returned 1 row(s) in 138.31s
+[localhost:21000] &gt; create table small stored as parquet as select id,val,name from big where assertion = true limit 1 * floor(1e6);
++-------------------------+
+| summary                 |
++-------------------------+
+| Inserted 1000000 row(s) |
++-------------------------+
+Returned 1 row(s) in 6.32s</codeblock>
+
+      <p>
+        For any kind of performance experimentation, use the 
<codeph>EXPLAIN</codeph> statement to see how any
+        expensive query will be performed without actually running it, and 
enable verbose <codeph>EXPLAIN</codeph>
+        plans containing more performance-oriented detail. In the following output, the most
+        interesting plan lines are highlighted in bold,
+        showing that without statistics for the joined tables, Impala cannot 
make a good estimate of the number of
+        rows involved at each stage of processing, and is likely to stick with 
the <codeph>BROADCAST</codeph> join
+        mechanism that sends a complete copy of one of the tables to each node.
+      </p>
+
+<codeblock>[localhost:21000] &gt; set explain_level=verbose;
+EXPLAIN_LEVEL set to verbose
+[localhost:21000] &gt; explain select count(*) from big join medium where big.id = medium.id;
++----------------------------------------------------------+
+| Explain String                                           |
++----------------------------------------------------------+
+| Estimated Per-Host Requirements: Memory=2.10GB VCores=2  |
+|                                                          |
+| PLAN FRAGMENT 0                                          |
+|   PARTITION: UNPARTITIONED                               |
+|                                                          |
+|   6:AGGREGATE (merge finalize)                           |
+|   |  output: SUM(COUNT(*))                               |
+|   |  cardinality: 1                                      |
+|   |  per-host memory: unavailable                        |
+|   |  tuple ids: 2                                        |
+|   |                                                      |
+|   5:EXCHANGE                                             |
+|      cardinality: 1                                      |
+|      per-host memory: unavailable                        |
+|      tuple ids: 2                                        |
+|                                                          |
+| PLAN FRAGMENT 1                                          |
+|   PARTITION: RANDOM                                      |
+|                                                          |
+|   STREAM DATA SINK                                       |
+|     EXCHANGE ID: 5                                       |
+|     UNPARTITIONED                                        |
+|                                                          |
+|   3:AGGREGATE                                            |
+|   |  output: COUNT(*)                                    |
+|   |  cardinality: 1                                      |
+|   |  per-host memory: 10.00MB                            |
+|   |  tuple ids: 2                                        |
+|   |                                                      |
+|   2:HASH JOIN                                            |
+<b>|   |  join op: INNER JOIN (BROADCAST)                     |</b>
+|   |  hash predicates:                                    |
+|   |    big.id = medium.id                                |
+<b>|   |  cardinality: unavailable                            |</b>
+|   |  per-host memory: 2.00GB                             |
+|   |  tuple ids: 0 1                                      |
+|   |                                                      |
+|   |----4:EXCHANGE                                        |
+|   |       cardinality: unavailable                       |
+|   |       per-host memory: 0B                            |
+|   |       tuple ids: 1                                   |
+|   |                                                      |
+|   0:SCAN HDFS                                            |
+<b>|      table=join_order.big #partitions=1/1 size=23.12GB   |
+|      table stats: unavailable                            |
+|      column stats: unavailable                           |
+|      cardinality: unavailable                            |</b>
+|      per-host memory: 88.00MB                            |
+|      tuple ids: 0                                        |
+|                                                          |
+| PLAN FRAGMENT 2                                          |
+|   PARTITION: RANDOM                                      |
+|                                                          |
+|   STREAM DATA SINK                                       |
+|     EXCHANGE ID: 4                                       |
+|     UNPARTITIONED                                        |
+|                                                          |
+|   1:SCAN HDFS                                            |
+<b>|      table=join_order.medium #partitions=1/1 size=4.62GB |
+|      table stats: unavailable                            |
+|      column stats: unavailable                           |
+|      cardinality: unavailable                            |</b>
+|      per-host memory: 88.00MB                            |
+|      tuple ids: 1                                        |
++----------------------------------------------------------+
+Returned 64 row(s) in 0.04s</codeblock>
+
+      <p>
+        Gathering statistics for all the tables is straightforward, one 
<codeph>COMPUTE STATS</codeph> statement
+        per table:
+      </p>
+
+<codeblock>[localhost:21000] &gt; compute stats small;
++-----------------------------------------+
+| summary                                 |
++-----------------------------------------+
+| Updated 1 partition(s) and 3 column(s). |
++-----------------------------------------+
+Returned 1 row(s) in 4.26s
+[localhost:21000] &gt; compute stats medium;
++-----------------------------------------+
+| summary                                 |
++-----------------------------------------+
+| Updated 1 partition(s) and 5 column(s). |
++-----------------------------------------+
+Returned 1 row(s) in 42.11s
+[localhost:21000] &gt; compute stats big;
++-----------------------------------------+
+| summary                                 |
++-----------------------------------------+
+| Updated 1 partition(s) and 5 column(s). |
++-----------------------------------------+
+Returned 1 row(s) in 165.44s</codeblock>
+
+      <p>
+        With statistics in place, Impala can choose a more effective join 
order rather than following the
+        left-to-right sequence of tables in the query, and can choose 
<codeph>BROADCAST</codeph> or
+        <codeph>PARTITIONED</codeph> join strategies based on the overall 
sizes and number of rows in the tables:
+      </p>
+
+<codeblock>[localhost:21000] &gt; explain select count(*) from medium join big where big.id = medium.id;
+Query: explain select count(*) from medium join big where big.id = medium.id
++-----------------------------------------------------------+
+| Explain String                                            |
++-----------------------------------------------------------+
+| Estimated Per-Host Requirements: Memory=937.23MB VCores=2 |
+|                                                           |
+| PLAN FRAGMENT 0                                           |
+|   PARTITION: UNPARTITIONED                                |
+|                                                           |
+|   6:AGGREGATE (merge finalize)                            |
+|   |  output: SUM(COUNT(*))                                |
+|   |  cardinality: 1                                       |
+|   |  per-host memory: unavailable                         |
+|   |  tuple ids: 2                                         |
+|   |                                                       |
+|   5:EXCHANGE                                              |
+|      cardinality: 1                                       |
+|      per-host memory: unavailable                         |
+|      tuple ids: 2                                         |
+|                                                           |
+| PLAN FRAGMENT 1                                           |
+|   PARTITION: RANDOM                                       |
+|                                                           |
+|   STREAM DATA SINK                                        |
+|     EXCHANGE ID: 5                                        |
+|     UNPARTITIONED                                         |
+|                                                           |
+|   3:AGGREGATE                                             |
+|   |  output: COUNT(*)                                     |
+|   |  cardinality: 1                                       |
+|   |  per-host memory: 10.00MB                             |
+|   |  tuple ids: 2                                         |
+|   |                                                       |
+|   2:HASH JOIN                                             |
+|   |  join op: INNER JOIN (BROADCAST)                      |
+|   |  hash predicates:                                     |
+|   |    big.id = medium.id                                 |
+|   |  cardinality: 1443004441                              |
+|   |  per-host memory: 839.23MB                            |
+|   |  tuple ids: 1 0                                       |
+|   |                                                       |
+|   |----4:EXCHANGE                                         |
+|   |       cardinality: 200000000                          |
+|   |       per-host memory: 0B                             |
+|   |       tuple ids: 0                                    |
+|   |                                                       |
+|   1:SCAN HDFS                                             |
+|      table=join_order.big #partitions=1/1 size=23.12GB    |
+|      table stats: 1000000000 rows total                   |
+|      column stats: all                                    |
+|      cardinality: 1000000000                              |
+|      per-host memory: 88.00MB                             |
+|      tuple ids: 1                                         |
+|                                                           |
+| PLAN FRAGMENT 2                                           |
+|   PARTITION: RANDOM                                       |
+|                                                           |
+|   STREAM DATA SINK                                        |
+|     EXCHANGE ID: 4                                        |
+|     UNPARTITIONED                                         |
+|                                                           |
+|   0:SCAN HDFS                                             |
+|      table=join_order.medium #partitions=1/1 size=4.62GB  |
+|      table stats: 200000000 rows total                    |
+|      column stats: all                                    |
+|      cardinality: 200000000                               |
+|      per-host memory: 88.00MB                             |
+|      tuple ids: 0                                         |
++-----------------------------------------------------------+
+Returned 64 row(s) in 0.04s
+
+[localhost:21000] &gt; explain select count(*) from small join big where big.id = small.id;
+Query: explain select count(*) from small join big where big.id = small.id
++-----------------------------------------------------------+
+| Explain String                                            |
++-----------------------------------------------------------+
+| Estimated Per-Host Requirements: Memory=101.15MB VCores=2 |
+|                                                           |
+| PLAN FRAGMENT 0                                           |
+|   PARTITION: UNPARTITIONED                                |
+|                                                           |
+|   6:AGGREGATE (merge finalize)                            |
+|   |  output: SUM(COUNT(*))                                |
+|   |  cardinality: 1                                       |
+|   |  per-host memory: unavailable                         |
+|   |  tuple ids: 2                                         |
+|   |                                                       |
+|   5:EXCHANGE                                              |
+|      cardinality: 1                                       |
+|      per-host memory: unavailable                         |
+|      tuple ids: 2                                         |
+|                                                           |
+| PLAN FRAGMENT 1                                           |
+|   PARTITION: RANDOM                                       |
+|                                                           |
+|   STREAM DATA SINK                                        |
+|     EXCHANGE ID: 5                                        |
+|     UNPARTITIONED                                         |
+|                                                           |
+|   3:AGGREGATE                                             |
+|   |  output: COUNT(*)                                     |
+|   |  cardinality: 1                                       |
+|   |  per-host memory: 10.00MB                             |
+|   |  tuple ids: 2                                         |
+|   |                                                       |
+|   2:HASH JOIN                                             |
+|   |  join op: INNER JOIN (BROADCAST)                      |
+|   |  hash predicates:                                     |
+|   |    big.id = small.id                                  |
+|   |  cardinality: 1000000000                              |
+|   |  per-host memory: 3.15MB                              |
+|   |  tuple ids: 1 0                                       |
+|   |                                                       |
+|   |----4:EXCHANGE                                         |
+|   |       cardinality: 1000000                            |
+|   |       per-host memory: 0B                             |
+|   |       tuple ids: 0                                    |
+|   |                                                       |
+|   1:SCAN HDFS                                             |
+|      table=join_order.big #partitions=1/1 size=23.12GB    |
+|      table stats: 1000000000 rows total                   |
+|      column stats: all                                    |
+|      cardinality: 1000000000                              |
+|      per-host memory: 88.00MB                             |
+|      tuple ids: 1                                         |
+|                                                           |
+| PLAN FRAGMENT 2                                           |
+|   PARTITION: RANDOM                                       |
+|                                                           |
+|   STREAM DATA SINK                                        |
+|     EXCHANGE ID: 4                                        |
+|     UNPARTITIONED                                         |
+|                                                           |
+|   0:SCAN HDFS                                             |
+|      table=join_order.small #partitions=1/1 size=17.93MB  |
+|      table stats: 1000000 rows total                      |
+|      column stats: all                                    |
+|      cardinality: 1000000                                 |
+|      per-host memory: 32.00MB                             |
+|      tuple ids: 0                                         |
++-----------------------------------------------------------+
+Returned 64 row(s) in 0.03s</codeblock>
+
+      <p>
+        When queries like these are actually run, the execution times are 
relatively consistent regardless of the
+        table order in the query text. Here are examples using both the unique 
<codeph>ID</codeph> column and the
+        <codeph>VAL</codeph> column containing duplicate values:
+      </p>
+
+<codeblock>[localhost:21000] &gt; select count(*) from big join small on (big.id = small.id);
+Query: select count(*) from big join small on (big.id = small.id)
++----------+
+| count(*) |
++----------+
+| 1000000  |
++----------+
+Returned 1 row(s) in 21.68s
+[localhost:21000] &gt; select count(*) from small join big on (big.id = small.id);
+Query: select count(*) from small join big on (big.id = small.id)
++----------+
+| count(*) |
++----------+
+| 1000000  |
++----------+
+Returned 1 row(s) in 20.45s
+
+[localhost:21000] &gt; select count(*) from big join small on (big.val = small.val);
++------------+
+| count(*)   |
++------------+
+| 2000948962 |
++------------+
+Returned 1 row(s) in 108.85s
+[localhost:21000] &gt; select count(*) from small join big on (big.val = small.val);
++------------+
+| count(*)   |
++------------+
+| 2000948962 |
++------------+
+Returned 1 row(s) in 100.76s</codeblock>
+
+      <note>
+        When examining the performance of join queries and the effectiveness 
of the join order optimization, make
+        sure the query involves enough data and cluster resources to see a 
difference depending on the query plan.
+        For example, a single data file of just a few megabytes will reside in 
a single HDFS block and be processed
+        on a single node. Likewise, if you use a single-node or two-node 
cluster, there might not be much
+        difference in efficiency for the broadcast or partitioned join 
strategies.
+      </note>
+    </conbody>
+  </concept>
+</concept>
