http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3c2c8f12/docs/topics/impala_parquet.xml ---------------------------------------------------------------------- diff --git a/docs/topics/impala_parquet.xml b/docs/topics/impala_parquet.xml index d068bea..7e7516e 100644 --- a/docs/topics/impala_parquet.xml +++ b/docs/topics/impala_parquet.xml @@ -4,7 +4,17 @@ <title>Using the Parquet File Format with Impala Tables</title> <titlealts audience="PDF"><navtitle>Parquet Data Files</navtitle></titlealts> - + <prolog> + <metadata> + <data name="Category" value="Impala"/> + <data name="Category" value="File Formats"/> + <data name="Category" value="Parquet"/> + <data name="Category" value="Developers"/> + <data name="Category" value="Data Analysts"/> + <data name="Category" value="Tables"/> + <data name="Category" value="Schemas"/> + </metadata> + </prolog> <conbody> @@ -20,10 +30,1130 @@ against a Parquet table can retrieve and analyze these values from any column quickly and with minimal I/O. </p> - + <table> + <title>Parquet Format Support in Impala</title> + <tgroup cols="5"> + <colspec colname="1" colwidth="10*"/> + <colspec colname="2" colwidth="10*"/> + <colspec colname="3" colwidth="20*"/> + <colspec colname="4" colwidth="30*"/> + <colspec colname="5" colwidth="30*"/> + <thead> + <row> + <entry> + File Type + </entry> + <entry> + Format + </entry> + <entry> + Compression Codecs + </entry> + <entry> + Impala Can CREATE? + </entry> + <entry> + Impala Can INSERT? + </entry> + </row> + </thead> + <tbody> + <row conref="impala_file_formats.xml#file_formats/parquet_support"> + <entry/> + </row> + </tbody> + </tgroup> + </table> + + <p outputclass="toc inpage"/> + + <p audience="integrated"> + For general information about using Parquet with other CDH components, + see <xref href="cdh_ig_parquet.xml#parquet_format"/>. 
+ </p> + + </conbody> + + + <concept id="parquet_ddl"> + + <title>Creating Parquet Tables in Impala</title> + + <conbody> + + <p> + To create a table named <codeph>PARQUET_TABLE</codeph> that uses the Parquet format, you would use a + command like the following, substituting your own table name, column names, and data types: + </p> + +<codeblock>[impala-host:21000] > create table <varname>parquet_table_name</varname> (x INT, y STRING) STORED AS PARQUET;</codeblock> + +<!-- +<note> +Formerly, the <codeph>STORED AS</codeph> clause required the keyword <codeph>PARQUETFILE</codeph>. +In Impala 1.2.2 and higher, you can use <codeph>STORED AS PARQUET</codeph>. +This <codeph>PARQUET</codeph> keyword is recommended for new code. +</note> +--> + + <p> + Or, to clone the column names and data types of an existing table: + </p> + +<codeblock>[impala-host:21000] > create table <varname>parquet_table_name</varname> LIKE <varname>other_table_name</varname> STORED AS PARQUET;</codeblock> + + <p rev="1.4.0"> + In Impala 1.4.0 and higher, you can derive column definitions from a raw Parquet data file, even without an + existing Impala table. For example, you can create an external table pointing to an HDFS directory, and + base the column definitions on one of the files in that directory: + </p> + +<codeblock rev="1.4.0">CREATE EXTERNAL TABLE ingest_existing_files LIKE PARQUET '/user/etl/destination/datafile1.dat' + STORED AS PARQUET + LOCATION '/user/etl/destination'; +</codeblock> + + <p> + Or, you can refer to an existing data file and create a new empty table with suitable column definitions. + Then you can use <codeph>INSERT</codeph> to create new data files or <codeph>LOAD DATA</codeph> to transfer + existing data files into the new table. 
+ </p> + +<codeblock rev="1.4.0">CREATE TABLE columns_from_data_file LIKE PARQUET '/user/etl/destination/datafile1.dat' + STORED AS PARQUET; +</codeblock> + + <p> + The default properties of the newly created table are the same as for any other <codeph>CREATE + TABLE</codeph> statement. For example, the default file format is text; if you want the new table to use + the Parquet file format, include the <codeph>STORED AS PARQUET</codeph> clause also. + </p> + + <p> + In this example, the new table is partitioned by year, month, and day. These partition key columns are not + part of the data file, so you specify them in the <codeph>CREATE TABLE</codeph> statement: + </p> + +<codeblock rev="1.4.0">CREATE TABLE columns_from_data_file LIKE PARQUET '/user/etl/destination/datafile1.dat' + PARTITIONED BY (year INT, month TINYINT, day TINYINT) + STORED AS PARQUET; +</codeblock> + + <p rev="1.4.0"> + See <xref href="impala_create_table.xml#create_table"/> for more details about the <codeph>CREATE TABLE + LIKE PARQUET</codeph> syntax. + </p> + + <p> + Once you have created a table, to insert data into that table, use a command similar to the following, + again with your own table names: + </p> + + <!-- To do: + Opportunity for another example showing CTAS technique. + --> + +<codeblock>[impala-host:21000] > insert overwrite table <varname>parquet_table_name</varname> select * from <varname>other_table_name</varname>;</codeblock> + + <p> + If the Parquet table has a different number of columns or different column names than the other table, + specify the names of columns from the other table rather than <codeph>*</codeph> in the + <codeph>SELECT</codeph> statement. 
+ </p> + + </conbody> + + </concept> + + <concept id="parquet_etl"> + + <title>Loading Data into Parquet Tables</title> + <prolog> + <metadata> + <data name="Category" value="ETL"/> + </metadata> + </prolog> + + <conbody> + + <p> + Choose from the following techniques for loading data into Parquet tables, depending on whether the + original data is already in an Impala table, or exists as raw data files outside Impala. + </p> + + <p> + If you already have data in an Impala or Hive table, perhaps in a different file format or partitioning + scheme, you can transfer the data to a Parquet table using the Impala <codeph>INSERT...SELECT</codeph> + syntax. You can convert, filter, repartition, and do other things to the data as part of this same + <codeph>INSERT</codeph> statement. See <xref href="#parquet_compression"/> for some examples showing how to + insert data into Parquet tables. + </p> + + <p conref="../shared/impala_common.xml#common/insert_hints"/> + + <p conref="../shared/impala_common.xml#common/insert_parquet_blocksize"/> + + <draft-comment translate="no"> +Add an example here. +</draft-comment> + + <p> + Avoid the <codeph>INSERT...VALUES</codeph> syntax for Parquet tables, because + <codeph>INSERT...VALUES</codeph> produces a separate tiny data file for each + <codeph>INSERT...VALUES</codeph> statement, and the strength of Parquet is in its handling of data + (compressing, parallelizing, and so on) in <ph rev="parquet_block_size">large</ph> chunks. + </p> + + <p> + If you have one or more Parquet data files produced outside of Impala, you can quickly make the data + queryable through Impala by one of the following methods: + </p> + + <ul> + <li> + The <codeph>LOAD DATA</codeph> statement moves a single data file or a directory full of data files into + the data directory for an Impala table. It does no validation or conversion of the data. The original + data files must be somewhere in HDFS, not the local filesystem. 
+ <draft-comment translate="no"> +Add an example here. +</draft-comment> + </li> + + <li> + The <codeph>CREATE TABLE</codeph> statement with the <codeph>LOCATION</codeph> clause creates a table + where the data continues to reside outside the Impala data directory. The original data files must be + somewhere in HDFS, not the local filesystem. For extra safety, if the data is intended to be long-lived + and reused by other applications, you can use the <codeph>CREATE EXTERNAL TABLE</codeph> syntax so that + the data files are not deleted by an Impala <codeph>DROP TABLE</codeph> statement. + <draft-comment translate="no"> +Add an example here. +</draft-comment> + </li> + + <li> + If the Parquet table already exists, you can copy Parquet data files directly into it, then use the + <codeph>REFRESH</codeph> statement to make Impala recognize the newly added data. Remember to preserve + the block size of the Parquet data files by using the <codeph>hadoop distcp -pb</codeph> command rather + than a <codeph>-put</codeph> or <codeph>-cp</codeph> operation on the Parquet files. See + <xref href="#parquet_compression_multiple"/> for an example of this kind of operation. + </li> + </ul> + + <note conref="../shared/impala_common.xml#common/restrictions_nonimpala_parquet"/> + + <p> + Recent versions of Sqoop can produce Parquet output files using the <codeph>--as-parquetfile</codeph> + option. + </p> + + <p conref="../shared/impala_common.xml#common/sqoop_timestamp_caveat"/> + + <p> + If the data exists outside Impala and is in some other format, combine both of the preceding techniques. + First, use a <codeph>LOAD DATA</codeph> or <codeph>CREATE EXTERNAL TABLE ... LOCATION</codeph> statement to + bring the data into an Impala table that uses the appropriate file format. Then, use an + <codeph>INSERT...SELECT</codeph> statement to copy the data to the Parquet table, converting to Parquet + format as part of the process. 
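+ As a minimal sketch of that two-step sequence (the table names, column definitions, field delimiter, and
+ HDFS path below are hypothetical placeholders, chosen only for illustration):
+<codeblock>-- Step 1: expose existing comma-delimited text files as an Impala table.
+CREATE EXTERNAL TABLE staging_text (x INT, y STRING)
+  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+  STORED AS TEXTFILE
+  LOCATION '/user/etl/staging';
+-- Step 2: create a Parquet table with the same columns, then copy the data,
+-- converting it to Parquet format as part of the insert.
+CREATE TABLE parquet_copy LIKE staging_text STORED AS PARQUET;
+INSERT OVERWRITE TABLE parquet_copy SELECT * FROM staging_text;
+</codeblock>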
+ </p> + + <draft-comment translate="no"> +Add an example here. +</draft-comment> + + <p> + Loading data into Parquet tables is a memory-intensive operation, because the incoming data is buffered + until it reaches <ph rev="parquet_block_size">one data block</ph> in size, then that chunk of data is + organized and compressed in memory before being written out. The memory consumption can be larger when + inserting data into partitioned Parquet tables, because a separate data file is written for each + combination of partition key column values, potentially requiring several + <ph rev="parquet_block_size">large</ph> chunks to be manipulated in memory at once. + </p> + + <p> + When inserting into a partitioned Parquet table, Impala redistributes the data among the nodes to reduce + memory consumption. You might still need to temporarily increase the memory dedicated to Impala during the + insert operation, or break up the load operation into several <codeph>INSERT</codeph> statements, or both. + </p> + + <note> + All the preceding techniques assume that the data you are loading matches the structure of the destination + table, including column order, column names, and partition layout. To transform or reorganize the data, + start by loading the data into a Parquet table that matches the underlying structure of the data, then use + one of the table-copying techniques such as <codeph>CREATE TABLE AS SELECT</codeph> or <codeph>INSERT ... + SELECT</codeph> to reorder or rename columns, divide the data among multiple partitions, and so on. For + example to take a single comprehensive Parquet data file and load it into a partitioned table, you would + use an <codeph>INSERT ... SELECT</codeph> statement with dynamic partitioning to let Impala create separate + data files with the appropriate partition values; for an example, see + <xref href="impala_insert.xml#insert"/>. 
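+ A minimal sketch of such a statement, assuming a hypothetical unpartitioned source table
+ <codeph>sales_raw</codeph> and a hypothetical destination table <codeph>sales_by_day</codeph> that is
+ partitioned by year, month, and day (the partition key columns come last in the
+ <codeph>SELECT</codeph> list):
+<codeblock>INSERT INTO sales_by_day PARTITION (year, month, day)
+  SELECT id, amount, year, month, day FROM sales_raw;
+</codeblock>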
+ </note> + + </conbody> + + </concept> + + <concept id="parquet_performance"> + + <title>Query Performance for Impala Parquet Tables</title> + <prolog> + <metadata> + <data name="Category" value="Performance"/> + </metadata> + </prolog> + + <conbody> + + <p> + Query performance for Parquet tables depends on the number of columns needed to process the + <codeph>SELECT</codeph> list and <codeph>WHERE</codeph> clauses of the query, the way data is divided into + <ph rev="parquet_block_size">large data files with block size equal to file size</ph>, the reduction in I/O + by reading the data for each column in compressed format, which data files can be skipped (for partitioned + tables), and the CPU overhead of decompressing the data for each column. + </p> + + <p> + For example, the following is an efficient query for a Parquet table: +<codeblock>select avg(income) from census_data where state = 'CA';</codeblock> + The query processes only 2 columns out of a large number of total columns. If the table is partitioned by + the <codeph>STATE</codeph> column, it is even more efficient because the query only has to read and decode + 1 column from each data file, and it can read only the data files in the partition directory for the state + <codeph>'CA'</codeph>, skipping the data files for all the other states, which will be physically located + in other directories. + </p> + + <p> + The following is a relatively inefficient query for a Parquet table: +<codeblock>select * from census_data;</codeblock> + Impala would have to read the entire contents of each <ph rev="parquet_block_size">large</ph> data file, + and decompress the contents of each column for each row group, negating the I/O optimizations of the + column-oriented format. This query might still be faster for a Parquet table than a table with some other + file format, but it does not take advantage of the unique strengths of Parquet data files. 
+ </p> + + <p> + Impala can optimize queries on Parquet tables, especially join queries, better when statistics are + available for all the tables. Issue the <codeph>COMPUTE STATS</codeph> statement for each table after + substantial amounts of data are loaded into or appended to it. See + <xref href="impala_compute_stats.xml#compute_stats"/> for details. + </p> + + <p rev="2.5.0"> + The runtime filtering feature, available in CDH 5.7 / Impala 2.5 and higher, works best with Parquet tables. + The per-row filtering aspect only applies to Parquet tables. + See <xref href="impala_runtime_filtering.xml#runtime_filtering"/> for details. + </p> + + <p conref="../shared/impala_common.xml#common/s3_block_splitting"/> + + </conbody> + + <concept id="parquet_partitioning"> + + <title>Partitioning for Parquet Tables</title> + + <conbody> + + <p> + As explained in <xref href="impala_partitioning.xml#partitioning"/>, partitioning is an important + performance technique for Impala generally. This section explains some of the performance considerations + for partitioned Parquet tables. + </p> + + <p> + The Parquet file format is ideal for tables containing many columns, where most queries only refer to a + small subset of the columns. As explained in <xref href="#parquet_data_files"/>, the physical layout of + Parquet data files lets Impala read only a small fraction of the data for many queries. The performance + benefits of this approach are amplified when you use Parquet tables in combination with partitioning. + Impala can skip the data files for certain partitions entirely, based on the comparisons in the + <codeph>WHERE</codeph> clause that refer to the partition key columns. For example, queries on + partitioned tables often analyze data for time intervals based on columns such as <codeph>YEAR</codeph>, + <codeph>MONTH</codeph>, and/or <codeph>DAY</codeph>, or for geographic regions. 
Remember that Parquet + data files use a <ph rev="parquet_block_size">large</ph> block size, so when deciding how finely to + partition the data, try to find a granularity where each partition contains + <ph rev="parquet_block_size">256 MB</ph> or more of data, rather than creating a large number of smaller + files split among many partitions. + </p> + + <p> + Inserting into a partitioned Parquet table can be a resource-intensive operation, because each Impala + node could potentially be writing a separate data file to HDFS for each combination of different values + for the partition key columns. The large number of simultaneous open files could exceed the HDFS + <q>transceivers</q> limit. To avoid exceeding this limit, consider the following techniques: + </p> + + <ul> + <li> + Load different subsets of data using separate <codeph>INSERT</codeph> statements with specific values + for the <codeph>PARTITION</codeph> clause, such as <codeph>PARTITION (year=2010)</codeph>. + </li> + + <li> + Increase the <q>transceivers</q> value for HDFS, sometimes spelled <q>xcievers</q> (sic). The property + value in the <filepath>hdfs-site.xml</filepath> configuration file is +<!-- Old name, now deprecated: <codeph>dfs.datanode.max.xcievers</codeph>. --> + <codeph>dfs.datanode.max.transfer.threads</codeph>. For example, if you were loading 12 years of data + partitioned by year, month, and day, even a value of 4096 might not be high enough. This + <xref href="http://blog.cloudera.com/blog/2012/03/hbase-hadoop-xceivers/" scope="external" format="html">blog + post</xref> explores the considerations for setting this value higher or lower, using HBase examples + for illustration. 
+ </li> + + <li> + Use the <codeph>COMPUTE STATS</codeph> statement to collect + <xref href="impala_perf_stats.xml#perf_column_stats">column statistics</xref> on the source table from + which data is being copied, so that the Impala query can estimate the number of different values in the + partition key columns and distribute the work accordingly. + </li> + </ul> + + </conbody> + + </concept> + + </concept> + + <concept id="parquet_compression"> + + <title>Snappy and GZip Compression for Parquet Data Files</title> + <prolog> + <metadata> + <data name="Category" value="Snappy"/> + <data name="Category" value="Gzip"/> + <data name="Category" value="Compression"/> + </metadata> + </prolog> + + <conbody> + + <p> + <indexterm audience="Cloudera">COMPRESSION_CODEC query option</indexterm> + When Impala writes Parquet data files using the <codeph>INSERT</codeph> statement, the underlying + compression is controlled by the <codeph>COMPRESSION_CODEC</codeph> query option. (Prior to Impala 2.0, the + query option name was <codeph>PARQUET_COMPRESSION_CODEC</codeph>.) The allowed values for this query option + are <codeph>snappy</codeph> (the default), <codeph>gzip</codeph>, and <codeph>none</codeph>. The option + value is not case-sensitive. If the option is set to an unrecognized value, all kinds of queries will fail + due to the invalid option setting, not just queries involving Parquet tables. + </p> + + </conbody> + + <concept id="parquet_snappy"> + + <title>Example of Parquet Table with Snappy Compression</title> + + <conbody> + + <p> + <indexterm audience="Cloudera">compression</indexterm> + By default, the underlying data files for a Parquet table are compressed with Snappy. The combination of + fast compression and decompression makes it a good choice for many data sets. 
To ensure Snappy + compression is used, for example after experimenting with other compression codecs, set the + <codeph>COMPRESSION_CODEC</codeph> query option to <codeph>snappy</codeph> before inserting the data: + </p> + +<codeblock>[localhost:21000] > create database parquet_compression; +[localhost:21000] > use parquet_compression; +[localhost:21000] > create table parquet_snappy like raw_text_data; +[localhost:21000] > set COMPRESSION_CODEC=snappy; +[localhost:21000] > insert into parquet_snappy select * from raw_text_data; +Inserted 1000000000 rows in 181.98s +</codeblock> + + </conbody> + + </concept> + + <concept id="parquet_gzip"> + + <title>Example of Parquet Table with GZip Compression</title> + + <conbody> + + <p> + If you need more intensive compression (at the expense of more CPU cycles for uncompressing during + queries), set the <codeph>COMPRESSION_CODEC</codeph> query option to <codeph>gzip</codeph> before + inserting the data: + </p> + +<codeblock>[localhost:21000] > create table parquet_gzip like raw_text_data; +[localhost:21000] > set COMPRESSION_CODEC=gzip; +[localhost:21000] > insert into parquet_gzip select * from raw_text_data; +Inserted 1000000000 rows in 1418.24s +</codeblock> + + </conbody> + + </concept> + + <concept id="parquet_none"> + + <title>Example of Uncompressed Parquet Table</title> + + <conbody> + + <p> + If your data compresses very poorly, or you want to avoid the CPU overhead of compression and + decompression entirely, set the <codeph>COMPRESSION_CODEC</codeph> query option to <codeph>none</codeph> + before inserting the data: + </p> + +<codeblock>[localhost:21000] > create table parquet_none like raw_text_data; +[localhost:21000] > set COMPRESSION_CODEC=none; +[localhost:21000] > insert into parquet_none select * from raw_text_data; +Inserted 1000000000 rows in 146.90s +</codeblock> + + </conbody> + + </concept> + + <concept id="parquet_compression_examples"> + + <title>Examples of Sizes and Speeds for Compressed Parquet 
Tables</title> + + <conbody> + + <p> + Here are some examples showing differences in data sizes and query speeds for 1 billion rows of synthetic + data, compressed with each kind of codec. As always, run similar tests with realistic data sets of your + own. The actual compression ratios, and relative insert and query speeds, will vary depending on the + characteristics of the actual data. + </p> + + <p> + In this case, switching from Snappy to GZip compression shrinks the data by an additional 40% or so, + while switching from Snappy compression to no compression expands the data by about 40%: + </p> + +<codeblock>$ hdfs dfs -du -h /user/hive/warehouse/parquet_compression.db +23.1 G /user/hive/warehouse/parquet_compression.db/parquet_snappy +13.5 G /user/hive/warehouse/parquet_compression.db/parquet_gzip +32.8 G /user/hive/warehouse/parquet_compression.db/parquet_none +</codeblock> + + <p> + Because Parquet data files are typically <ph rev="parquet_block_size">large</ph>, each directory will + have a different number of data files and the row groups will be arranged differently. + </p> + + <p> + At the same time, the less aggressive the compression, the faster the data can be decompressed. In this + case, using a table with a billion rows, a query that evaluates all the values for a particular column + runs faster with no compression than with Snappy compression, and faster with Snappy compression than + with GZip compression. Query performance depends on several other factors, so as always, run your own + benchmarks with your own data to determine the ideal tradeoff between data size, CPU efficiency, and + speed of insert and query operations. + </p> + +<codeblock>[localhost:21000] > desc parquet_snappy; +Query finished, fetching results ... 
++-----------+---------+---------+ +| name | type | comment | ++-----------+---------+---------+ +| id | int | | +| val | int | | +| zfill | string | | +| name | string | | +| assertion | boolean | | ++-----------+---------+---------+ +Returned 5 row(s) in 0.14s +[localhost:21000] > select avg(val) from parquet_snappy; +Query finished, fetching results ... ++-----------------+ +| _c0 | ++-----------------+ +| 250000.93577915 | ++-----------------+ +Returned 1 row(s) in 4.29s +[localhost:21000] > select avg(val) from parquet_gzip; +Query finished, fetching results ... ++-----------------+ +| _c0 | ++-----------------+ +| 250000.93577915 | ++-----------------+ +Returned 1 row(s) in 6.97s +[localhost:21000] > select avg(val) from parquet_none; +Query finished, fetching results ... ++-----------------+ +| _c0 | ++-----------------+ +| 250000.93577915 | ++-----------------+ +Returned 1 row(s) in 3.67s +</codeblock> + + </conbody> + + </concept> + + <concept id="parquet_compression_multiple"> + + <title>Example of Copying Parquet Data Files</title> + + <conbody> + + <p> + Here is a final example, to illustrate how the data files using the various compression codecs are all + compatible with each other for read operations. The metadata about the compression format is written into + each data file, and can be decoded during queries regardless of the <codeph>COMPRESSION_CODEC</codeph> + setting in effect at the time. In this example, we copy data files from the + <codeph>PARQUET_SNAPPY</codeph>, <codeph>PARQUET_GZIP</codeph>, and <codeph>PARQUET_NONE</codeph> tables + used in the previous examples, each containing 1 billion rows, all to the data directory of a new table + <codeph>PARQUET_EVERYTHING</codeph>. A couple of sample queries demonstrate that the new table now + contains 3 billion rows featuring a variety of compression codecs for the data files. 
+ </p> + + <p> + First, we create the table in Impala so that there is a destination directory in HDFS to put the data + files: + </p> + +<codeblock>[localhost:21000] > create table parquet_everything like parquet_snappy; +Query: create table parquet_everything like parquet_snappy +</codeblock> + + <p> + Then in the shell, we copy the relevant data files into the data directory for this new table. Rather + than using <codeph>hdfs dfs -cp</codeph> as with typical files, we use <codeph>hadoop distcp -pb</codeph> + to ensure that the special <ph rev="parquet_block_size"> block size</ph> of the Parquet data files is + preserved. + </p> + +<codeblock>$ hadoop distcp -pb /user/hive/warehouse/parquet_compression.db/parquet_snappy \ + /user/hive/warehouse/parquet_compression.db/parquet_everything +...<varname>MapReduce output</varname>... +$ hadoop distcp -pb /user/hive/warehouse/parquet_compression.db/parquet_gzip \ + /user/hive/warehouse/parquet_compression.db/parquet_everything +...<varname>MapReduce output</varname>... +$ hadoop distcp -pb /user/hive/warehouse/parquet_compression.db/parquet_none \ + /user/hive/warehouse/parquet_compression.db/parquet_everything +...<varname>MapReduce output</varname>... +</codeblock> + + <p> + Back in the <cmdname>impala-shell</cmdname> interpreter, we use the <codeph>REFRESH</codeph> statement to + alert the Impala server to the new data files for this table, then we can run queries demonstrating that + the data files represent 3 billion rows, and the values for one of the numeric columns match what was in + the original smaller tables: + </p> + +<codeblock>[localhost:21000] > refresh parquet_everything; +Query finished, fetching results ... + +Returned 0 row(s) in 0.32s +[localhost:21000] > select count(*) from parquet_everything; +Query finished, fetching results ... 
++------------+ +| _c0 | ++------------+ +| 3000000000 | ++------------+ +Returned 1 row(s) in 8.18s +[localhost:21000] > select avg(val) from parquet_everything; +Query finished, fetching results ... ++-----------------+ +| _c0 | ++-----------------+ +| 250000.93577915 | ++-----------------+ +Returned 1 row(s) in 13.35s +</codeblock> + + </conbody> + + </concept> + + </concept> + + <concept rev="2.3.0" id="parquet_complex_types"> + + <title>Parquet Tables for Impala Complex Types</title> + + <conbody> + + <p> + In CDH 5.5 / Impala 2.3 and higher, Impala supports the complex types + <codeph>ARRAY</codeph>, <codeph>STRUCT</codeph>, and <codeph>MAP</codeph>. + See <xref href="impala_complex_types.xml#complex_types"/> for details. + Because these data types are currently supported only for the Parquet file format, + if you plan to use them, become familiar with the performance and storage aspects + of Parquet first. + </p> + + </conbody> + + </concept> + + <concept id="parquet_interop"> + + <title>Exchanging Parquet Data Files with Other Hadoop Components</title> + <prolog> + <metadata> + <data name="Category" value="Hadoop"/> + </metadata> + </prolog> + + <conbody> + + <p> Starting in CDH 4.5, you can read and write Parquet data files from + other CDH components. See <xref href="cdh_ig_parquet.xml#parquet_format" + /> for details. </p> + +<!-- These couple of paragraphs reused in the release notes 'incompatible changes' section. --> + +<!-- But conbodydiv tag too restrictive, can't have just paragraphs and codeblocks inside. --> + +<!-- So I will physically copy the info for the time being. --> + +<!-- <conbodydiv id="upgrade_parquet_metadata"> --> + + <p> + Previously, it was not possible to create Parquet data through Impala and reuse that table within Hive. Now + that Parquet support is available for Hive in CDH 4.5, reusing existing Impala Parquet data files in Hive + requires updating the table metadata. 
Use the following command if you are already running Impala 1.1.1 or + higher: + </p> + +<codeblock>ALTER TABLE <varname>table_name</varname> SET FILEFORMAT PARQUET; +</codeblock> + + <p> + If you are running a version of Impala that is older than 1.1.1, do the metadata update through Hive: + </p> + +<codeblock>ALTER TABLE <varname>table_name</varname> SET SERDE 'parquet.hive.serde.ParquetHiveSerDe'; +ALTER TABLE <varname>table_name</varname> SET FILEFORMAT + INPUTFORMAT "parquet.hive.DeprecatedParquetInputFormat" + OUTPUTFORMAT "parquet.hive.DeprecatedParquetOutputFormat"; +</codeblock> + + <p> + Impala 1.1.1 and higher can reuse Parquet data files created by Hive, without any action required. + </p> + +<!-- </conbodydiv> --> + + <p rev="2.2.0"> + Impala supports the scalar data types that you can encode in a Parquet data file, but not composite or + nested types such as maps or arrays. In Impala 2.2.0 / CDH 5.4.0 and higher, Impala can query Parquet data + files that include composite or nested types, as long as the query only refers to columns with scalar + types. +<!-- TK: could include an example here, but would require setup in Hive or Pig or something. --> + </p> + + <p> + If you copy Parquet data files between nodes, or even between different directories on the same node, make + sure to preserve the block size by using the command <codeph>hadoop distcp -pb</codeph>. To verify that the + block size was preserved, issue the command <codeph>hdfs fsck -blocks + <varname>HDFS_path_of_impala_table_dir</varname></codeph> and check that the average block size is at or + near <ph rev="parquet_block_size">256 MB (or whatever other size is defined by the + <codeph>PARQUET_FILE_SIZE</codeph> query option).</ph> (The <codeph>hadoop distcp</codeph> operation + typically leaves some directories behind, with names matching <filepath>_distcp_logs_*</filepath>, that you + can delete from the destination directory afterward.) 
+<!-- The Apache wiki page keeps disappearing, even though Google still points to it as of Nov. 11/2014. --> +<!-- Now there is a 'distcp2' guide: http://hadoop.apache.org/docs/r1.2.1/distcp2.html but I haven't tried that so let's play it safe for now and hide the link. --> +<!-- See the <xref href="http://hadoop.apache.org/docs/r0.19.0/distcp.html" scope="external" format="html">Hadoop DistCP Guide</xref> for details. --> + Issue the command <cmdname>hadoop distcp</cmdname> for details about <cmdname>distcp</cmdname> command + syntax. + </p> + +<!-- Sample commands/output for when the 'distcp' business is expanded into a tutorial later. Part of + a1730.halxg.cloudera.com:/home/jrussell/jdr/mixed_format_partitions.log. +<codeblock>$ hdfs fsck -blocks /user/impala/warehouse/parquet_compression.db/parquet_everything +Connecting to namenode via http://a1730.halxg.cloudera.com:50070 +FSCK started by jrussell (auth:SIMPLE) from /10.20.198.130 for path /user/impala/warehouse/parquet_compression.db/parquet_everything at Fri Aug 23 11:35:37 PDT 2013 +............................................................................Status: HEALTHY + Total size: 74504481213 B + Total dirs: 1 + Total files: 76 + Total blocks (validated): 76 (avg. 
block size 980322121 B) + Minimally replicated blocks: 76 (100.0 %) + Over-replicated blocks: 0 (0.0 %) + Under-replicated blocks: 0 (0.0 %) + Mis-replicated blocks: 0 (0.0 %) + Default replication factor: 3 + Average block replication: 3.0 + Corrupt blocks: 0 + Missing replicas: 0 (0.0 %) + Number of data-nodes: 4 + Number of racks: 1 +FSCK ended at Fri Aug 23 11:35:37 PDT 2013 in 8 milliseconds + + +The filesystem under path '/user/impala/warehouse/parquet_compression.db/parquet_everything' is HEALTHY +</codeblock> +--> + + <p conref="../shared/impala_common.xml#common/impala_parquet_encodings_caveat"/> + <p conref="../shared/impala_common.xml#common/parquet_tools_blurb"/> + + </conbody> + + </concept> + + <concept id="parquet_data_files"> + + <title>How Parquet Data Files Are Organized</title> + <prolog> + <metadata> + <data name="Category" value="Concepts"/> + </metadata> + </prolog> + + <conbody> + + <p> + Although Parquet is a column-oriented file format, do not expect to find one data file for each column. + Parquet keeps all the data for a row within the same data file, to ensure that the columns for a row are + always available on the same node for processing. What Parquet does is to set a large HDFS block size and a + matching maximum data file size, to ensure that I/O and network transfer requests apply to large batches of + data. + </p> + + <p> + Within that data file, the data for a set of rows is rearranged so that all the values from the first + column are organized in one contiguous block, then all the values from the second column, and so on. + Putting the values from the same column next to each other lets Impala use effective compression techniques + on the values in that column. 
+ </p> + + <note> + <p> + Impala <codeph>INSERT</codeph> statements write Parquet data files using an HDFS block size + <ph rev="parquet_block_size">that matches the data file size</ph>, to ensure that each data file is + represented by a single HDFS block, and the entire file can be processed on a single node without + requiring any remote reads. + </p> + + <p> + If you create Parquet data files outside of Impala, such as through a MapReduce or Pig job, ensure that + the HDFS block size is greater than or equal to the file size, so that the <q>one file per block</q> + relationship is maintained. Set the <codeph>dfs.block.size</codeph> or the <codeph>dfs.blocksize</codeph> + property large enough that each file fits within a single HDFS block, even if that size is larger than + the normal HDFS block size. + </p> + + <p> + If the block size is reset to a lower value during a file copy, you will see lower performance for + queries involving those files, and the <codeph>PROFILE</codeph> statement will reveal that some I/O is + being done suboptimally, through remote reads. See + <xref href="impala_parquet.xml#parquet_compression_multiple"/> for an example showing how to preserve the + block size when copying Parquet data files. + </p> + </note> + + <p> + When Impala retrieves or tests the data for a particular column, it opens all the data files, but only + reads the portion of each file containing the values for that column. The column values are stored + consecutively, minimizing the I/O required to process the values within a single column. If other columns + are named in the <codeph>SELECT</codeph> list or <codeph>WHERE</codeph> clauses, the data for all columns + in the same row is available within that same data file. + </p> + + <p> + If an <codeph>INSERT</codeph> statement brings in less than <ph rev="parquet_block_size">one Parquet + block's worth</ph> of data, the resulting data file is smaller than ideal. 
Thus, if you do split up an ETL + job to use multiple <codeph>INSERT</codeph> statements, try to keep the volume of data for each + <codeph>INSERT</codeph> statement to approximately <ph rev="parquet_block_size">256 MB, or a multiple of + 256 MB</ph>. + </p> + + </conbody> + + <concept id="parquet_encoding"> + + <title>RLE and Dictionary Encoding for Parquet Data Files</title> + + <conbody> + + <p> + Parquet uses some automatic compression techniques, such as run-length encoding (RLE) and dictionary + encoding, based on analysis of the actual data values. Once the data values are encoded in a compact + form, the encoded data can optionally be further compressed using a compression algorithm. Parquet data + files created by Impala can use Snappy, GZip, or no compression; the Parquet spec also allows LZO + compression, but currently Impala does not support LZO-compressed Parquet files. + </p> + + <p> + RLE and dictionary encoding are compression techniques that Impala applies automatically to groups of + Parquet data values, in addition to any Snappy or GZip compression applied to the entire data files. + These automatic optimizations can save you time and planning that are normally needed for a traditional + data warehouse. For example, dictionary encoding reduces the need to create numeric IDs as abbreviations + for longer string values. + </p> + + <p> + Run-length encoding condenses sequences of repeated data values. For example, if many consecutive rows + all contain the same value for a country code, those repeating values can be represented by the value + followed by a count of how many times it appears consecutively. + </p> + + <p> + Dictionary encoding takes the different values present in a column, and represents each one in compact + 2-byte form rather than the original value, which could be several bytes. (Additional compression is + applied to the compacted values, for extra space savings.) 
This type of encoding applies when the number + of different values for a column is less than 2**16 (65,536). It does not apply to columns of data type + <codeph>BOOLEAN</codeph>, which are already very short. <codeph>TIMESTAMP</codeph> columns sometimes have + a unique value for each row, in which case they can quickly exceed the 2**16 limit on distinct values. + The 2**16 limit on different values within a column is reset for each data file, so if several different + data files each contained 10,000 different city names, the city name column in each data file could still + be condensed using dictionary encoding. + </p> + + </conbody> + + </concept> + + </concept> + + <concept rev="1.4.0" id="parquet_compacting"> + + <title>Compacting Data Files for Parquet Tables</title> + + <conbody> + + <p> + If you reuse existing table structures or ETL processes for Parquet tables, you might encounter a <q>many + small files</q> situation, which is suboptimal for query efficiency. For example, statements like these + might produce inefficiently organized data files: + </p> + +<codeblock>-- In an N-node cluster, each node produces a data file +-- for the INSERT operation. If you have less than +-- N GB of data to copy, some files are likely to be +-- much smaller than the <ph rev="parquet_block_size">default Parquet</ph> block size. +insert into parquet_table select * from text_table; + +-- Even if this operation involves an overall large amount of data, +-- when split up by year/month/day, each partition might only +-- receive a small amount of data. Then the data files for +-- the partition might be divided between the N nodes in the cluster. +-- A multi-gigabyte copy operation might produce files of only +-- a few MB each. 
+insert into partitioned_parquet_table partition (year, month, day) + select url, referer, user_agent, http_code, response_time, year, month, day + from web_stats; +</codeblock> + + <p> + Here are techniques to help you produce large data files in Parquet <codeph>INSERT</codeph> operations, and + to compact existing too-small data files: + </p> + + <ul> + <li> + <p> + When inserting into a partitioned Parquet table, use statically partitioned <codeph>INSERT</codeph> + statements where the partition key values are specified as constant values. Ideally, use a separate + <codeph>INSERT</codeph> statement for each partition. + </p> + </li> + + <li> + <p conref="../shared/impala_common.xml#common/num_nodes_tip"/> + </li> + + <li> + <p> + Be prepared to reduce the number of partition key columns from what you are used to with traditional + analytic database systems. + </p> + </li> + + <li> + <p> + Do not expect Impala-written Parquet files to fill up the entire Parquet block size. Impala estimates + on the conservative side when figuring out how much data to write to each Parquet file. Typically, the + volume of uncompressed data in memory is substantially reduced on disk by the compression and encoding + techniques in the Parquet file format. +<!-- + Impala reserves <ph rev="parquet_block_size">1 GB</ph> of memory to buffer the data before writing, + but the actual data file might be smaller, in the hundreds of megabytes. + --> + The final data file size varies depending on the compressibility of the data. Therefore, it is not an + indication of a problem if <ph rev="parquet_block_size">256 MB</ph> of text data is turned into 2 + Parquet data files, each less than <ph rev="parquet_block_size">256 MB</ph>. 
+ </p> + </li> + + <li> + <p> + If you accidentally end up with a table with many small data files, consider using one or more of the + preceding techniques and copying all the data into a new Parquet table, either through <codeph>CREATE + TABLE AS SELECT</codeph> or <codeph>INSERT ... SELECT</codeph> statements. + </p> + + <p> + To avoid rewriting queries to change table names, you can adopt a convention of always running + important queries against a view. Changing the view definition immediately switches any subsequent + queries to use the new underlying tables: + </p> +<codeblock>create view production_table as select * from table_with_many_small_files; +-- CTAS or INSERT...SELECT all the data into a more efficient layout... +alter view production_table as select * from table_with_few_big_files; +select * from production_table where c1 = 100 and c2 < 50 and ...; +</codeblock> + </li> + </ul> + + </conbody> + + </concept> + + <concept rev="1.4.0" id="parquet_schema_evolution"> + + <title>Schema Evolution for Parquet Tables</title> + + <conbody> + + <p> + Schema evolution refers to using the statement <codeph>ALTER TABLE ... REPLACE COLUMNS</codeph> to change + the names, data type, or number of columns in a table. You can perform schema evolution for Parquet tables + as follows: + </p> + + <ul> + <li> + <p> + The Impala <codeph>ALTER TABLE</codeph> statement never changes any data files in the tables. From the + Impala side, schema evolution involves interpreting the same data files in terms of a new table + definition. Some types of schema changes make sense and are represented correctly. Other types of + changes cannot be represented in a sensible way, and produce special result values or conversion errors + during queries. + </p> + </li> + + <li> + <p> + The <codeph>INSERT</codeph> statement always creates data using the latest table definition. 
You might + end up with data files with different numbers of columns or internal data representations if you do a + sequence of <codeph>INSERT</codeph> and <codeph>ALTER TABLE ... REPLACE COLUMNS</codeph> statements. + </p> + </li> + + <li> + <p> + If you use <codeph>ALTER TABLE ... REPLACE COLUMNS</codeph> to define additional columns at the end, + when the original data files are used in a query, these final columns are considered to be all + <codeph>NULL</codeph> values. + </p> + </li> + + <li> + <p> + If you use <codeph>ALTER TABLE ... REPLACE COLUMNS</codeph> to define fewer columns than before, when + the original data files are used in a query, the unused columns still present in the data file are + ignored. + </p> + </li> + + <li> + <p> + Parquet represents the <codeph>TINYINT</codeph>, <codeph>SMALLINT</codeph>, and <codeph>INT</codeph> + types the same internally, all stored in 32-bit integers. + </p> + <ul> + <li> + That means it is easy to promote a <codeph>TINYINT</codeph> column to <codeph>SMALLINT</codeph> or + <codeph>INT</codeph>, or a <codeph>SMALLINT</codeph> column to <codeph>INT</codeph>. The numbers are + represented exactly the same in the data file, and the columns being promoted would not contain any + out-of-range values. + </li> + + <li> + <p> + If you change any of these column types to a smaller type, any values that are out-of-range for the + new type are returned incorrectly, typically as negative numbers. + </p> + </li> + + <li> + <p> + You cannot change a <codeph>TINYINT</codeph>, <codeph>SMALLINT</codeph>, or <codeph>INT</codeph> + column to <codeph>BIGINT</codeph>, or the other way around. Although the <codeph>ALTER + TABLE</codeph> succeeds, any attempt to query those columns results in conversion errors. + </p> + </li> + + <li> + <p> + Any other type conversion for columns produces a conversion error during queries. 
For example, + <codeph>INT</codeph> to <codeph>STRING</codeph>, <codeph>FLOAT</codeph> to <codeph>DOUBLE</codeph>, + <codeph>TIMESTAMP</codeph> to <codeph>STRING</codeph>, <codeph>DECIMAL(9,0)</codeph> to + <codeph>DECIMAL(5,2)</codeph>, and so on. + </p> + </li> + </ul> + </li> + </ul> + + <p rev="2.6.0 IMPALA-2835 CDH-33330"> + You might find that you have Parquet files where the columns do not line up in the same + order as in your Impala table. For example, you might have a Parquet file that was part of + a table with columns <codeph>C1,C2,C3,C4</codeph>, and now you want to reuse the same + Parquet file in a table with columns <codeph>C4,C2</codeph>. By default, Impala expects the + columns in the data file to appear in the same order as the columns defined for the table, + making it impractical to do some kinds of file reuse or schema evolution. In CDH 5.8 / Impala 2.6 + and higher, the query option <codeph>PARQUET_FALLBACK_SCHEMA_RESOLUTION=name</codeph> lets Impala + resolve columns by name, and therefore handle out-of-order or extra columns in the data file. + For example: + +<codeblock conref="../shared/impala_common.xml#common/parquet_fallback_schema_resolution_example"/> + + See <xref href="impala_parquet_fallback_schema_resolution.xml#parquet_fallback_schema_resolution"/> + for more details. + </p> + + </conbody> + + </concept> + + <concept id="parquet_data_types"> + + <title>Data Type Considerations for Parquet Tables</title> + + <conbody> + + <p> + The Parquet format defines a set of data types whose names differ from the names of the corresponding + Impala data types. If you are preparing Parquet files using other Hadoop components such as Pig or + MapReduce, you might need to work with the type names defined by Parquet. The following figure lists the + Parquet-defined types and the equivalent types in Impala. 
+ </p> + + <p> + <b>Primitive types:</b> + </p> + +<codeblock>BINARY -> STRING +BOOLEAN -> BOOLEAN +DOUBLE -> DOUBLE +FLOAT -> FLOAT +INT32 -> INT +INT64 -> BIGINT +INT96 -> TIMESTAMP +</codeblock> + + <p> + <b>Logical types:</b> + </p> + +<codeblock>BINARY + OriginalType UTF8 -> STRING +BINARY + OriginalType DECIMAL -> DECIMAL +</codeblock> + + <p rev="2.3.0"> + <b>Complex types:</b> + </p> + + <p rev="2.3.0"> + For the complex types (<codeph>ARRAY</codeph>, <codeph>MAP</codeph>, and <codeph>STRUCT</codeph>) + available in CDH 5.5 / Impala 2.3 and higher, Impala only supports queries + against those types in Parquet tables. + </p> </conbody> - + </concept> </concept>
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3c2c8f12/docs/topics/impala_parquet_compression_codec.xml ---------------------------------------------------------------------- diff --git a/docs/topics/impala_parquet_compression_codec.xml b/docs/topics/impala_parquet_compression_codec.xml index d178a0d..7132727 100644 --- a/docs/topics/impala_parquet_compression_codec.xml +++ b/docs/topics/impala_parquet_compression_codec.xml @@ -3,6 +3,7 @@ <concept id="parquet_compression_codec"> <title>PARQUET_COMPRESSION_CODEC Query Option</title> + <titlealts audience="PDF"><navtitle>PARQUET_COMPRESSION_CODEC</navtitle></titlealts> <prolog> <metadata> <data name="Category" value="Impala"/> http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3c2c8f12/docs/topics/impala_parquet_file_size.xml ---------------------------------------------------------------------- diff --git a/docs/topics/impala_parquet_file_size.xml b/docs/topics/impala_parquet_file_size.xml index 396fa92..7019e93 100644 --- a/docs/topics/impala_parquet_file_size.xml +++ b/docs/topics/impala_parquet_file_size.xml @@ -3,12 +3,16 @@ <concept rev="parquet_block_size" id="parquet_file_size"> <title>PARQUET_FILE_SIZE Query Option</title> + <titlealts audience="PDF"><navtitle>PARQUET_FILE_SIZE</navtitle></titlealts> <prolog> <metadata> <data name="Category" value="Impala"/> <data name="Category" value="Parquet"/> + <data name="Category" value="ETL"/> <data name="Category" value="File Formats"/> <data name="Category" value="Impala Query Options"/> + <data name="Category" value="Developers"/> + <data name="Category" value="Data Analysts"/> </metadata> </prolog> http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3c2c8f12/docs/topics/impala_partitioning.xml ---------------------------------------------------------------------- diff --git a/docs/topics/impala_partitioning.xml b/docs/topics/impala_partitioning.xml index 46b1568..d9cb31a 100644 --- a/docs/topics/impala_partitioning.xml +++ 
b/docs/topics/impala_partitioning.xml @@ -3,24 +3,580 @@ <concept id="partitioning"> <title>Partitioning for Impala Tables</title> - <titlealts audience="PDF"><navtitle>Partitioning</navtitle></titlealts> - + + <titlealts audience="PDF"> + + <navtitle>Partitioning</navtitle> + + </titlealts> + + <prolog> + <metadata> + <data name="Category" value="Impala"/> + <data name="Category" value="SQL"/> + <data name="Category" value="Performance"/> + <data name="Category" value="Developers"/> + <data name="Category" value="Data Analysts"/> + </metadata> + </prolog> + <conbody> <p> <indexterm audience="Cloudera">partitioning</indexterm> - By default, all the data files for a table are located in a single directory. Partitioning is a technique for - physically dividing the data during loading, based on values from one or more columns, to speed up queries - that test those columns. For example, with a <codeph>school_records</codeph> table partitioned on a - <codeph>year</codeph> column, there is a separate data directory for each different year value, and all the - data for that year is stored in a data file in that directory. A query that includes a <codeph>WHERE</codeph> - condition such as <codeph>YEAR=1966</codeph>, <codeph>YEAR IN (1989,1999)</codeph>, or <codeph>YEAR BETWEEN - 1984 AND 1989</codeph> can examine only the data files from the appropriate directory or directories, greatly - reducing the amount of data to read and test. + By default, all the data files for a table are located in a single directory. Partitioning is a technique for physically dividing the + data during loading, based on values from one or more columns, to speed up queries that test those columns. For example, with a + <codeph>school_records</codeph> table partitioned on a <codeph>year</codeph> column, there is a separate data directory for each + different year value, and all the data for that year is stored in a data file in that directory. 
A query that includes a + <codeph>WHERE</codeph> condition such as <codeph>YEAR=1966</codeph>, <codeph>YEAR IN (1989,1999)</codeph>, or <codeph>YEAR BETWEEN + 1984 AND 1989</codeph> can examine only the data files from the appropriate directory or directories, greatly reducing the amount of + data to read and test. </p> - + <p outputclass="toc inpage"/> + + <p> + See <xref href="impala_tutorial.xml#tut_external_partition_data"/> for an example that illustrates the syntax for creating partitioned + tables, the underlying directory structure in HDFS, and how to attach a partitioned Impala external table to data files stored + elsewhere in HDFS. + </p> + + <p> + Parquet is a popular format for partitioned Impala tables because it is well suited to handle huge data volumes. See + <xref href="impala_parquet.xml#parquet_performance"/> for performance considerations for partitioned Parquet tables. + </p> + + <p> + See <xref href="impala_literals.xml#null"/> for details about how <codeph>NULL</codeph> values are represented in partitioned tables. + </p> + + <p rev="2.2.0"> + See <xref href="impala_s3.xml#s3"/> for details about setting up tables where some or all partitions reside on the Amazon Simple + Storage Service (S3). + </p> + + </conbody> + + <concept id="partitioning_choosing"> + + <title>When to Use Partitioned Tables</title> + + <conbody> + + <p> + Partitioning is typically appropriate for: + </p> + + <ul> + <li> + Tables that are very large, where reading the entire data set takes an impractical amount of time. + </li> + + <li> + Tables that are always or almost always queried with conditions on the partitioning columns. 
In our example of a table partitioned + by year, <codeph>SELECT COUNT(*) FROM school_records WHERE year = 1985</codeph> is efficient, only examining a small fraction of + the data; but <codeph>SELECT COUNT(*) FROM school_records</codeph> has to process a separate data file for each year, resulting in + more overall work than in an unpartitioned table. You would probably not partition this way if you frequently queried the table + based on last name, student ID, and so on without testing the year. + </li> + + <li> + Columns that have reasonable cardinality (number of different values). If a column only has a small number of values, for example + <codeph>Male</codeph> or <codeph>Female</codeph>, you do not gain much efficiency by eliminating only about 50% of the data to + read for each query. If a column has only a few rows matching each value, the number of directories to process can become a + limiting factor, and the data file in each directory could be too small to take advantage of the Hadoop mechanism for transmitting + data in multi-megabyte blocks. For example, you might partition census data by year, store sales data by year and month, and web + traffic data by year, month, and day. (Some users with high volumes of incoming data might even partition down to the individual + hour and minute.) + </li> + + <li> + Data that already passes through an extract, transform, and load (ETL) pipeline. The values of the partitioning columns are + stripped from the original data files and represented by directory names, so loading data into a partitioned table involves some + sort of transformation or preprocessing. 
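+        As a hedged sketch of such a load step, assume a hypothetical unpartitioned staging table named
+        <codeph>school_records_staging</codeph> with columns <codeph>name</codeph>, <codeph>student_id</codeph>, and
+        <codeph>year</codeph>, and a <codeph>school_records</codeph> table whose non-key columns are <codeph>name</codeph> and
+        <codeph>student_id</codeph> (these table and column names are illustrative only):
+<codeblock>-- The year values are not written into the data files; each one selects a partition directory.
+-- The partition key column goes last in the SELECT list.
+INSERT INTO school_records PARTITION (year)
+  SELECT name, student_id, year FROM school_records_staging;
+</codeblock>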
+ </li> + </ul> + + </conbody> + + </concept> + + <concept id="partition_sql"> + + <title>SQL Statements for Partitioned Tables</title> + + <conbody> + + <p> + In terms of Impala SQL syntax, partitioning affects these statements: + </p> + + <ul> + <li> + <codeph><xref href="impala_create_table.xml#create_table">CREATE TABLE</xref></codeph>: you specify a <codeph>PARTITIONED + BY</codeph> clause when creating the table to identify names and data types of the partitioning columns. These columns are not + included in the main list of columns for the table. + </li> + + <li rev="2.5.0"> + In CDH 5.7 / Impala 2.5 and higher, you can also use the <codeph>PARTITIONED BY</codeph> clause in a <codeph>CREATE TABLE AS + SELECT</codeph> statement. This syntax lets you use a single statement to create a partitioned table, copy data into it, and + create new partitions based on the values in the inserted data. + </li> + + <li> + <codeph><xref href="impala_alter_table.xml#alter_table">ALTER TABLE</xref></codeph>: you can add or drop partitions, to work with + different portions of a huge data set. You can designate the HDFS directory that holds the data files for a specific partition. + With data partitioned by date values, you might <q>age out</q> data that is no longer relevant. + <note conref="../shared/impala_common.xml#common/add_partition_set_location"/> + </li> + + <li> + <codeph><xref href="impala_insert.xml#insert">INSERT</xref></codeph>: When you insert data into a partitioned table, you identify + the partitioning columns. One or more values from each inserted row are not stored in data files, but instead determine the + directory where that row value is stored. You can also specify which partition to load a set of data into, with <codeph>INSERT + OVERWRITE</codeph> statements; you can replace the contents of a specific partition but you cannot append data to a specific + partition. 
+ <p rev="1.3.1" conref="../shared/impala_common.xml#common/insert_inherit_permissions"/> + </li> + + <li> + Although the syntax of the <codeph><xref href="impala_select.xml#select">SELECT</xref></codeph> statement is the same whether or + not the table is partitioned, the way queries interact with partitioned tables can have a dramatic impact on performance and + scalability. The mechanism that lets queries skip certain partitions during a query is known as partition pruning; see + <xref href="impala_partitioning.xml#partition_pruning"/> for details. + </li> + + <li rev="1.4.0"> + In Impala 1.4 and later, there is a <codeph>SHOW PARTITIONS</codeph> statement that displays information about each partition in a + table. See <xref href="impala_show.xml#show"/> for details. + </li> + </ul> + + </conbody> + + </concept> + + <concept id="partition_static_dynamic"> + + <title>Static and Dynamic Partitioning Clauses</title> + + <conbody> + + <p> + Specifying all the partition columns in a SQL statement is called <term>static partitioning</term>, because the statement affects a + single predictable partition. For example, you use static partitioning with an <codeph>ALTER TABLE</codeph> statement that affects + only one partition, or with an <codeph>INSERT</codeph> statement that inserts all values into the same partition: + </p> + +<codeblock>insert into t1 <b>partition(x=10, y='a')</b> select c1 from some_other_table; +</codeblock> + + <p> + When you specify some partition key columns in an <codeph>INSERT</codeph> statement, but leave out the values, Impala determines + which partition to insert into. This technique is called <term>dynamic partitioning</term>: + </p> + +<codeblock>insert into t1 <b>partition(x, y='b')</b> select c1, c2 from some_other_table; +-- Create new partition if necessary based on variable year, month, and day; insert a single value. 
+insert into weather <b>partition (year, month, day)</b> select 'cloudy',2014,4,21; +-- Create new partition if necessary for specified year and month but variable day; insert a single value. +insert into weather <b>partition (year=2014, month=04, day)</b> select 'sunny',22; +</codeblock> + + <p> + The more key columns you specify in the <codeph>PARTITION</codeph> clause, the fewer columns you need in the <codeph>SELECT</codeph> + list. The trailing columns in the <codeph>SELECT</codeph> list are substituted in order for the partition key columns with no + specified value. + </p> + + </conbody> + + </concept> + + <concept id="partition_refresh" rev="2.7.0 IMPALA-1683 CDH-43732"> + + <title>Refreshing a Single Partition</title> + + <conbody> + + <p> + The <codeph>REFRESH</codeph> statement is typically used with partitioned tables when new data files are loaded into a partition by + some non-Impala mechanism, such as a Hive or Spark job. The <codeph>REFRESH</codeph> statement makes Impala aware of the new data + files so that they can be used in Impala queries. Because partitioned tables typically contain a high volume of data, the + <codeph>REFRESH</codeph> operation for a full partitioned table can take significant time. + </p> + + <p> + In CDH 5.9 / Impala 2.7 and higher, you can include a <codeph>PARTITION (<varname>partition_spec</varname>)</codeph> clause in the + <codeph>REFRESH</codeph> statement so that only a single partition is refreshed. For example, <codeph>REFRESH big_table PARTITION + (year=2017, month=9, day=30)</codeph>. The partition spec must include all the partition key columns. See + <xref href="impala_refresh.xml#refresh"/> for more details and examples of <codeph>REFRESH</codeph> syntax and usage. 
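+      As a minimal sketch of the difference, reusing the <codeph>big_table</codeph> name from the example above (the table and its
+      <codeph>year</codeph>, <codeph>month</codeph>, and <codeph>day</codeph> partition key columns are illustrative assumptions):
+<codeblock>-- Reload file metadata for every partition of the table; can take significant time.
+REFRESH big_table;
+
+-- Reload file metadata only for the one partition whose data files changed.
+-- Every partition key column must be given a value in the partition spec.
+REFRESH big_table PARTITION (year=2017, month=9, day=30);
+</codeblock>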
+ </p> + + </conbody> + + </concept> + + <concept id="partition_permissions"> + + <title>Permissions for Partition Subdirectories</title> + + <conbody> + + <p rev="1.3.1" conref="../shared/impala_common.xml#common/insert_inherit_permissions"/> + + </conbody> + + </concept> + + <concept id="partition_pruning"> + + <title>Partition Pruning for Queries</title> + + <conbody> + + <p> + Partition pruning refers to the mechanism where a query can skip reading the data files corresponding to one or more partitions. If + you can arrange for queries to prune large numbers of unnecessary partitions from the query execution plan, the queries use fewer + resources and are thus proportionally faster and more scalable. + </p> + + <p> + For example, if a table is partitioned by columns <codeph>YEAR</codeph>, <codeph>MONTH</codeph>, and <codeph>DAY</codeph>, then + <codeph>WHERE</codeph> clauses such as <codeph>WHERE year = 2013</codeph>, <codeph>WHERE year < 2010</codeph>, or <codeph>WHERE + year BETWEEN 1995 AND 1998</codeph> allow Impala to skip the data files in all partitions outside the specified range. Likewise, + <codeph>WHERE year = 2013 AND month BETWEEN 1 AND 3</codeph> could prune even more partitions, reading the data files for only a + portion of one year. + </p> + + <p outputclass="toc inpage"/> + + </conbody> + + <concept id="partition_pruning_checking"> + + <title>Checking if Partition Pruning Happens for a Query</title> + + <conbody> + + <p> + To check the effectiveness of partition pruning for a query, check the <codeph>EXPLAIN</codeph> output for the query before + running it. The following example shows a table with 3 partitions, where the query only reads 1 of them. The notation + <codeph>#partitions=1/3</codeph> in the <codeph>EXPLAIN</codeph> plan confirms that Impala can do the appropriate partition + pruning. 
+ </p> + +<codeblock>[localhost:21000] > insert into census partition (year=2010) values ('Smith'),('Jones'); +[localhost:21000] > insert into census partition (year=2011) values ('Smith'),('Jones'),('Doe'); +[localhost:21000] > insert into census partition (year=2012) values ('Smith'),('Doe'); +[localhost:21000] > select name from census where year=2010; ++-------+ +| name | ++-------+ +| Smith | +| Jones | ++-------+ +[localhost:21000] > explain select name from census <b>where year=2010</b>; ++------------------------------------------------------------------+ +| Explain String | ++------------------------------------------------------------------+ +| PLAN FRAGMENT 0 | +| PARTITION: UNPARTITIONED | +| | +| 1:EXCHANGE | +| | +| PLAN FRAGMENT 1 | +| PARTITION: RANDOM | +| | +| STREAM DATA SINK | +| EXCHANGE ID: 1 | +| UNPARTITIONED | +| | +| 0:SCAN HDFS | +| table=predicate_propagation.census <b>#partitions=1/3</b> size=12B | ++------------------------------------------------------------------+</codeblock> + + <p rev="1.4.0"> + For a report of the volume of data that was actually read and processed at each stage of the query, check the output of the + <codeph>SUMMARY</codeph> command immediately after running the query. For a more detailed analysis, look at the output of the + <codeph>PROFILE</codeph> command; it includes this same summary report near the start of the profile output. + </p> + + </conbody> + + </concept> + + <concept id="partition_pruning_sql"> + + <title>What SQL Constructs Work with Partition Pruning</title> + + <conbody> + + <p rev="1.2.2"> + <indexterm audience="Cloudera">predicate propagation</indexterm> + Impala can even do partition pruning in cases where the partition key column is not directly compared to a constant, by applying + the transitive property to other parts of the <codeph>WHERE</codeph> clause. This technique is known as predicate propagation, and + is available in Impala 1.2.2 and later. 
In this example, the census table includes another column indicating when the data was + collected, which happens in 10-year intervals. Even though the query does not compare the partition key column + (<codeph>YEAR</codeph>) to a constant value, Impala can deduce that only the partition <codeph>YEAR=2010</codeph> is required, and + again only reads 1 out of 3 partitions. + </p> + +<codeblock rev="1.2.2">[localhost:21000] > drop table census; +[localhost:21000] > create table census (name string, census_year int) partitioned by (year int); +[localhost:21000] > insert into census partition (year=2010) values ('Smith',2010),('Jones',2010); +[localhost:21000] > insert into census partition (year=2011) values ('Smith',2020),('Jones',2020),('Doe',2020); +[localhost:21000] > insert into census partition (year=2012) values ('Smith',2020),('Doe',2020); +[localhost:21000] > select name from census where year = census_year and census_year=2010; ++-------+ +| name | ++-------+ +| Smith | +| Jones | ++-------+ +[localhost:21000] > explain select name from census <b>where year = census_year and census_year=2010</b>; ++------------------------------------------------------------------+ +| Explain String | ++------------------------------------------------------------------+ +| PLAN FRAGMENT 0 | +| PARTITION: UNPARTITIONED | +| | +| 1:EXCHANGE | +| | +| PLAN FRAGMENT 1 | +| PARTITION: RANDOM | +| | +| STREAM DATA SINK | +| EXCHANGE ID: 1 | +| UNPARTITIONED | +| | +| 0:SCAN HDFS | +| table=predicate_propagation.census <b>#partitions=1/3</b> size=22B | +| predicates: census_year = 2010, year = census_year | ++------------------------------------------------------------------+ +</codeblock> + + <p conref="../shared/impala_common.xml#common/partitions_and_views"/> + + <p conref="../shared/impala_common.xml#common/analytic_partition_pruning_caveat"/> + + </conbody> + + </concept> + + <concept id="dynamic_partition_pruning"> + + <title>Dynamic Partition Pruning</title> + + <conbody> + 
+ <p> + The original mechanism Impala uses to prune partitions is <term>static partition pruning</term>, in which the conditions in the + <codeph>WHERE</codeph> clause are analyzed to determine in advance which partitions can be safely skipped. In Impala 2.5 / CDH 5.7 + and higher, Impala can perform <term>dynamic partition pruning</term>, where information about the partitions is collected during + the query, and Impala prunes unnecessary partitions in ways that were impractical to predict in advance. + </p> + + <p> + For example, if partition key columns are compared to literal values in a <codeph>WHERE</codeph> clause, Impala can perform static + partition pruning during the planning phase to only read the relevant partitions: + </p> + +<codeblock> +-- The query only needs to read 3 partitions whose key values are known ahead of time. +-- That's static partition pruning. +SELECT COUNT(*) FROM sales_table WHERE year IN (2005, 2010, 2015); +</codeblock> + + <p> + Dynamic partition pruning involves using information only available at run time, such as the result of a subquery: + </p> + +<codeblock conref="../shared/impala_common.xml#common/simple_dpp_example"/> + +<!-- Former example. Not sure it really would trigger DPP. SELECT COUNT(*) FROM sales_table WHERE year = (SELECT MAX(year) FROM some_other_table); --> + + <p> + In this case, Impala evaluates the subquery, sends the subquery results to all Impala nodes participating in the query, and then + each <cmdname>impalad</cmdname> daemon uses the dynamic partition pruning optimization to read only the partitions with the + relevant key values. + </p> + + <p> + Dynamic partition pruning is especially effective for queries involving joins of several large partitioned tables. Evaluating the + <codeph>ON</codeph> clauses of the join predicates might normally require reading data from all partitions of certain tables.
If + the <codeph>WHERE</codeph> clauses of the query refer to the partition key columns, Impala can now often skip reading many of the + partitions while evaluating the <codeph>ON</codeph> clauses. The dynamic partition pruning optimization reduces the amount of I/O + and the amount of intermediate data stored and transmitted across the network during the query. + </p> + + <p conref="../shared/impala_common.xml#common/spill_to_disk_vs_dynamic_partition_pruning"/> + + <p> + Dynamic partition pruning is part of the runtime filtering feature, which applies to other kinds of queries in addition to queries + against partitioned tables. See <xref href="impala_runtime_filtering.xml#runtime_filtering"/> for full details about this feature. + </p> + + </conbody> + + </concept> + + </concept> + + <concept id="partition_key_columns"> + + <title>Partition Key Columns</title> + + <conbody> + + <p> + The columns you choose as the partition keys should be ones that are frequently used to filter query results in important, + large-scale queries. Popular examples are some combination of year, month, and day when the data has associated time values, and + geographic region when the data is associated with some place. + </p> + + <ul> + <li> + <p> + For time-based data, split out the separate parts into their own columns, because Impala cannot partition based on a + <codeph>TIMESTAMP</codeph> column. + </p> + </li> + + <li> + <p> + The data type of the partition columns does not have a significant effect on the storage required, because the values from those + columns are not stored in the data files; instead, they are represented as strings inside HDFS directory names. + </p> + </li> + + <li rev="IMPALA-2499"> + <p> + In CDH 5.7 / Impala 2.5 and higher, you can enable the <codeph>OPTIMIZE_PARTITION_KEY_SCANS</codeph> query option to speed up + queries that only refer to partition key columns, such as <codeph>SELECT MAX(year)</codeph>.
This setting is not enabled by + default because the query behavior is slightly different if the table contains partition directories without actual data inside. + See <xref href="impala_optimize_partition_key_scans.xml#optimize_partition_key_scans"/> for details. + </p> + </li> + + <li> + <p conref="../shared/impala_common.xml#common/complex_types_partitioning"/> + </li> + + <li> + <p> + Remember that when Impala queries data stored in HDFS, it is most efficient to use multi-megabyte files to take advantage of the + HDFS block size. For Parquet tables, the block size (and ideal size of the data files) is <ph rev="parquet_block_size">256 MB in + Impala 2.0 and later</ph>. Therefore, avoid specifying too many partition key columns, which could result in individual + partitions containing only small amounts of data. For example, if you receive 1 GB of data per day, you might partition by year, + month, and day; while if you receive 5 GB of data per minute, you might partition by year, month, day, hour, and minute. If you + have data with a geographic component, you might partition based on postal code if you have many megabytes of data for each + postal code, but if not, you might partition by some larger region such as city, state, or country. + </p> + </li> + </ul> + + <p conref="../shared/impala_common.xml#common/partition_key_optimization"/> </conbody> - + + </concept> + + <concept id="mixed_format_partitions"> + + <title>Setting Different File Formats for Partitions</title> + + <conbody> + + <p> + Partitioned tables have the flexibility to use different file formats for different partitions. (For background information about + the different file formats Impala supports, see <xref href="impala_file_formats.xml#file_formats"/>.) For example, if you originally + received data in text format, then received new data in RCFile format, and eventually began receiving data in Parquet format, all + that data could reside in the same table for queries.
You just need to ensure that the table is structured so that the data files + that use different file formats reside in separate partitions. + </p> + + <p> + For example, here is how you might switch from text to Parquet data as you receive data for different years: + </p> + +<codeblock>[localhost:21000] > create table census (name string) partitioned by (year smallint); +[localhost:21000] > alter table census add partition (year=2012); -- Text format; + +[localhost:21000] > alter table census add partition (year=2013); -- Text format switches to Parquet before data loaded; +[localhost:21000] > alter table census partition (year=2013) set fileformat parquet; + +[localhost:21000] > insert into census partition (year=2012) values ('Smith'),('Jones'),('Lee'),('Singh'); +[localhost:21000] > insert into census partition (year=2013) values ('Flores'),('Bogomolov'),('Cooper'),('Appiah');</codeblock> + + <p> + At this point, the HDFS directory for <codeph>year=2012</codeph> contains a text-format data file, while the HDFS directory for + <codeph>year=2013</codeph> contains a Parquet data file. As always, when loading non-trivial data, you would use <codeph>INSERT ... + SELECT</codeph> or <codeph>LOAD DATA</codeph> to import data in large batches, rather than <codeph>INSERT ... VALUES</codeph> which + produces small files that are inefficient for real-world queries. + </p> + + <p> + For other file types that Impala cannot create natively, you can switch into Hive and issue the <codeph>ALTER TABLE ... SET + FILEFORMAT</codeph> statements and <codeph>INSERT</codeph> or <codeph>LOAD DATA</codeph> statements there. After switching back to + Impala, issue a <codeph>REFRESH <varname>table_name</varname></codeph> statement so that Impala recognizes any partitions or new + data added through Hive. 
+ </p> + + </conbody> + + </concept> + + <concept id="partition_management"> + + <title>Managing Partitions</title> + + <conbody> + + <p> + You can add, drop, set the expected file format, or set the HDFS location of the data files for individual partitions within an + Impala table. See <xref href="impala_alter_table.xml#alter_table"/> for syntax details, and + <xref href="impala_partitioning.xml#mixed_format_partitions"/> for tips on managing tables containing partitions with different file + formats. + </p> + + <note conref="../shared/impala_common.xml#common/add_partition_set_location"/> + + <p> + What happens to the data files when a partition is dropped depends on whether the partitioned table is designated as internal or + external. For an internal (managed) table, the data files are deleted. For example, if data in the partitioned table is a copy of + raw data files stored elsewhere, you might save disk space by dropping older partitions that are no longer required for reporting, + knowing that the original data is still available if needed later. For an external table, the data files are left alone. For + example, dropping a partition without deleting the associated files lets Impala consider a smaller set of partitions, improving + query efficiency and reducing overhead for DDL operations on the table; if the data is needed again later, you can add the partition + again. See <xref href="impala_tables.xml#tables" /> for details and examples. + </p> + + </conbody> + + </concept> + + <concept rev="kudu" id="partition_kudu" audience="Cloudera"> + + <title>Using Partitioning with Kudu Tables</title> + + <prolog> + <metadata> + <data name="Category" value="Kudu"/> + </metadata> + </prolog> + + <conbody> + + <p> + Kudu tables use a more fine-grained partitioning scheme than tables containing HDFS data files. 
You specify a <codeph>DISTRIBUTE + BY</codeph> clause with the <codeph>CREATE TABLE</codeph> statement to identify how to divide the values from the partition key + columns. + </p> + + </conbody> + + </concept> + </concept>
