IMPALA-2466: Add more tests for the HDFS parquet scanner

These tests functionally verify that the following types of files can be
scanned properly:
1) A parquet file with multiple blocks, such that each node has to scan
   multiple blocks.
2) A parquet file with multiple blocks but only one row group that spans the
   entire file. Only one scan range should do any work in this case.

Change-Id: I4faccd9ce3fad42402652c8f17d4e7aa3d593368
Reviewed-on: http://gerrit.cloudera.org:8080/1500
Reviewed-by: Sailesh Mukil <[email protected]>
Tested-by: Internal Jenkins

Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/76b67485
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/76b67485
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/76b67485

Branch: refs/heads/master
Commit: 76b674850fa1db36ab6e6acda6d3f958661eefbc
Parents: 0d1eab7
Author: Sailesh Mukil <[email protected]>
Authored: Fri Oct 2 18:42:34 2015 -0700
Committer: Internal Jenkins <[email protected]>
Committed: Fri Mar 25 13:10:15 2016 +0000

----------------------------------------------------------------------
 testdata/LineItemMultiBlock/README.dox          |  9 ++-
 .../lineitem_one_row_group.parquet              | Bin 0 -> 3144593 bytes
 .../lineitem_sixblocks.parquet                  | Bin 0 -> 5539024 bytes
 testdata/bin/create-load-data.sh                | 12 ++++
 .../functional/functional_schema_template.sql   | 48 ++++++++++++++-
 .../datasets/functional/schema_constraints.csv  |  2 +
 .../queries/QueryTest/parquet.test              | 52 ++++++++++++++--
 tests/query_test/test_scanners.py               | 59 +++++++++++++++----
 8 files changed, 164 insertions(+), 18 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/76b67485/testdata/LineItemMultiBlock/README.dox
----------------------------------------------------------------------
diff --git a/testdata/LineItemMultiBlock/README.dox b/testdata/LineItemMultiBlock/README.dox
index bc141c3..7608067 100755
--- a/testdata/LineItemMultiBlock/README.dox
+++ b/testdata/LineItemMultiBlock/README.dox
@@ -1,5 +1,6 @@
 This file was created for:
-IMPALA-1881- Maximize data locality when scanning Parquet files with multiple row groups.
+IMPALA-1881: Maximize data locality when scanning Parquet files with multiple row groups.
+IMPALA-2466: Add more tests to the HDFS parquet scanner.

 The table lineitem_multiblock is a single parquet file with:
 - A row group size of approximately 12 KB each.
@@ -24,3 +25,9 @@
 set parquet.block.size=4086; # This is to set the row group size
 insert into functional_parquet.lineitem_multiblock select * from
 tpch.lineitem limit 20000; # We limit to 20000 to keep the size of the table small
+
+'lineitem_sixblocks' was created the same way but with more rows, so that we got more
+blocks.
+
+'lineitem_multiblock_one_row_group' was created similarly but with a much higher
+'parquet.block.size' so that everything fit in one row group.
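
For anyone regenerating these files, the row-group layout can be sanity-checked
before loading. A minimal sketch, assuming pyarrow is installed (pyarrow is not
used by this change; paths are relative to the Impala checkout):

# Sanity-check the row-group layout of the new test files.
import pyarrow.parquet as pq

for path in ['testdata/LineItemMultiBlock/lineitem_sixblocks.parquet',
             'testdata/LineItemMultiBlock/lineitem_one_row_group.parquet']:
  pf = pq.ParquetFile(path)
  # 'lineitem_sixblocks' should report many small (~12 KB) row groups, while
  # 'lineitem_one_row_group' should report exactly 1.
  print(path, '->', pf.num_row_groups, 'row groups,',
        pf.metadata.num_rows, 'rows')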
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/76b67485/testdata/LineItemMultiBlock/lineitem_one_row_group.parquet
----------------------------------------------------------------------
diff --git a/testdata/LineItemMultiBlock/lineitem_one_row_group.parquet b/testdata/LineItemMultiBlock/lineitem_one_row_group.parquet
new file mode 100644
index 0000000..d81e6a5
Binary files /dev/null and b/testdata/LineItemMultiBlock/lineitem_one_row_group.parquet differ

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/76b67485/testdata/LineItemMultiBlock/lineitem_sixblocks.parquet
----------------------------------------------------------------------
diff --git a/testdata/LineItemMultiBlock/lineitem_sixblocks.parquet b/testdata/LineItemMultiBlock/lineitem_sixblocks.parquet
new file mode 100644
index 0000000..34c427b
Binary files /dev/null and b/testdata/LineItemMultiBlock/lineitem_sixblocks.parquet differ

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/76b67485/testdata/bin/create-load-data.sh
----------------------------------------------------------------------
diff --git a/testdata/bin/create-load-data.sh b/testdata/bin/create-load-data.sh
index d139dfc..9b2699b 100755
--- a/testdata/bin/create-load-data.sh
+++ b/testdata/bin/create-load-data.sh
@@ -304,6 +304,18 @@ function custom-post-load-steps {
   hadoop fs -Ddfs.block.size=1048576 -put -f \
     ${IMPALA_HOME}/testdata/LineItemMultiBlock/000000_0 \
     ${FILESYSTEM_PREFIX}/test-warehouse/lineitem_multiblock_parquet
+
+  # IMPALA-2466: Add more tests to the HDFS Parquet scanner (added after IMPALA-1881)
+  hadoop fs -mkdir -p /test-warehouse/lineitem_sixblocks_parquet && \
+  hadoop fs -Ddfs.block.size=1048576 -put -f \
+    ${IMPALA_HOME}/testdata/LineItemMultiBlock/lineitem_sixblocks.parquet \
+    /test-warehouse/lineitem_sixblocks_parquet
+
+  # IMPALA-2466: Add more tests to the HDFS Parquet scanner (this has only one row group)
+  hadoop fs -mkdir -p /test-warehouse/lineitem_multiblock_one_row_group_parquet && \
+  hadoop fs -Ddfs.block.size=1048576 -put -f \
+    ${IMPALA_HOME}/testdata/LineItemMultiBlock/lineitem_one_row_group.parquet \
+    /test-warehouse/lineitem_multiblock_one_row_group_parquet
 }

 function copy-and-load-ext-data-source {
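
The -Ddfs.block.size=1048576 flag above forces 1 MB HDFS blocks, so the ~5.5 MB
'lineitem_sixblocks.parquet' file splits into six blocks. A hedged sketch of one
way to confirm the block count after loading, assuming the 'hdfs' CLI is on the
PATH (this check is not part of the change):

# Count HDFS blocks for the uploaded file via 'hdfs fsck'; with -files -blocks
# it prints one 'blk_' entry per block, so a ~5.5 MB file stored with
# dfs.block.size=1048576 should show 6 of them. Illustrative only.
import subprocess

out = subprocess.check_output(
  ['hdfs', 'fsck',
   '/test-warehouse/lineitem_sixblocks_parquet/lineitem_sixblocks.parquet',
   '-files', '-blocks']).decode()
print('blocks:', out.count('blk_'))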
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/76b67485/testdata/datasets/functional/functional_schema_template.sql
----------------------------------------------------------------------
diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql
index edf8b69..80487f1 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -1774,8 +1774,52 @@ L_RECEIPTDATE STRING
 L_SHIPINSTRUCT STRING
 L_SHIPMODE STRING
 L_COMMENT STRING
----- ROW_FORMAT
-DELIMITED FIELDS TERMINATED BY '|'
+====
+---- DATASET
+-- IMPALA-2466: Add more tests to the HDFS Parquet scanner
+functional
+---- BASE_TABLE_NAME
+lineitem_sixblocks
+---- COLUMNS
+L_ORDERKEY BIGINT
+L_PARTKEY BIGINT
+L_SUPPKEY BIGINT
+L_LINENUMBER INT
+L_QUANTITY DECIMAL(12,2)
+L_EXTENDEDPRICE DECIMAL(12,2)
+L_DISCOUNT DECIMAL(12,2)
+L_TAX DECIMAL(12,2)
+L_RETURNFLAG STRING
+L_LINESTATUS STRING
+L_SHIPDATE STRING
+L_COMMITDATE STRING
+L_RECEIPTDATE STRING
+L_SHIPINSTRUCT STRING
+L_SHIPMODE STRING
+L_COMMENT STRING
+====
+---- DATASET
+-- IMPALA-2466: Add more tests to the HDFS Parquet scanner (this has only one row group)
+functional
+---- BASE_TABLE_NAME
+lineitem_multiblock_one_row_group
+---- COLUMNS
+L_ORDERKEY BIGINT
+L_PARTKEY BIGINT
+L_SUPPKEY BIGINT
+L_LINENUMBER INT
+L_QUANTITY DECIMAL(12,2)
+L_EXTENDEDPRICE DECIMAL(12,2)
+L_DISCOUNT DECIMAL(12,2)
+L_TAX DECIMAL(12,2)
+L_RETURNFLAG STRING
+L_LINESTATUS STRING
+L_SHIPDATE STRING
+L_COMMITDATE STRING
+L_RECEIPTDATE STRING
+L_SHIPINSTRUCT STRING
+L_SHIPMODE STRING
+L_COMMENT STRING
 ====
 ---- DATASET
 functional

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/76b67485/testdata/datasets/functional/schema_constraints.csv
----------------------------------------------------------------------
diff --git a/testdata/datasets/functional/schema_constraints.csv b/testdata/datasets/functional/schema_constraints.csv
index f1f4024..2974942 100644
--- a/testdata/datasets/functional/schema_constraints.csv
+++ b/testdata/datasets/functional/schema_constraints.csv
@@ -44,6 +44,8 @@ table_name:alltypesagg_hive_13_1, constraint:restrict_to, table_format:parquet/n
 table_name:kite_required_fields, constraint:restrict_to, table_format:parquet/none/none
 table_name:bad_column_metadata, constraint:restrict_to, table_format:parquet/none/none
 table_name:lineitem_multiblock, constraint:restrict_to, table_format:parquet/none/none
+table_name:lineitem_sixblocks, constraint:restrict_to, table_format:parquet/none/none
+table_name:lineitem_multiblock_one_row_group, constraint:restrict_to, table_format:parquet/none/none

 # TODO: Support Avro. Data loading currently fails for Avro because complex types
 # cannot be converted to the corresponding Avro types yet.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/76b67485/testdata/workloads/functional-query/queries/QueryTest/parquet.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet.test b/testdata/workloads/functional-query/queries/QueryTest/parquet.test
index 479e878..e6b4061 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/parquet.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet.test
@@ -53,7 +53,7 @@ SELECT * from bad_magic_number
 File '$NAMENODE/test-warehouse/bad_magic_number_parquet/bad_magic_number.parquet' has an invalid version number: XXXX
 ====
 ---- QUERY
-# count(*) query on parquet file with multiple blocks
+# count(*) query on parquet file with multiple blocks (one block per node)
 SELECT count(*) from lineitem_multiblock
 ---- TYPES
 bigint
@@ -61,7 +61,15 @@ bigint
 20000
 ====
 ---- QUERY
-# Select multiple columns from parquet file with multiple blocks
+# count(*) query on parquet file with more than one block per node
+SELECT count(*) from lineitem_sixblocks
+---- TYPES
+bigint
+---- RESULTS
+40000
+====
+---- QUERY
+# Select multiple columns from parquet file with multiple blocks (one block per node)
 SELECT count(l_comment), min(l_partkey), max(l_linenumber) from lineitem_multiblock;
 ---- TYPES
 bigint, bigint, int
@@ -69,8 +77,44 @@ bigint, bigint, int
 20000,2,7
 ====
 ---- QUERY
-# Test limit queries on parquet with multiple blocks
-select distinct l_orderkey from functional_parquet.lineitem_multiblock where
+# Select multiple columns from parquet file with more than one block per node
+SELECT count(l_comment), min(l_partkey), max(l_linenumber) from lineitem_sixblocks;
+---- TYPES
+bigint, bigint, int
+---- RESULTS
+40000,2,7
+====
+---- QUERY
+# Test limit queries on parquet with multiple blocks (one block per node)
+select distinct l_orderkey from lineitem_multiblock where
+l_orderkey < 5 or
 l_orderkey > 15000 order by l_orderkey limit 20;
+---- TYPES
+bigint
+---- RESULTS
+1
+2
+3
+4
+15008
+15009
+15010
+15011
+15012
+15013
+15014
+15015
+15040
+15041
+15042
+15043
+15044
+15045
+15046
+15047
+====
+---- QUERY
+# Test limit queries on parquet with more than one block per node
+select distinct l_orderkey from lineitem_sixblocks where
 l_orderkey < 5 or l_orderkey > 15000 order by l_orderkey limit 20;
 ---- TYPES
 bigint
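
For readers unfamiliar with the .test format used above: cases are separated by
'====' and each '---- NAME' line opens a section (QUERY, TYPES, RESULTS). The
test harness has its own parser; the toy sketch below merely illustrates the
layout:

# Toy parser for the .test layout shown above: '====' separates cases and
# each '---- NAME' line starts a section. Illustrative only; Impala's test
# framework ships its own parser.
def parse_test_cases(text):
  cases = []
  for chunk in text.split('===='):
    sections, current = {}, None
    for line in chunk.splitlines():
      if line.startswith('---- '):
        current = line[5:].strip()
        sections[current] = []
      elif current is not None:
        sections[current].append(line)
    if sections:
      cases.append(sections)
  return cases

case = parse_test_cases("""
---- QUERY
SELECT count(*) from lineitem_sixblocks
---- TYPES
bigint
---- RESULTS
40000
""")[0]
print(case['RESULTS'])  # ['40000']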
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/76b67485/tests/query_test/test_scanners.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index ca34965..7a4290f 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -215,32 +215,69 @@ class TestParquet(ImpalaTestSuite):
   @pytest.mark.execute_serially
   def test_multiple_blocks(self, vector):
     # For IMPALA-1881. The table functional_parquet.lineitem_multiblock has 3 blocks, so
-    # we verify if each impalad reads one block by checking if each impalad reads at
-    # least one row group.
+    # each impalad should read 1 scan range.
     # It needs to execute serially because if more than one query is being scheduled at
     # a time, the simple scheduler round robins colocated impalads across all running
     # queries. See IMPALA-2479 for more details.
-    DB_NAME = 'functional_parquet'
-    TABLE_NAME = 'lineitem_multiblock'
-    query = 'select count(l_orderkey) from %s.%s' % (DB_NAME, TABLE_NAME)
+    table_name = 'functional_parquet.lineitem_multiblock'
+    self._multiple_blocks_helper(table_name, 20000, ranges_per_node=1)
+    table_name = 'functional_parquet.lineitem_sixblocks'
+    # 2 scan ranges per node should be created to read 'lineitem_sixblocks' because
+    # there are 6 blocks and 3 scan nodes.
+    self._multiple_blocks_helper(table_name, 40000, ranges_per_node=2)
+
+  @SkipIfS3.hdfs_block_size
+  @SkipIfIsilon.hdfs_block_size
+  @SkipIfLocal.multiple_impalad
+  @pytest.mark.execute_serially
+  def test_multiple_blocks_one_row_group(self, vector):
+    # For IMPALA-1881. The table functional_parquet.lineitem_multiblock_one_row_group
+    # has 3 blocks but only one row group across these blocks. We test to see that only
+    # one scan range reads everything from this row group.
+    table_name = 'functional_parquet.lineitem_multiblock_one_row_group'
+    self._multiple_blocks_helper(
+        table_name, 40000, one_row_group=True, ranges_per_node=1)
+
+  def _multiple_blocks_helper(
+      self, table_name, rows_in_table, one_row_group=False, ranges_per_node=1):
+    """ This function executes a simple SELECT query on a multiblock parquet table,
+    verifies the number of scan ranges issued per node, and verifies that at least
+    one row group was read. If 'one_row_group' is True, then one scan range is
+    expected to read the data for the entire table regardless of the number of
+    blocks. 'ranges_per_node' indicates how many scan ranges we expect per node.
""" + + query = 'select count(l_orderkey) from %s' % table_name result = self.client.execute(query) assert len(result.data) == 1 - assert result.data[0] == '20000' + assert result.data[0] == str(rows_in_table) runtime_profile = str(result.runtime_profile) num_row_groups_list = re.findall('NumRowGroups: ([0-9]*)', runtime_profile) - scan_ranges_complete_list = re.findall('ScanRangesComplete: ([0-9]*)', runtime_profile) + scan_ranges_complete_list = re.findall( + 'ScanRangesComplete: ([0-9]*)', runtime_profile) + num_rows_read_list = re.findall('RowsRead: [0-9.K]* \(([0-9]*)\)', runtime_profile) + # This will fail if the number of impalads != 3 # The fourth fragment is the "Averaged Fragment" assert len(num_row_groups_list) == 4 assert len(scan_ranges_complete_list) == 4 + assert len(num_rows_read_list) == 4 + total_num_row_groups = 0 # Skip the Averaged Fragment; it comes first in the runtime profile. for num_row_groups in num_row_groups_list[1:]: - assert int(num_row_groups) > 0 - - for scan_ranges_complete in scan_ranges_complete_list[1:]: - assert int(scan_ranges_complete) == 1 + total_num_row_groups += int(num_row_groups) + if not one_row_group: assert int(num_row_groups) > 0 + + if one_row_group: + # If it's the one row group test, only one scan range should read all the data from + # that row group. + assert total_num_row_groups == 1 + for rows_read in num_rows_read_list[1:]: + if rows_read != '0': assert rows_read == str(rows_in_table) + + for scan_ranges_complete in scan_ranges_complete_list: + assert int(scan_ranges_complete) == ranges_per_node @SkipIfS3.insert def test_annotate_utf8_option(self, vector, unique_database):
