http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3be0f122/docs/topics/impala_s3.xml ---------------------------------------------------------------------- diff --git a/docs/topics/impala_s3.xml b/docs/topics/impala_s3.xml new file mode 100644 index 0000000..e61bb7c --- /dev/null +++ b/docs/topics/impala_s3.xml @@ -0,0 +1,796 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!DOCTYPE concept PUBLIC "-//OASIS//DTD DITA Concept//EN" "concept.dtd"> +<concept id="s3" rev="2.2.0"> + + <title>Using Impala with the Amazon S3 Filesystem</title> + <titlealts audience="PDF"><navtitle>S3 Tables</navtitle></titlealts> + <prolog> + <metadata> + <data name="Category" value="Impala"/> + <data name="Category" value="Amazon"/> + <data name="Category" value="S3"/> + <data name="Category" value="Data Analysts"/> + <data name="Category" value="Developers"/> + <data name="Category" value="Querying"/> + <data name="Category" value="Preview Features"/> + </metadata> + </prolog> + + <conbody> + + <note conref="../shared/impala_common.xml#common/s3_production"/> + + <p rev="2.2.0"> + <indexterm audience="Cloudera">S3 with Impala</indexterm> + + <indexterm audience="Cloudera">Amazon S3 with Impala</indexterm> + You can use Impala to query data residing on the Amazon S3 filesystem. This capability allows convenient + access to a storage system that is remotely managed, accessible from anywhere, and integrated with various + cloud-based services. Impala can query files in any supported file format from S3. The S3 storage location + can be for an entire table, or individual partitions in a partitioned table. + </p> + + <p> + The default Impala tables use data files stored on HDFS, which are ideal for bulk loads and queries using + full-table scans. In contrast, queries against S3 data are less performant, making S3 suitable for holding + <q>cold</q> data that is only queried occasionally, while more frequently accessed <q>hot</q> data resides in + HDFS. In a partitioned table, you can set the <codeph>LOCATION</codeph> attribute for individual partitions + to put some partitions on HDFS and others on S3, typically depending on the age of the data. + </p> + + <p outputclass="toc inpage"/> + + </conbody> + + <concept id="s3_sql"> + <title>How Impala SQL Statements Work with S3</title> + <conbody> + <p> + Impala SQL statements work with data on S3 as follows: + </p> + <ul> + <li> + <p> + The <xref href="impala_create_table.xml#create_table"/> + or <xref href="impala_alter_table.xml#alter_table"/> statements + can specify that a table resides on the S3 filesystem by + encoding an <codeph>s3a://</codeph> prefix for the <codeph>LOCATION</codeph> + property. <codeph>ALTER TABLE</codeph> can also set the <codeph>LOCATION</codeph> + property for an individual partition, so that some data in a table resides on + S3 and other data in the same table resides on HDFS. + </p> + </li> + <li> + <p> + Once a table or partition is designated as residing on S3, the <xref href="impala_select.xml#select"/> + statement transparently accesses the data files from the appropriate storage layer. + </p> + </li> + <li> + <p> + If the S3 table is an internal table, the <xref href="impala_drop_table.xml#drop_table"/> statement + removes the corresponding data files from S3 when the table is dropped. + </p> + </li> + <li> + <p> + The <xref href="impala_truncate_table.xml#truncate_table"/> statement always removes the corresponding + data files from S3 when the table is truncated. 
+ </p> + </li> + <li> + <p> + The <xref href="impala_load_data.xml#load_data"/> can move data files residing in HDFS into + an S3 table. + </p> + </li> + <li> + <p> + The <xref href="impala_insert.xml#insert"/> statement, or the <codeph>CREATE TABLE AS SELECT</codeph> + form of the <codeph>CREATE TABLE</codeph> statement, can copy data from an HDFS table or another S3 + table into an S3 table. The <xref href="impala_s3_skip_insert_staging.xml#s3_skip_insert_staging"/> + query option chooses whether or not to use a fast code path for these write operations to S3, + with the tradeoff of potential inconsistency in the case of a failure during the statement. + </p> + </li> + </ul> + <p> + For usage information about Impala SQL statements with S3 tables, see <xref href="impala_s3.xml#s3_ddl"/> + and <xref href="impala_s3.xml#s3_dml"/>. + </p> + </conbody> + </concept> + + <concept id="s3_creds"> + + <title>Specifying Impala Credentials to Access Data in S3</title> + + <conbody> + + <p> + <indexterm audience="Cloudera">fs.s3a.access.key configuration setting</indexterm> + <indexterm audience="Cloudera">fs.s3a.secret.key configuration setting</indexterm> + <indexterm audience="Cloudera">access.key configuration setting</indexterm> + <indexterm audience="Cloudera">secret.key configuration setting</indexterm> + To allow Impala to access data in S3, specify values for the following configuration settings in your + <filepath>core-site.xml</filepath> file: + </p> + +<!-- Normally I would turn this example into CDATA notation to avoid all the < and > entities. + However, then I couldn't use the <varname> tag inside the same example. --> +<codeblock> +<property> +<name>fs.s3a.access.key</name> +<value><varname>your_access_key</varname></value> +</property> +<property> +<name>fs.s3a.secret.key</name> +<value><varname>your_secret_key</varname></value> +</property> +</codeblock> + + <p> + As of CDH 5.8, these settings do not have corresponding controls in the Cloudera Manager user interface. + Specify them in the <uicontrol>HDFS Client Advanced Configuration Snippet (Safety Valve) for + core-site.xml</uicontrol> field. After specifying the credentials, restart both the Impala and Hive + services. (Restarting Hive is required because Impala queries, <codeph>CREATE TABLE</codeph> statements, + and so on go through the Hive metastore.) + </p> + +<!-- + <p rev="CDH-39914 IMPALA-3306"> + In <keyword keyref="impala26_full"/> and higher, you can specify the S3 access key and secret key through + configuration settings for the <cmdname>impalad</cmdname> daemon. + Rather than specifying the keys themselves on the command line or in startup scripts, + you specify the commands to retrieve the keys as <cmdname>impalad</cmdname> + startup options. For clusters not managed by Cloudera Manager, use the + <codeph>- + -s3a_access_key_cmd</codeph> and <codeph>- + -s3a_secret_key_cmd</codeph> + startup options for the <cmdname>impalad</cmdname> daemon. + For clusters managed by Cloudera Manager, set the + <codeph>s3a_access_key_cmd</codeph> and <codeph>s3a_secret_key_cmd</codeph> + configuration settings and restart the Impala and Hive services. + (Restarting Hive is required because Impala queries, <codeph>CREATE TABLE</codeph> statements, + and so on go through the Hive metastore.) 
+ </p> +--> + + <note type="important"> +<!-- + <ul> + <li> + <p rev="CDH-39914 IMPALA-3306"> + The <codeph>s3a_access_key_cmd</codeph> and <codeph>s3a_secret_key_cmd</codeph> settings + for <cmdname>impalad</cmdname> only allow Impala to access S3. You must still include the credentials in the + client <filepath>hdfs-site.xml</filepath> configuration file to allow S3 access for the Hive metastore, + <codeph>hadoop fs</codeph> command, and so on. + </p> + </li> + <li> +--> + <p> + Although you can specify the access key ID and secret key as part of the <codeph>s3a://</codeph> URL in the + <codeph>LOCATION</codeph> attribute, doing so makes this sensitive information visible in many places, such + as <codeph>DESCRIBE FORMATTED</codeph> output and Impala log files. Therefore, specify this information + centrally in the <filepath>core-site.xml</filepath> file, and restrict read access to that file to only + trusted users. + </p> +<!-- + </li> +--> + <!-- Overriding with a new first list bullet following clarification by Sailesh. + <li> + <p rev="CDH-39914 IMPALA-3306"> + Prior to <keyword keyref="impala26_full"/> an alternative way to specify the keys was by + including the fields <codeph>fs.s3a.access.key</codeph> and <codeph>fs.s3a.secret.key</codeph> + in a configuration file such as <filepath>core-site.xml</filepath> or <filepath>hdfs-site.xml</filepath>. + With the enhanced S3 key management in <keyword keyref="impala26_full"/> and higher, if you are upgrading from + an earlier release where you used Impala with S3, remove the S3 keys from any copies of those files. + </p> + </li> + --> +<!-- + </ul> +--> + </note> + + </conbody> + + </concept> + + <concept id="s3_etl"> + + <title>Loading Data into S3 for Impala Queries</title> + <prolog> + <metadata> + <data name="Category" value="ETL"/> + <data name="Category" value="Ingest"/> + </metadata> + </prolog> + + <conbody> + + <p> + If your ETL pipeline involves moving data into S3 and then querying through Impala, + you can either use Impala DML statements to create, move, or copy the data, or + use the same data loading techniques as you would for non-Impala data. + </p> + + </conbody> + + <concept id="s3_dml" rev="2.6.0 CDH-39913 IMPALA-1878"> + <title>Using Impala DML Statements for S3 Data</title> + <conbody> + <p conref="../shared/impala_common.xml#common/s3_dml"/> + <p conref="../shared/impala_common.xml#common/s3_dml_performance"/> + </conbody> + </concept> + + <concept id="s3_manual_etl"> + <title>Manually Loading Data into Impala Tables on S3</title> + <conbody> + <p> + As an alternative, or on earlier Impala releases without DML support for S3, + you can use the Amazon-provided methods to bring data files into S3 for querying through Impala. See + <xref href="http://aws.amazon.com/s3/" scope="external" format="html">the Amazon S3 web site</xref> for + details. + </p> + + <note type="important"> + <p conref="../shared/impala_common.xml#common/s3_drop_table_purge"/> + </note> + + <p> + Alternative file creation techniques (less compatible with the <codeph>PURGE</codeph> clause) include: + </p> + + <ul> + <li> + The <xref href="https://console.aws.amazon.com/s3/home" scope="external" format="html">Amazon AWS / S3 + web interface</xref> to upload from a web browser. + </li> + + <li> + The <xref href="http://aws.amazon.com/cli/" scope="external" format="html">Amazon AWS CLI</xref> to + manipulate files from the command line. 
+ </li> + + <li> + Other S3-enabled software, such as + <xref href="http://s3tools.org/s3cmd" scope="external" format="html">the S3Tools client software</xref>. + </li> + </ul> + + <p> + After you upload data files to a location already mapped to an Impala table or partition, or if you delete + files in S3 from such a location, issue the <codeph>REFRESH <varname>table_name</varname></codeph> + statement to make Impala aware of the new set of data files. + </p> + + </conbody> + </concept> + + </concept> + + <concept id="s3_ddl"> + + <title>Creating Impala Databases, Tables, and Partitions for Data Stored on S3</title> + <prolog> + <metadata> + <data name="Category" value="Databases"/> + </metadata> + </prolog> + + <conbody> + + <p> + Impala reads data for a table or partition from S3 based on the <codeph>LOCATION</codeph> attribute for the + table or partition. Specify the S3 details in the <codeph>LOCATION</codeph> clause of a <codeph>CREATE + TABLE</codeph> or <codeph>ALTER TABLE</codeph> statement. The notation for the <codeph>LOCATION</codeph> + clause is <codeph>s3a://<varname>bucket_name</varname>/<varname>path/to/file</varname></codeph>. The + filesystem prefix is always <codeph>s3a://</codeph> because Impala does not support the <codeph>s3://</codeph> or + <codeph>s3n://</codeph> prefixes. + </p> + + <p> + For a partitioned table, either specify a separate <codeph>LOCATION</codeph> clause for each new partition, + or specify a base <codeph>LOCATION</codeph> for the table and set up a directory structure in S3 to mirror + the way Impala partitioned tables are structured in HDFS. Although, strictly speaking, S3 filenames do not + have directory paths, Impala treats S3 filenames with <codeph>/</codeph> characters the same as HDFS + pathnames that include directories. + </p> + + <p> + You point a nonpartitioned table or an individual partition at S3 by specifying a single directory + path in S3, which could be any arbitrary directory. To replicate the structure of an entire Impala + partitioned table or database in S3 requires more care, with directories and subdirectories nested and + named to match the equivalent directory tree in HDFS. Consider setting up an empty staging area if + necessary in HDFS, and recording the complete directory structure so that you can replicate it in S3. + <!-- + Or, specify an S3 location for an entire database, after which all tables and partitions created inside that + database automatically inherit the database <codeph>LOCATION</codeph> and create new S3 directories + underneath the database directory. + --> + </p> + + <p> + For convenience when working with multiple tables with data files stored in S3, you can create a database + with a <codeph>LOCATION</codeph> attribute pointing to an S3 path. + Specify a URL of the form <codeph>s3a://<varname>bucket</varname>/<varname>root/path/for/database</varname></codeph> + for the <codeph>LOCATION</codeph> attribute of the database. + Any tables created inside that database + automatically create directories underneath the one specified by the database + <codeph>LOCATION</codeph> attribute. + </p> + + <p> + For example, the following session creates a partitioned table where only a single partition resides on S3. + The partitions for years 2013 and 2014 are located on HDFS. The partition for year 2015 includes a + <codeph>LOCATION</codeph> attribute with an <codeph>s3a://</codeph> URL, and so refers to data residing on + S3, under a specific path underneath the bucket <codeph>impala-demo</codeph>. 
+ </p> + +<codeblock>[localhost:21000] > create database db_on_hdfs; +[localhost:21000] > use db_on_hdfs; +[localhost:21000] > create table mostly_on_hdfs (x int) partitioned by (year int); +[localhost:21000] > alter table mostly_on_hdfs add partition (year=2013); +[localhost:21000] > alter table mostly_on_hdfs add partition (year=2014); +[localhost:21000] > alter table mostly_on_hdfs add partition (year=2015) + > location 's3a://impala-demo/dir1/dir2/dir3/t1'; +</codeblock> + + <p> + The following session creates a database and two partitioned tables residing entirely on S3, one + partitioned by a single column and the other partitioned by multiple columns. Because a + <codeph>LOCATION</codeph> attribute with an <codeph>s3a://</codeph> URL is specified for the database, the + tables inside that database are automatically created on S3 underneath the database directory. To see the + names of the associated subdirectories, including the partition key values, we use an S3 client tool to + examine how the directory structure is organized on S3. For example, Impala partition directories such as + <codeph>month=1</codeph> do not include leading zeroes, which sometimes appear in partition directories created + through Hive. + </p> + +<codeblock>[localhost:21000] > create database db_on_s3 location 's3a://impala-demo/dir1/dir2/dir3'; +[localhost:21000] > use db_on_s3; + +[localhost:21000] > create table partitioned_on_s3 (x int) partitioned by (year int); +[localhost:21000] > alter table partitioned_on_s3 add partition (year=2013); +[localhost:21000] > alter table partitioned_on_s3 add partition (year=2014); +[localhost:21000] > alter table partitioned_on_s3 add partition (year=2015); + +[localhost:21000] > !aws s3 ls s3://impala-demo/dir1/dir2/dir3 --recursive; +2015-03-17 13:56:34 0 dir1/dir2/dir3/ +2015-03-17 16:43:28 0 dir1/dir2/dir3/partitioned_on_s3/ +2015-03-17 16:43:49 0 dir1/dir2/dir3/partitioned_on_s3/year=2013/ +2015-03-17 16:43:53 0 dir1/dir2/dir3/partitioned_on_s3/year=2014/ +2015-03-17 16:43:58 0 dir1/dir2/dir3/partitioned_on_s3/year=2015/ + +[localhost:21000] > create table partitioned_multiple_keys (x int) + > partitioned by (year smallint, month tinyint, day tinyint); +[localhost:21000] > alter table partitioned_multiple_keys + > add partition (year=2015,month=1,day=1); +[localhost:21000] > alter table partitioned_multiple_keys + > add partition (year=2015,month=1,day=31); +[localhost:21000] > alter table partitioned_multiple_keys + > add partition (year=2015,month=2,day=28); + +[localhost:21000] > !aws s3 ls s3://impala-demo/dir1/dir2/dir3 --recursive; +2015-03-17 13:56:34 0 dir1/dir2/dir3/ +2015-03-17 16:47:13 0 dir1/dir2/dir3/partitioned_multiple_keys/ +2015-03-17 16:47:44 0 dir1/dir2/dir3/partitioned_multiple_keys/year=2015/month=1/day=1/ +2015-03-17 16:47:50 0 dir1/dir2/dir3/partitioned_multiple_keys/year=2015/month=1/day=31/ +2015-03-17 16:47:57 0 dir1/dir2/dir3/partitioned_multiple_keys/year=2015/month=2/day=28/ +2015-03-17 16:43:28 0 dir1/dir2/dir3/partitioned_on_s3/ +2015-03-17 16:43:49 0 dir1/dir2/dir3/partitioned_on_s3/year=2013/ +2015-03-17 16:43:53 0 dir1/dir2/dir3/partitioned_on_s3/year=2014/ +2015-03-17 16:43:58 0 dir1/dir2/dir3/partitioned_on_s3/year=2015/ +</codeblock> + + <p> + The <codeph>CREATE DATABASE</codeph> and <codeph>CREATE TABLE</codeph> statements create the associated + directory paths if they do not already exist. 
You can specify multiple levels of directories, and the + <codeph>CREATE</codeph> statement creates all appropriate levels, similar to using <codeph>mkdir + -p</codeph>. + </p> + + <p> + Use the standard S3 file upload methods to actually put the data files into the right locations. You can + also put the directory paths and data files in place before creating the associated Impala databases or + tables, and Impala automatically uses the data from the appropriate location after the associated databases + and tables are created. + </p> + + <p> + You can switch whether an existing table or partition points to data in HDFS or S3. For example, if you + have an Impala table or partition pointing to data files in HDFS or S3, and you later transfer those data + files to the other filesystem, use an <codeph>ALTER TABLE</codeph> statement to adjust the + <codeph>LOCATION</codeph> attribute of the corresponding table or partition to reflect that change. Because + Impala does not have an <codeph>ALTER DATABASE</codeph> statement, this location-switching technique is not + practical for entire databases that have a custom <codeph>LOCATION</codeph> attribute. + </p> + + </conbody> + + </concept> + + <concept id="s3_internal_external"> + + <title>Internal and External Tables Located on S3</title> + + <conbody> + + <p> + Just as with tables located on HDFS storage, you can designate S3-based tables as either internal (managed + by Impala) or external, by using the syntax <codeph>CREATE TABLE</codeph> or <codeph>CREATE EXTERNAL + TABLE</codeph> respectively. When you drop an internal table, the files associated with the table are + removed, even if they are on S3 storage. When you drop an external table, the files associated with the + table are left alone, and are still available for access by other tools or components. See + <xref href="impala_tables.xml#tables"/> for details. + </p> + + <p> + If the data on S3 is intended to be long-lived and accessed by other tools in addition to Impala, create + any associated S3 tables with the <codeph>CREATE EXTERNAL TABLE</codeph> syntax, so that the files are not + deleted from S3 when the table is dropped. + </p> + + <p> + If the data on S3 is only needed for querying by Impala and can be safely discarded once the Impala + workflow is complete, create the associated S3 tables using the <codeph>CREATE TABLE</codeph> syntax, so + that dropping the table also deletes the corresponding data files on S3. + </p> + + <p> + For example, this session creates a table in S3 with the same column layout as a table in HDFS, then + examines the S3 table and queries some data from it. The table in S3 works the same as a table in HDFS as + far as the expected file format of the data, table and column statistics, and other table properties. The + only indication that it is not an HDFS table is the <codeph>s3a://</codeph> URL in the + <codeph>LOCATION</codeph> property. Many data files can reside in the S3 directory, and their combined + contents form the table data. Because the data in this example is uploaded after the table is created, a + <codeph>REFRESH</codeph> statement prompts Impala to update its cached information about the data files. 
+ </p> + +<codeblock>[localhost:21000] > create table usa_cities_s3 like usa_cities location 's3a://impala-demo/usa_cities'; +[localhost:21000] > desc usa_cities_s3; ++-------+----------+---------+ +| name | type | comment | ++-------+----------+---------+ +| id | smallint | | +| city | string | | +| state | string | | ++-------+----------+---------+ + +-- Now from a web browser, upload the same data file(s) to S3 as in the HDFS table, +-- under the relevant bucket and path. If you already have the data in S3, you would +-- point the table LOCATION at an existing path. + +[localhost:21000] > refresh usa_cities_s3; +[localhost:21000] > select count(*) from usa_cities_s3; ++----------+ +| count(*) | ++----------+ +| 289 | ++----------+ +[localhost:21000] > select distinct state from sample_data_s3 limit 5; ++----------------------+ +| state | ++----------------------+ +| Louisiana | +| Minnesota | +| Georgia | +| Alaska | +| Ohio | ++----------------------+ +[localhost:21000] > desc formatted usa_cities_s3; ++------------------------------+------------------------------+---------+ +| name | type | comment | ++------------------------------+------------------------------+---------+ +| # col_name | data_type | comment | +| | NULL | NULL | +| id | smallint | NULL | +| city | string | NULL | +| state | string | NULL | +| | NULL | NULL | +| # Detailed Table Information | NULL | NULL | +| Database: | s3_testing | NULL | +| Owner: | jrussell | NULL | +| CreateTime: | Mon Mar 16 11:36:25 PDT 2015 | NULL | +| LastAccessTime: | UNKNOWN | NULL | +| Protect Mode: | None | NULL | +| Retention: | 0 | NULL | +| Location: | s3a://impala-demo/usa_cities | NULL | +| Table Type: | MANAGED_TABLE | NULL | +... ++------------------------------+------------------------------+---------+ +</codeblock> + +<!-- Cut out unnecessary output, makes the example too wide. +| Table Parameters: | NULL | NULL | +| | COLUMN_STATS_ACCURATE | false | +| | numFiles | 0 | +| | numRows | -1 | +| | rawDataSize | -1 | +| | totalSize | 0 | +| | transient_lastDdlTime | 1426528176 | +| | NULL | NULL | +| # Storage Information | NULL | NULL | +| SerDe Library: | org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | NULL | +| InputFormat: | org.apache.hadoop.mapred.TextInputFormat | NULL | +| OutputFormat: | org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat | NULL | +| Compressed: | No | NULL | +| Num Buckets: | 0 | NULL | +| Bucket Columns: | [] | NULL | +| Sort Columns: | [] | NULL | +--> + + <p> + In this case, we have already uploaded a Parquet file with a million rows of data to the + <codeph>sample_data</codeph> directory underneath the <codeph>impala-demo</codeph> bucket on S3. This + session creates a table with matching column settings pointing to the corresponding location in S3, then + queries the table. Because the data is already in place on S3 when the table is created, no + <codeph>REFRESH</codeph> statement is required. 
+ </p> + +<codeblock>[localhost:21000] > create table sample_data_s3 + > (id bigint, val int, zerofill string, + > name string, assertion boolean, city string, state string) + > stored as parquet location 's3a://impala-demo/sample_data'; +[localhost:21000] > select count(*) from sample_data_s3; ++----------+ +| count(*) | ++----------+ +| 1000000 | ++----------+ +[localhost:21000] > select count(*) howmany, assertion from sample_data_s3 group by assertion; ++---------+-----------+ +| howmany | assertion | ++---------+-----------+ +| 667149 | true | +| 332851 | false | ++---------+-----------+ +</codeblock> + + </conbody> + + </concept> + + <concept id="s3_queries"> + + <title>Running and Tuning Impala Queries for Data Stored on S3</title> + + <conbody> + + <p> + Once the appropriate <codeph>LOCATION</codeph> attributes are set up at the table or partition level, you + query data stored in S3 exactly the same as data stored on HDFS or in HBase: + </p> + + <ul> + <li> + Queries against S3 data support all the same file formats as for HDFS data. + </li> + + <li> + Tables can be unpartitioned or partitioned. For partitioned tables, either manually construct paths in S3 + corresponding to the HDFS directories representing partition key values, or use <codeph>ALTER TABLE ... + ADD PARTITION</codeph> to set up the appropriate paths in S3. + </li> + + <li> + HDFS and HBase tables can be joined to S3 tables, or S3 tables can be joined with each other. + </li> + + <li> + Authorization using the Sentry framework to control access to databases, tables, or columns works the + same whether the data is in HDFS or in S3. + </li> + + <li> + The <cmdname>catalogd</cmdname> daemon caches metadata for both HDFS and S3 tables. Use + <codeph>REFRESH</codeph> and <codeph>INVALIDATE METADATA</codeph> for S3 tables in the same situations + where you would issue those statements for HDFS tables. + </li> + + <li> + Queries against S3 tables are subject to the same kinds of admission control and resource management as + HDFS tables. + </li> + + <li> + Metadata about S3 tables is stored in the same metastore database as for HDFS tables. + </li> + + <li> + You can set up views referring to S3 tables, the same as for HDFS tables. + </li> + + <li> + The <codeph>COMPUTE STATS</codeph>, <codeph>SHOW TABLE STATS</codeph>, and <codeph>SHOW COLUMN + STATS</codeph> statements work for S3 tables also. + </li> + </ul> + + </conbody> + + <concept id="s3_performance"> + + <title>Understanding and Tuning Impala Query Performance for S3 Data</title> + <prolog> + <metadata> + <data name="Category" value="Performance"/> + </metadata> + </prolog> + + <conbody> + + <p> + Although Impala queries for data stored in S3 might be less performant than queries against the + equivalent data stored in HDFS, you can still do some tuning. Here are techniques you can use to + interpret explain plans and profiles for queries against S3 data, and tips to achieve the best + performance possible for such queries. + </p> + + <p> + All else being equal, performance is expected to be lower for queries running against data on S3 rather + than HDFS. The actual mechanics of the <codeph>SELECT</codeph> statement are somewhat different when the + data is in S3. Although the work is still distributed across the DataNodes of the cluster, Impala might + parallelize the work for a distributed query differently for data on HDFS and S3.
S3 does not have the + same block notion as HDFS, so Impala uses heuristics to determine how to split up large S3 files for + processing in parallel. Because all hosts can access any S3 data file with equal efficiency, the + distribution of work might be different than for HDFS data, where the data blocks are physically read + using short-circuit local reads by hosts that contain the appropriate block replicas. Although the I/O to + read the S3 data might be spread evenly across the hosts of the cluster, the fact that all data is + initially retrieved across the network means that the overall query performance is likely to be lower for + S3 data than for HDFS data. + </p> + + <p conref="../shared/impala_common.xml#common/s3_block_splitting"/> + + <p conref="../shared/impala_common.xml#common/s3_dml_performance"/> + + <p> + When optimizing aspects of complex queries such as the join order, Impala treats tables on HDFS and + S3 the same way. Therefore, follow all the same tuning recommendations for S3 tables as for HDFS ones, + such as using the <codeph>COMPUTE STATS</codeph> statement to help Impala construct accurate estimates of + row counts and cardinality. See <xref href="impala_performance.xml#performance"/> for details. + </p> + + <p> + In query profile reports, the numbers for <codeph>BytesReadLocal</codeph>, + <codeph>BytesReadShortCircuit</codeph>, <codeph>BytesReadDataNodeCached</codeph>, and + <codeph>BytesReadRemoteUnexpected</codeph> are blank because those metrics come from HDFS. + If you do see any indications that a query against an S3 table performed <q>remote read</q> + operations, do not be alarmed. That is expected because, by definition, all the I/O for S3 tables involves + remote reads. + </p> + + </conbody> + + </concept> + + </concept> + + <concept id="s3_restrictions"> + + <title>Restrictions on Impala Support for S3</title> + + <conbody> + + <p> + Impala requires that the default filesystem for the cluster be HDFS. You cannot use S3 as the only + filesystem in the cluster. + </p> + + <p rev="2.6.0 CDH-39913 IMPALA-1878"> + Prior to <keyword keyref="impala26_full"/>, Impala could not perform DML operations (<codeph>INSERT</codeph>, + <codeph>LOAD DATA</codeph>, or <codeph>CREATE TABLE AS SELECT</codeph>) where the destination is a table + or partition located on an S3 filesystem. This restriction is lifted in <keyword keyref="impala26_full"/> and higher. + </p> + + <p> + Impala does not support the old <codeph>s3://</codeph> block-based and <codeph>s3n://</codeph> filesystem + schemes, only <codeph>s3a://</codeph>. + </p> + + <p> + Although S3 is often used to store JSON-formatted data, the current Impala support for S3 does not include + directly querying JSON data. For Impala queries, use data files in one of the file formats listed in + <xref href="impala_file_formats.xml#file_formats"/>. If you have data in JSON format, you can prepare a + flattened version of that data for querying by Impala as part of your ETL cycle. + </p> + + <p> + You cannot use the <codeph>ALTER TABLE ... SET CACHED</codeph> statement for tables or partitions that are + located in S3.
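+      For example, a statement like the following, which is valid when the partition data files
+      are on HDFS, is not allowed when the partition <codeph>LOCATION</codeph> is on S3.
+      (The table name and cache pool name shown here are hypothetical.)
+    </p>
+
+<codeblock>-- Allowed for an HDFS-resident partition, but not for one located on S3.
+alter table sales_data partition (year=2015) set cached in 'four_gig_pool';
+</codeblock>
+
+    <p>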
+ </p> + + </conbody> + + </concept> + + <concept id="s3_best_practices" rev="2.6.0 CDH-33310 CDH-39913 IMPALA-1878"> + <title>Best Practices for Using Impala with S3</title> + <prolog> + <metadata> + <data name="Category" value="Guidelines"/> + <data name="Category" value="Best Practices"/> + </metadata> + </prolog> + <conbody> + <p> + The following guidelines represent best practices derived from testing and field experience with Impala on S3: + </p> + <ul> + <li> + <p> + Any reference to an S3 location must be fully qualified. (This rule applies when + S3 is not designated as the default filesystem.) + </p> + </li> + <li> + <p> + Set the safety valve <codeph>fs.s3a.connection.maximum</codeph> to 1500 for <cmdname>impalad</cmdname>. + </p> + </li> + <li> + <p> + Set safety valve <codeph>fs.s3a.block.size</codeph> to 134217728 + (128 MB in bytes) if most Parquet files queried by Impala were written by Hive + or ParquetMR jobs. Set the block size to 268435456 (256 MB in bytes) if most Parquet + files queried by Impala were written by Impala. + </p> + </li> + <li> + <p> + <codeph>DROP TABLE .. PURGE</codeph> is much faster than the default <codeph>DROP TABLE</codeph>. + The same applies to <codeph>ALTER TABLE ... DROP PARTITION PURGE</codeph> + versus the default <codeph>DROP PARTITION</codeph> operation. + However, due to the eventually consistent nature of S3, the files for that + table or partition could remain for some unbounded time when using <codeph>PURGE</codeph>. + The default <codeph>DROP TABLE/PARTITION</codeph> is slow because Impala copies the files to the HDFS trash folder, + and Impala waits until all the data is moved. <codeph>DROP TABLE/PARTITION .. PURGE</codeph> is a + fast delete operation, and the Impala statement finishes quickly even though the change might not + have propagated fully throughout S3. + </p> + </li> + <li> + <p> + <codeph>INSERT</codeph> statements are faster than <codeph>INSERT OVERWRITE</codeph> for S3. + The query option <codeph>S3_SKIP_INSERT_STAGING</codeph>, which is set to <codeph>true</codeph> by default, + skips the staging step for regular <codeph>INSERT</codeph> (but not <codeph>INSERT OVERWRITE</codeph>). + This makes the operation much faster, but consistency is not guaranteed: if a node fails during execution, the + table could end up with inconsistent data. Set this option to <codeph>false</codeph> if stronger + consistency is required, however this setting will make the <codeph>INSERT</codeph> operations slower. + </p> + </li> + <li> + <p> + Too many files in a table can make metadata loading and updating slow on S3. + If too many requests are made to S3, S3 has a back-off mechanism and + responds slower than usual. You might have many small files because of: + </p> + <ul> + <li> + <p> + Too many partitions due to over-granular partitioning. Prefer partitions with + many megabytes of data, so that even a query against a single partition can + be parallelized effectively. + </p> + </li> + <li> + <p> + Many small <codeph>INSERT</codeph> queries. Prefer bulk + <codeph>INSERT</codeph>s so that more data is written to fewer + files. + </p> + </li> + </ul> + </li> + </ul> + + </conbody> + </concept> + + +</concept>
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3be0f122/docs/topics/impala_s3_skip_insert_staging.xml ---------------------------------------------------------------------- diff --git a/docs/topics/impala_s3_skip_insert_staging.xml b/docs/topics/impala_s3_skip_insert_staging.xml new file mode 100644 index 0000000..f9035a2 --- /dev/null +++ b/docs/topics/impala_s3_skip_insert_staging.xml @@ -0,0 +1,77 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!DOCTYPE concept PUBLIC "-//OASIS//DTD DITA Concept//EN" "concept.dtd"> +<concept id="s3_skip_insert_staging" rev="2.6.0 IMPALA-3452 CDH-39913"> + + <title>S3_SKIP_INSERT_STAGING Query Option (<keyword keyref="impala26"/> or higher only)</title> + <titlealts audience="PDF"><navtitle>S3_SKIP_INSERT_STAGING</navtitle></titlealts> + <prolog> + <metadata> + <data name="Category" value="Impala"/> + <data name="Category" value="Impala Query Options"/> + <data name="Category" value="Amazon"/> + <data name="Category" value="S3"/> + <data name="Category" value="Performance"/> + <data name="Category" value="Developers"/> + <data name="Category" value="Data Analysts"/> + </metadata> + </prolog> + + <conbody> + + <p rev="2.6.0 IMPALA-3452 CDH-39913"> + <indexterm audience="Cloudera">IMPALA_S3_SKIP_INSERT_STAGING query option</indexterm> + </p> + + <p> + Speeds up <codeph>INSERT</codeph> operations on tables or partitions residing on the + Amazon S3 filesystem. The tradeoff is the possibility of inconsistent data left behind + if an error occurs partway through the operation. + </p> + + <p> + By default, Impala write operations to S3 tables and partitions involve a two-stage process. + Impala writes intermediate files to S3, then (because S3 does not provide a <q>rename</q> + operation) those intermediate files are copied to their final location, making the process + more expensive than on a filesystem that supports renaming or moving files. + This query option makes Impala skip the intermediate files, and instead write the + new data directly to the final destination. + </p> + + <p conref="../shared/impala_common.xml#common/usage_notes_blurb"/> + + <note type="important"> + <p> + If a host that is participating in the <codeph>INSERT</codeph> operation fails partway through + the query, you might be left with a table or partition that contains some but not all of the + expected data files. Therefore, this option is most appropriate for a development or test + environment where you have the ability to reconstruct the table if a problem during + <codeph>INSERT</codeph> leaves the data in an inconsistent state. + </p> + </note> + + <p> + The timing of file deletion during an <codeph>INSERT OVERWRITE</codeph> operation + makes it impractical to write new files to S3 and delete the old files in a single operation. + Therefore, this query option only affects regular <codeph>INSERT</codeph> statements that add + to the existing data in a table, not <codeph>INSERT OVERWRITE</codeph> statements. + Use <codeph>TRUNCATE TABLE</codeph> if you need to remove all contents from an S3 table + before performing a fast <codeph>INSERT</codeph> with this option enabled. + </p> + + <p> + Performance improvements with this option enabled can be substantial. The speed increase + might be more noticeable for non-partitioned tables than for partitioned tables.
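+      The following <cmdname>impala-shell</cmdname> example sketches one way to use the option,
+      turning it off for a load where consistency matters more than speed. (The table names
+      shown are hypothetical.)
+    </p>
+
+<codeblock>-- S3_SKIP_INSERT_STAGING is enabled (true) by default, so INSERT statements
+-- into S3 tables use the fast code path.
+-- Turn it off for an INSERT where the stronger consistency guarantee matters more.
+set s3_skip_insert_staging=false;
+insert into s3_destination_table select * from hdfs_staging_table;
+
+-- Re-enable the fast code path for subsequent INSERT statements.
+set s3_skip_insert_staging=true;
+</codeblock>
+
+    <p>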
+ </p> + + <p conref="../shared/impala_common.xml#common/type_boolean"/> + <p conref="../shared/impala_common.xml#common/default_true_1"/> + + <p conref="../shared/impala_common.xml#common/added_in_260"/> + + <p conref="../shared/impala_common.xml#common/related_info"/> + <p> + <xref href="impala_s3.xml#s3"/> + </p> + + </conbody> +</concept> http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3be0f122/docs/topics/impala_scalability.xml ---------------------------------------------------------------------- diff --git a/docs/topics/impala_scalability.xml b/docs/topics/impala_scalability.xml new file mode 100644 index 0000000..ea0d0a4 --- /dev/null +++ b/docs/topics/impala_scalability.xml @@ -0,0 +1,833 @@ +<?xml version="1.0" encoding="UTF-8"?><!DOCTYPE concept PUBLIC "-//OASIS//DTD DITA Concept//EN" "concept.dtd"> +<concept id="scalability"> + + <title>Scalability Considerations for Impala</title> + <titlealts audience="PDF"><navtitle>Scalability Considerations</navtitle></titlealts> + <prolog> + <metadata> + <data name="Category" value="Performance"/> + <data name="Category" value="Impala"/> + <data name="Category" value="Planning"/> + <data name="Category" value="Querying"/> + <data name="Category" value="Developers"/> + <data name="Category" value="Memory"/> + <data name="Category" value="Scalability"/> + <!-- Using domain knowledge about Impala, sizing, etc. to decide what to mark as 'Proof of Concept'. --> + <data name="Category" value="Proof of Concept"/> + </metadata> + </prolog> + + <conbody> + + <p> + This section explains how the size of your cluster and the volume of data influences SQL performance and + schema design for Impala tables. Typically, adding more cluster capacity reduces problems due to memory + limits or disk throughput. On the other hand, larger clusters are more likely to have other kinds of + scalability issues, such as a single slow node that causes performance problems for queries. + </p> + + <p outputclass="toc inpage"/> + + <p conref="../shared/impala_common.xml#common/cookbook_blurb"/> + + </conbody> + + <concept audience="Cloudera" id="scalability_memory"> + + <title>Overview and Guidelines for Impala Memory Usage</title> + <prolog> + <metadata> + <data name="Category" value="Memory"/> + <data name="Category" value="Concepts"/> + <data name="Category" value="Best Practices"/> + <data name="Category" value="Guidelines"/> + </metadata> + </prolog> + + <conbody> + +<!-- +Outline adapted from Alan Choi's "best practices" and/or "performance cookbook" papers. +--> + +<codeblock>Memory Usage â the Basics +* Memory is used by: +* Hash join â RHS tables after decompression, filtering and projection +* Group by â proportional to the #groups +* Parquet writer buffer â 1GB per partition +* IO buffer (shared across queries) +* Metadata cache (no more than 1GB typically) +* Memory held and reused by later query +* Impala releases memory from time to time starting in 1.4. + +Memory Usage â Estimating Memory Usage +* Use Explain Plan +* Requires statistics! Mem estimate without stats is meaningless. +* Reports per-host memory requirement for this cluster size. +* Re-run if youâve re-sized the cluster! +[image of explain plan] + +Memory Usage â Estimating Memory Usage +* EXPLAINâs memory estimate issues +* Can be way off â much higher or much lower. +* group byâs estimate can be particularly off â when thereâs a large number of group by columns. +* Mem estimate = NDV of group by column 1 * NDV of group by column 2 * ... 
NDV of group by column n +* Ignore EXPLAINâs estimate if itâs too high! ⢠Do your own estimate for group by +* GROUP BY mem usage = (total number of groups * size of each row) + (total number of groups * size of each row) / num node + +Memory Usage â Finding Actual Memory Usage +* Search for âPer Node Peak Memory Usageâ in the profile. +This is accurate. Use it for production capacity planning. + +Memory Usage â Actual Memory Usage +* For complex queries, how do I know which part of my query is using too much memory? +* Use the ExecSummary from the query profile! +- But is that "Peak Mem" number aggregate or per-node? +[image of executive summary] + +Memory Usage â Hitting Mem-limit +* Top causes (in order) of hitting mem-limit even when running a single query: +1. Lack of statistics +2. Lots of joins within a single query +3. Big-table joining big-table +4. Gigantic group by + +Memory Usage â Hitting Mem-limit +Lack of stats +* Wrong join order, wrong join strategy, wrong insert strategy +* Explain Plan tells you that! +[image of explain plan] +* Fix: Compute Stats table + +Memory Usage â Hitting Mem-limit +Lots of joins within a single query +* select...from fact, dim1, dim2,dim3,...dimN where ... +* Each dim tbl can fit in memory, but not all of them together +* As of Impala 1.4, Impala might choose the wrong plan â BROADCAST +FIX 1: use shuffle hint +select ... from fact join [shuffle] dim1 on ... join dim2 [shuffle] ... +FIX 2: pre-join the dim tables (if possible) +- How about an example to illustrate that technique? +* few join=>better perf! + +Memory Usage: Hitting Mem-limit +Big-table joining big-table +* Big-table (after decompression, filtering, and projection) is a table that is bigger than total cluster memory size. +* Impala 2.0 will do this (via disk-based join). Consider using Hive for now. +* (Advanced) For a simple query, you can try this advanced workaround â per-partition join +* Requires the partition key be part of the join key +select ... from BigTbl_A a join BigTbl_B b where a.part_key = b.part_key and a.part_key in (1,2,3) + union all +select ... from BigTbl_A a join BigTbl_B b where a.part_key = b.part_key and a.part_key in (4,5,6) + +Memory Usage: Hitting Mem-limit +Gigantic group by +* The total number of distinct groups is huge, such as group by userid. +* Impala 2.0 will do this (via disk-based agg). Consider using Hive for now. +- Is this one of the cases where people were unhappy we recommended Hive? +* (Advanced) For a simple query, you can try this advanced workaround â per-partition agg +* Requires the partition key be part of the group by +select part_key, col1, col2, ...agg(..) from tbl where + part_key in (1,2,3) + Union all + Select part_key, col1, col2, ...agg(..) from tbl where + part_key in (4,5,6) +- But where's the GROUP BY in the preceding query? Need a real example. + +Memory Usage: Additional Notes +* Use explain plan for estimate; use profile for accurate measure +* Data skew can use uneven memory usage +* Review previous common issues on out-of-memory +* Note: Even with disk-based joins, you'll want to review these steps to speed up queries and use memory more efficiently +</codeblock> + </conbody> + </concept> + + <concept id="scalability_catalog"> + + <title>Impact of Many Tables or Partitions on Impala Catalog Performance and Memory Usage</title> + + <conbody> + + <p audience="Cloudera"> + Details to fill in in future: Impact of <q>load catalog in background</q> option. + Changing timeouts. Related Cloudera Manager settings. 
+ </p> + + <p> + Because Hadoop I/O is optimized for reading and writing large files, Impala is optimized for tables + containing relatively few, large data files. Schemas containing thousands of tables, or tables containing + thousands of partitions, can encounter performance issues during startup or during DDL operations such as + <codeph>ALTER TABLE</codeph> statements. + </p> + + <note type="important" rev="TSB-168"> + <p> + Because of a change in the default heap size for the <cmdname>catalogd</cmdname> daemon in + <keyword keyref="impala25_full"/> and higher, the following procedure to increase the <cmdname>catalogd</cmdname> + memory limit might be required following an upgrade to <keyword keyref="impala25_full"/> even if not + needed previously. + </p> + </note> + + <p conref="../shared/impala_common.xml#common/increase_catalogd_heap_size"/> + + </conbody> + </concept> + + <concept rev="2.1.0" id="statestore_scalability"> + + <title>Scalability Considerations for the Impala Statestore</title> + + <conbody> + + <p> + Before CDH 5.3, the statestore sent only one kind of message to its subscribers. This message contained all + updates for any topics that a subscriber had subscribed to. It also served to let subscribers know that the + statestore had not failed, and conversely the statestore used the success of sending a heartbeat to a + subscriber to decide whether or not the subscriber had failed. + </p> + + <p> + Combining topic updates and failure detection in a single message led to bottlenecks in clusters with large + numbers of tables, partitions, and HDFS data blocks. When the statestore was overloaded with metadata + updates to transmit, heartbeat messages were sent less frequently, sometimes causing subscribers to time + out their connection with the statestore. Increasing the subscriber timeout and decreasing the frequency of + statestore heartbeats worked around the problem, but reduced responsiveness when the statestore failed or + restarted. + </p> + + <p> + As of CDH 5.3, the statestore now sends topic updates and heartbeats in separate messages. This allows the + statestore to send and receive a steady stream of lightweight heartbeats, and removes the requirement to + send topic updates according to a fixed schedule, reducing statestore network overhead. + </p> + + <p> + The statestore now has the following relevant configuration flags for the <cmdname>statestored</cmdname> + daemon: + </p> + + <dl> + <dlentry id="statestore_num_update_threads"> + + <dt> + <codeph>-statestore_num_update_threads</codeph> + </dt> + + <dd> + The number of threads inside the statestore dedicated to sending topic updates. You should not + typically need to change this value. + <p> + <b>Default:</b> 10 + </p> + </dd> + + </dlentry> + + <dlentry id="statestore_update_frequency_ms"> + + <dt> + <codeph>-statestore_update_frequency_ms</codeph> + </dt> + + <dd> + The frequency, in milliseconds, with which the statestore tries to send topic updates to each + subscriber. This is a best-effort value; if the statestore is unable to meet this frequency, it sends + topic updates as fast as it can. You should not typically need to change this value. + <p> + <b>Default:</b> 2000 + </p> + </dd> + + </dlentry> + + <dlentry id="statestore_num_heartbeat_threads"> + + <dt> + <codeph>-statestore_num_heartbeat_threads</codeph> + </dt> + + <dd> + The number of threads inside the statestore dedicated to sending heartbeats. You should not typically + need to change this value. 
+ <p> + <b>Default:</b> 10 + </p> + </dd> + + </dlentry> + + <dlentry id="statestore_heartbeat_frequency_ms"> + + <dt> + <codeph>-statestore_heartbeat_frequency_ms</codeph> + </dt> + + <dd> + The frequency, in milliseconds, with which the statestore tries to send heartbeats to each subscriber. + This value should be good for large catalogs and clusters up to approximately 150 nodes. Beyond that, + you might need to increase this value to make the interval longer between heartbeat messages. + <p> + <b>Default:</b> 1000 (one heartbeat message every second) + </p> + </dd> + + </dlentry> + </dl> + + <p> + As of CDH 5.3, not all of these flags are present in the Cloudera Manager user interface. Some must be set + using the <uicontrol>Advanced Configuration Snippet</uicontrol> fields for the statestore component. + </p> + + <p> + If it takes a very long time for a cluster to start up, and <cmdname>impala-shell</cmdname> consistently + displays <codeph>This Impala daemon is not ready to accept user requests</codeph>, the statestore might be + taking too long to send the entire catalog topic to the cluster. In this case, consider adding + <codeph>--load_catalog_in_background=false</codeph> to your catalog service configuration. This setting + prevents the catalog service from loading the entire catalog into memory at cluster startup. Instead, metadata for + each table is loaded when the table is accessed for the first time. + </p> + </conbody> + </concept> + + <concept audience="Cloudera" id="scalability_cluster_size"> + + <title>Scalability Considerations for Impala Cluster Size and Topology</title> + + <conbody> + + <p> + </p> + </conbody> + </concept> + + <concept audience="Cloudera" id="concurrent_connections"> + + <title>Scaling the Number of Concurrent Connections</title> + + <conbody> + + <p></p> + </conbody> + </concept> + + <concept rev="2.0.0" id="spill_to_disk"> + + <title>SQL Operations that Spill to Disk</title> + + <conbody> + + <p> + Certain memory-intensive operations write temporary data to disk (known as <term>spilling</term> to disk) + when Impala is close to exceeding its memory limit on a particular host. + </p> + + <p> + The result is a query that completes successfully, rather than failing with an out-of-memory error. The + tradeoff is decreased performance due to the extra disk I/O to write the temporary data and read it back + in. The slowdown could potentially be significant. Thus, while this feature improves reliability, + you should optimize your queries, system parameters, and hardware configuration to make this spilling a rare occurrence. + </p> + + <p> + <b>What kinds of queries might spill to disk:</b> + </p> + + <p> + Several SQL clauses and constructs require memory allocations that could activate the spilling mechanism: + </p> + <ul> + <li> + <p> + When a query uses a <codeph>GROUP BY</codeph> clause for columns + with millions or billions of distinct values, Impala keeps a + similar number of temporary results in memory, to accumulate the + aggregate results for each value in the group. + </p> + </li> + <li> + <p> + When large tables are joined together, Impala keeps the values of + the join columns from one table in memory, to compare them to + incoming values from the other table. + </p> + </li> + <li> + <p> + When a large result set is sorted by the <codeph>ORDER BY</codeph> + clause, each node sorts its portion of the result set in memory.
+ </p> + </li> + <li> + <p> + The <codeph>DISTINCT</codeph> and <codeph>UNION</codeph> operators + build in-memory data structures to represent all values found so + far, to eliminate duplicates as the query progresses. + </p> + </li> + <!-- JIRA still in open state as of 5.8 / 2.6, commenting out. + <li> + <p rev="IMPALA-3471"> + In <keyword keyref="impala26_full"/> and higher, <term>top-N</term> queries (those with + <codeph>ORDER BY</codeph> and <codeph>LIMIT</codeph> clauses) can also spill. + Impala allocates enough memory to hold as many rows as specified by the <codeph>LIMIT</codeph> + clause, plus enough memory to hold as many rows as specified by any <codeph>OFFSET</codeph> clause. + </p> + </li> + --> + </ul> + + <p conref="../shared/impala_common.xml#common/spill_to_disk_vs_dynamic_partition_pruning"/> + + <p> + <b>How Impala handles scratch disk space for spilling:</b> + </p> + + <p rev="obwl" conref="../shared/impala_common.xml#common/order_by_scratch_dir"/> + + <p> + <b>Memory usage for SQL operators:</b> + </p> + + <p> + The infrastructure of the spilling feature affects the way the affected SQL operators, such as + <codeph>GROUP BY</codeph>, <codeph>DISTINCT</codeph>, and joins, use memory. + On each host that participates in the query, each such operator in a query accumulates memory + while building the data structure to process the aggregation or join operation. The amount + of memory used depends on the portion of the data being handled by that host, and thus might + be different from one host to another. When the amount of memory being used for the operator + on a particular host reaches a threshold amount, Impala reserves an additional memory buffer + to use as a work area in case that operator causes the query to exceed the memory limit for + that host. After allocating the memory buffer, the memory used by that operator remains + essentially stable or grows only slowly, until the point where the memory limit is reached + and the query begins writing temporary data to disk. + </p> + + <p rev="2.2.0"> + Prior to Impala 2.2 (CDH 5.4), the extra memory buffer for an operator that might spill to disk + was allocated when the data structure used by the applicable SQL operator reached 16 MB in size, + and the memory buffer itself was 512 MB. In Impala 2.2, these values are halved: the threshold value + is 8 MB and the memory buffer is 256 MB. <ph rev="2.3.0">In <keyword keyref="impala23_full"/> and higher, the memory for the buffer + is allocated in pieces, only as needed, to avoid sudden large jumps in memory usage.</ph> A query that uses + multiple such operators might allocate multiple such memory buffers, as the size of the data structure + for each operator crosses the threshold on a particular host. + </p> + + <p> + Therefore, a query that processes a relatively small amount of data on each host would likely + never reach the threshold for any operator, and would never allocate any extra memory buffers. A query + that did process millions of groups, distinct values, join keys, and so on might cross the threshold, + causing its memory requirement to rise suddenly and then flatten out. The larger the cluster, the less data is processed + on any particular host, thus reducing the chance of requiring the extra memory allocation. + </p> + + <p> + <b>Added in:</b> This feature was added to the <codeph>ORDER BY</codeph> clause in Impala 1.4 for CDH 4, + and in CDH 5.1.
This feature was extended to cover join queries, aggregation functions, and analytic + functions in Impala 2.0 for CDH 4, and in CDH 5.2. The size of the memory work area required by + each operator that spills was reduced from 512 megabytes to 256 megabytes in Impala 2.2 (CDH 5.4). + </p> + + <p> + <b>Avoiding queries that spill to disk:</b> + </p> + + <p> + Because the extra I/O can impose significant performance overhead on these types of queries, try to avoid + this situation by using the following steps: + </p> + + <ol> + <li> + Detect how often queries spill to disk, and how much temporary data is written. Refer to the following + sources: + <ul> + <li> + The output of the <codeph>PROFILE</codeph> command in the <cmdname>impala-shell</cmdname> + interpreter. This data shows the memory usage for each host and in total across the cluster. The + <codeph>BlockMgr.BytesWritten</codeph> counter reports how much data was written to disk during the + query. + </li> + + <li> + The <uicontrol>Impala Queries</uicontrol> dialog in Cloudera Manager. You can see the peak memory + usage for a query, combined across all nodes in the cluster. + </li> + + <li> + The <uicontrol>Queries</uicontrol> tab in the Impala debug web user interface. Select the query to + examine and click the corresponding <uicontrol>Profile</uicontrol> link. This data breaks down the + memory usage for a single host within the cluster, the host whose web interface you are connected to. + </li> + </ul> + </li> + + <li> + Use one or more techniques to reduce the possibility of the queries spilling to disk: + <ul> + <li> + Increase the Impala memory limit if practical, for example, if you can increase the available memory + by more than the amount of temporary data written to disk on a particular node. Remember that in + Impala 2.0 and later, you can issue <codeph>SET MEM_LIMIT</codeph> as a SQL statement, which lets you + fine-tune the memory usage for queries from JDBC and ODBC applications. + </li> + + <li> + Increase the number of nodes in the cluster, to increase the aggregate memory available to Impala and + reduce the amount of memory required on each node. + </li> + + <li> + Increase the overall memory capacity of each DataNode at the hardware level. + </li> + + <li> + On a cluster with resources shared between Impala and other Hadoop components, use resource + management features to allocate more memory for Impala. See + <xref href="impala_resource_management.xml#resource_management"/> for details. + </li> + + <li> + If the memory pressure is due to running many concurrent queries rather than a few memory-intensive + ones, consider using the Impala admission control feature to lower the limit on the number of + concurrent queries. By spacing out the most resource-intensive queries, you can avoid spikes in + memory usage and improve overall response times. See + <xref href="impala_admission.xml#admission_control"/> for details. + </li> + + <li> + Tune the queries with the highest memory requirements, using one or more of the following techniques: + <ul> + <li> + Run the <codeph>COMPUTE STATS</codeph> statement for all tables involved in large-scale joins and + aggregation queries. + </li> + + <li> + Minimize your use of <codeph>STRING</codeph> columns in join columns. Prefer numeric values + instead. + </li> + + <li> + Examine the <codeph>EXPLAIN</codeph> plan to understand the execution strategy being used for the + most resource-intensive queries. See <xref href="impala_explain_plan.xml#perf_explain"/> for + details. 
+              </li>
+
+              <li>
+                If Impala still chooses a suboptimal execution strategy even with statistics available, or if it
+                is impractical to keep the statistics up to date for huge or rapidly changing tables, add hints
+                to the most resource-intensive queries to select the right execution strategy. See
+                <xref href="impala_hints.xml#hints"/> for details.
+              </li>
+            </ul>
+          </li>
+
+          <li>
+            If your queries experience substantial performance overhead due to spilling, enable the
+            <codeph>DISABLE_UNSAFE_SPILLS</codeph> query option. This option prevents queries whose memory usage
+            is likely to be exorbitant from spilling to disk. See
+            <xref href="impala_disable_unsafe_spills.xml#disable_unsafe_spills"/> for details. As you tune
+            problematic queries using the preceding steps, fewer and fewer will be cancelled by this option
+            setting.
+          </li>
+        </ul>
+      </li>
+    </ol>
+
+    <p>
+      <b>Testing performance implications of spilling to disk:</b>
+    </p>
+
+    <p>
+      To artificially provoke spilling, so that you can test this feature and understand the performance
+      implications, use a test environment with a memory limit of at least 2 GB. Issue the <codeph>SET</codeph>
+      command with no arguments to check the current setting for the <codeph>MEM_LIMIT</codeph> query option.
+      Set the query option <codeph>DISABLE_UNSAFE_SPILLS=true</codeph>. This option limits the spill-to-disk
+      feature to prevent runaway disk usage from queries that are known in advance to be suboptimal. Within
+      <cmdname>impala-shell</cmdname>, run a query that you expect to be memory-intensive, based on the criteria
+      explained earlier. A self-join of a large table is a good candidate:
+    </p>
+
+<codeblock>select count(*) from big_table a join big_table b using (column_with_many_values);
+</codeblock>
+
+    <p>
+      Issue the <codeph>PROFILE</codeph> command to get a detailed breakdown of the memory usage on each node
+      during the query. The crucial part of the profile output concerning memory is the <codeph>BlockMgr</codeph>
+      portion. For example, this profile shows that the query did not quite exceed the memory limit:
+    </p>
+
+<codeblock>BlockMgr:
+   - BlockWritesIssued: 1
+   - BlockWritesOutstanding: 0
+   - BlocksCreated: 24
+   - BlocksRecycled: 1
+   - BufferedPins: 0
+   - MaxBlockSize: 8.00 MB (8388608)
+   <b>- MemoryLimit: 200.00 MB (209715200)</b>
+   <b>- PeakMemoryUsage: 192.22 MB (201555968)</b>
+   - TotalBufferWaitTime: 0ns
+   - TotalEncryptionTime: 0ns
+   - TotalIntegrityCheckTime: 0ns
+   - TotalReadBlockTime: 0ns
+</codeblock>
+
+    <p>
+      In this case, because the memory limit was already below any recommended value, this example increases
+      the volume of data for the query rather than reducing the memory limit any further.
+    </p>
+
+    <p>
+      Set the <codeph>MEM_LIMIT</codeph> query option to a value that is smaller than the peak memory usage
+      reported in the profile output. Do not specify a memory limit lower than about 300 MB, because with such a
+      low limit, queries could fail to start for other reasons. Now try the memory-intensive query again.
+    </p>
+
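+    <p>
+      The whole sequence of <cmdname>impala-shell</cmdname> commands looks something like the following
+      sketch. The memory limit shown is purely illustrative; substitute a value just below the peak usage
+      reported for your own query.
+    </p>
+
+<codeblock>set;                             -- confirm the current MEM_LIMIT setting
+set disable_unsafe_spills=true;  -- guard against runaway spilling from unoptimized plans
+set mem_limit=1800000000;        -- hypothetical value, just below a reported peak usage of about 2 GB
+select count(*) from big_table a join big_table b using (column_with_many_values);
+profile;                         -- if the query succeeds, examine the BlockMgr counters
+</codeblock>
+
+    <p>
+      Check if the query fails with a message like the following:
+    </p>
+
+<codeblock>WARNINGS: Spilling has been disabled for plans that do not have stats and are not hinted
+to prevent potentially bad plans from using too many cluster resources. Compute stats on
+these tables, hint the plan or disable this behavior via query options to enable spilling.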
+</codeblock> + + <p> + If so, the query could have consumed substantial temporary disk space, slowing down so much that it would + not complete in any reasonable time. Rather than rely on the spill-to-disk feature in this case, issue the + <codeph>COMPUTE STATS</codeph> statement for the table or tables in your sample query. Then run the query + again, check the peak memory usage again in the <codeph>PROFILE</codeph> output, and adjust the memory + limit again if necessary to be lower than the peak memory usage. + </p> + + <p> + At this point, you have a query that is memory-intensive, but Impala can optimize it efficiently so that + the memory usage is not exorbitant. You have set an artificial constraint through the + <codeph>MEM_LIMIT</codeph> option so that the query would normally fail with an out-of-memory error. But + the automatic spill-to-disk feature means that the query should actually succeed, at the expense of some + extra disk I/O to read and write temporary work data. + </p> + + <p> + Try the query again, and confirm that it succeeds. Examine the <codeph>PROFILE</codeph> output again. This + time, look for lines of this form: + </p> + +<codeblock>- SpilledPartitions: <varname>N</varname> +</codeblock> + + <p> + If you see any such lines with <varname>N</varname> greater than 0, that indicates the query would have + failed in Impala releases prior to 2.0, but now it succeeded because of the spill-to-disk feature. Examine + the total time taken by the <codeph>AGGREGATION_NODE</codeph> or other query fragments containing non-zero + <codeph>SpilledPartitions</codeph> values. Compare the times to similar fragments that did not spill, for + example in the <codeph>PROFILE</codeph> output when the same query is run with a higher memory limit. This + gives you an idea of the performance penalty of the spill operation for a particular query with a + particular memory limit. If you make the memory limit just a little lower than the peak memory usage, the + query only needs to write a small amount of temporary data to disk. The lower you set the memory limit, the + more temporary data is written and the slower the query becomes. + </p> + + <p> + Now repeat this procedure for actual queries used in your environment. Use the + <codeph>DISABLE_UNSAFE_SPILLS</codeph> setting to identify cases where queries used more memory than + necessary due to lack of statistics on the relevant tables and columns, and issue <codeph>COMPUTE + STATS</codeph> where necessary. + </p> + + <p> + <b>When to use DISABLE_UNSAFE_SPILLS:</b> + </p> + + <p> + You might wonder, why not leave <codeph>DISABLE_UNSAFE_SPILLS</codeph> turned on all the time. Whether and + how frequently to use this option depends on your system environment and workload. + </p> + + <p> + <codeph>DISABLE_UNSAFE_SPILLS</codeph> is suitable for an environment with ad hoc queries whose performance + characteristics and memory usage are not known in advance. It prevents <q>worst-case scenario</q> queries + that use large amounts of memory unnecessarily. Thus, you might turn this option on within a session while + developing new SQL code, even though it is turned off for existing applications. + </p> + + <p> + Organizations where table and column statistics are generally up-to-date might leave this option turned on + all the time, again to avoid worst-case scenarios for untested queries or if a problem in the ETL pipeline + results in a table with no statistics. 
Turning on <codeph>DISABLE_UNSAFE_SPILLS</codeph> lets you <q>fail
+      fast</q> in this case and immediately gather statistics or tune the problematic queries.
+    </p>
+
+    <p>
+      Some organizations might leave this option turned off. For example, you might have tables large enough that
+      the <codeph>COMPUTE STATS</codeph> statement takes substantial time to run, making it impractical to re-run
+      after loading new data. If you have examined the <codeph>EXPLAIN</codeph> plans of your queries and know that
+      they are operating efficiently, you might leave <codeph>DISABLE_UNSAFE_SPILLS</codeph> turned off. In that
+      case, you know that any queries that do spill will not consume excessive amounts of memory.
+    </p>
+
+<!--
+    <p>
+      <b>Turning off the spill-to-disk feature: (<keyword keyref="impala24_full"/> and lower only)</b>
+    </p>
+
+    <p>
+      Prior to <keyword keyref="impala25_full"/> certain conditions...
+    </p>
+
+    <p>
+      You might turn off the spill-to-disk feature if you are in an environment with constraints on disk space,
+      or if you prefer for queries that exceed the memory capacity in your cluster to <q>fail fast</q> so that
+      you can tune and retry them.
+    </p>
+
+    <p>
+      To turn off this feature, set the following configuration options for each <cmdname>impalad</cmdname>
+      daemon, either through the <cmdname>impalad</cmdname> advanced configuration snippet in Cloudera Manager,
+      or during <cmdname>impalad</cmdname> startup on each DataNode on systems not managed by Cloudera Manager:
+    </p>
+
+<codeblock>−−enable_partitioned_aggregation=false
+−−enable_partitioned_hash_join=false
+</codeblock>
+-->
+
+  </conbody>
+  </concept>
+
+<concept id="complex_query">
+<title>Limits on Query Size and Complexity</title>
+<conbody>
+<p>
+There are hardcoded limits on the maximum size and complexity of queries.
+Currently, the maximum number of expressions in a query is 2000.
+You might exceed the limits with large or deeply nested queries
+produced by business intelligence tools or other query generators.
+</p>
+<p>
+If you have the ability to customize such queries or the query generation
+logic that produces them, replace sequences of repetitive expressions
+with single operators such as <codeph>IN</codeph> or <codeph>BETWEEN</codeph>
+that can represent multiple values or ranges.
+For example, instead of a large number of <codeph>OR</codeph> clauses:
+</p>
+<codeblock>WHERE val = 1 OR val = 2 OR val = 6 OR val = 100 ...
+</codeblock>
+<p>
+use a single <codeph>IN</codeph> clause:
+</p>
+<codeblock>WHERE val IN (1,2,6,100,...)</codeblock>
+</conbody>
+</concept>
+
+<concept id="scalability_io">
+<title>Scalability Considerations for Impala I/O</title>
+<conbody>
+<p>
+Impala parallelizes its I/O operations aggressively;
+therefore, the more disks you can attach to each host, the better.
+Impala retrieves data from disk so quickly,
+using bulk read operations on large blocks, that most queries
+are CPU-bound rather than I/O-bound.
+</p>
+<p>
+Because the kind of sequential scanning typically done by
+Impala queries does not benefit much from the random-access
+capabilities of SSDs, spinning disks typically provide
+the most cost-effective kind of storage for Impala data,
+with little or no performance penalty as compared to SSDs.
+</p>
+<p>
+Resource management features such as YARN, Llama, and admission control
+typically constrain the amount of memory, the CPU usage, or the overall
+number of queries in a high-concurrency environment.
+Currently, there is no throttling mechanism for Impala I/O.
+</p> +</conbody> +</concept> + +<concept id="big_tables"> +<title>Scalability Considerations for Table Layout</title> +<conbody> +<p> +Due to the overhead of retrieving and updating table metadata +in the metastore database, try to limit the number of columns +in a table to a maximum of approximately 2000. +Although Impala can handle wider tables than this, the metastore overhead +can become significant, leading to query performance that is slower +than expected based on the actual data volume. +</p> +<p> +To minimize overhead related to the metastore database and Impala query planning, +try to limit the number of partitions for any partitioned table to a few tens of thousands. +</p> +</conbody> +</concept> + +<concept rev="CDH-38321" id="kerberos_overhead_cluster_size"> +<title>Kerberos-Related Network Overhead for Large Clusters</title> +<conbody> +<p> +When Impala starts up, or after each <codeph>kinit</codeph> refresh, Impala sends a number of +simultaneous requests to the KDC. For a cluster with 100 hosts, the KDC might be able to process +all the requests within roughly 5 seconds. For a cluster with 1000 hosts, the time to process +the requests would be roughly 500 seconds. Impala also makes a number of DNS requests at the same +time as these Kerberos-related requests. +</p> +<p> +While these authentication requests are being processed, any submitted Impala queries will fail. +During this period, the KDC and DNS may be slow to respond to requests from components other than Impala, +so other secure services might be affected temporarily. +</p> +<p> +To reduce the frequency of the <codeph>kinit</codeph> renewal that initiates a new set of +authentication requests, increase the <codeph>kerberos_reinit_interval</codeph> configuration setting +for the <cmdname>impalad</cmdname> daemons. Currently, the default for a cluster not managed by +Cloudera Manager is 60 minutes, while the default under Cloudera Manager is 10 minutes. +Consider using a higher value such as 360 (6 hours). +</p> +</conbody> +</concept> + + <concept id="scalability_hotspots" rev="2.5.0 IMPALA-2696"> + <title>Avoiding CPU Hotspots for HDFS Cached Data</title> + <conbody> + <p> + You can use the HDFS caching feature, described in <xref href="impala_perf_hdfs_caching.xml#hdfs_caching"/>, + with Impala to reduce I/O and memory-to-memory copying for frequently accessed tables or partitions. + </p> + <p> + In the early days of this feature, you might have found that enabling HDFS caching + resulted in little or no performance improvement, because it could result in + <q>hotspots</q>: instead of the I/O to read the table data being parallelized across + the cluster, the I/O was reduced but the CPU load to process the data blocks + might be concentrated on a single host. + </p> + <p> + To avoid hotspots, include the <codeph>WITH REPLICATION</codeph> clause with the + <codeph>CREATE TABLE</codeph> or <codeph>ALTER TABLE</codeph> statements for tables that use HDFS caching. + This clause allows more than one host to cache the relevant data blocks, so the CPU load + can be shared, reducing the load on any one host. + See <xref href="impala_create_table.xml#create_table"/> and <xref href="impala_alter_table.xml#alter_table"/> + for details. + </p> + <p> + Hotspots with high CPU load for HDFS cached data could still arise in some cases, due to + the way that Impala schedules the work of processing data blocks on different hosts. 
+ In <keyword keyref="impala25_full"/> and higher, scheduling improvements mean that the work for + HDFS cached data is divided better among all the hosts that have cached replicas + for a particular data block. When more than one host has a cached replica for a data block, + Impala assigns the work of processing that block to whichever host has done the least work + (in terms of number of bytes read) for the current query. If hotspots persist even with this + load-based scheduling algorithm, you can enable the query option <codeph>SCHEDULE_RANDOM_REPLICA=TRUE</codeph> + to further distribute the CPU load. This setting causes Impala to randomly pick a host to process a cached + data block if the scheduling algorithm encounters a tie when deciding which host has done the + least work. + </p> + </conbody> + </concept> + +</concept> http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3be0f122/docs/topics/impala_scan_node_codegen_threshold.xml ---------------------------------------------------------------------- diff --git a/docs/topics/impala_scan_node_codegen_threshold.xml b/docs/topics/impala_scan_node_codegen_threshold.xml new file mode 100644 index 0000000..c81bbc6 --- /dev/null +++ b/docs/topics/impala_scan_node_codegen_threshold.xml @@ -0,0 +1,75 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!DOCTYPE concept PUBLIC "-//OASIS//DTD DITA Concept//EN" "concept.dtd"> +<concept id="scan_node_codegen_threshold" rev="2.5.0 IMPALA-1755"> + + <title>SCAN_NODE_CODEGEN_THRESHOLD Query Option (<keyword keyref="impala25"/> or higher only)</title> + <titlealts audience="PDF"><navtitle></navtitle></titlealts> + <prolog> + <metadata> + <data name="Category" value="Impala"/> + <data name="Category" value="Impala Query Options"/> + <data name="Category" value="Performance"/> + <data name="Category" value="Developers"/> + <data name="Category" value="Data Analysts"/> + </metadata> + </prolog> + + <conbody> + + <p rev="2.5.0 IMPALA-1755"> + <indexterm audience="Cloudera">SCAN_NODE_CODEGEN_THRESHOLD query option</indexterm> + The <codeph>SCAN_NODE_CODEGEN_THRESHOLD</codeph> query option + adjusts the aggressiveness of the code generation optimization process + when performing I/O read operations. It can help to work around performance problems + for queries where the table is small and the <codeph>WHERE</codeph> clause is complicated. + </p> + + <p conref="../shared/impala_common.xml#common/type_integer"/> + + <p> + <b>Default:</b> 1800000 (1.8 million) + </p> + + <p conref="../shared/impala_common.xml#common/added_in_250"/> + + <p conref="../shared/impala_common.xml#common/usage_notes_blurb"/> + + <p> + This query option is intended mainly for the case where a query with a very complicated + <codeph>WHERE</codeph> clause, such as an <codeph>IN</codeph> operator with thousands + of entries, is run against a small table, especially a small table using Parquet format. + The code generation phase can become the dominant factor in the query response time, + making the query take several seconds even though there is relatively little work to do. + In this case, increase the value of this option to a much larger amount, anything up to + the maximum for a 32-bit integer. + </p> + + <p> + Because this option only affects the code generation phase for the portion of the + query that performs I/O (the <term>scan nodes</term> within the query plan), it + lets you continue to keep code generation enabled for other queries, and other parts + of the same query, that can benefit from it. 
In contrast, the
+      <codeph>DISABLE_CODEGEN</codeph> query option turns off code generation entirely.
+    </p>
+
+    <p>
+      Because of the way the work for queries is divided internally, this option might not
+      affect code generation for all kinds of queries. If a plan fragment contains a scan
+      node and some other kind of plan node, code generation still occurs regardless of
+      this option setting.
+    </p>
+
+    <p>
+      To use this option effectively, you should be familiar with reading query profile output
+      to determine the proportion of time spent in the code generation phase, and whether
+      code generation is enabled for specific plan fragments.
+    </p>
+
+<!--
+    <p conref="../shared/impala_common.xml#common/related_info"/>
+    <p>
+    </p>
+-->
+
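+    <p>
+      For example, the following hypothetical <cmdname>impala-shell</cmdname> commands raise the
+      threshold to the 32-bit integer maximum before running a short query with a very long
+      <codeph>IN</codeph> list against a small Parquet table. The table and column names are
+      illustrative only.
+    </p>
+
+<codeblock>set scan_node_codegen_threshold=2147483647;
+select count(*) from small_parquet_table
+  where lookup_col in (1, 5, 19, 27, 42 /* ...imagine thousands more entries... */);
+</codeblock>
+
+  </conbody>
+</concept>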