http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/92703468/tests/query_test/test_insert_parquet.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_insert_parquet.py b/tests/query_test/test_insert_parquet.py
index 1472bb1..4477786 100644
--- a/tests/query_test/test_insert_parquet.py
+++ b/tests/query_test/test_insert_parquet.py
@@ -20,10 +20,12 @@ import os
 from collections import namedtuple
+from datetime import datetime
+from decimal import Decimal
 from shutil import rmtree
 from subprocess import check_call
 from tempfile import mkdtemp as make_tmp_dir
-from parquet.ttypes import SortingColumn
+from parquet.ttypes import ColumnOrder, SortingColumn, TypeDefinedOrder
 
 from tests.common.environ import impalad_basedir
 from tests.common.impala_test_suite import ImpalaTestSuite
@@ -50,6 +52,21 @@ class RoundFloat():
     """Compares this objects's value to a numeral after rounding it."""
     return round(self.value, self.num_digits) == round(numeral, self.num_digits)
 
+
+class TimeStamp():
+  """Class to construct timestamps with a default format specifier."""
+  def __init__(self, value):
+    # This member must be called 'timetuple'. Only if this class has a member called
+    # 'timetuple' will the datetime __eq__ function forward an unknown equality check to
+    # this method by returning NotImplemented:
+    # https://docs.python.org/2/library/datetime.html#datetime.datetime
+    self.timetuple = datetime.strptime(value, '%Y-%m-%d %H:%M:%S.%f')
+
+  def __eq__(self, other_timetuple):
+    """Compares this objects's value to another timetuple."""
+    return self.timetuple == other_timetuple
+
+
 ColumnStats = namedtuple('ColumnStats', ['name', 'min', 'max'])
 
 # Test a smaller parquet file size as well
@@ -249,6 +266,34 @@ class TestHdfsParquetTableWriter(ImpalaTestSuite):
       for row_group in row_groups:
         assert row_group.sorting_columns == expected
 
+  def test_set_column_orders(self, vector, unique_database, tmpdir):
+    """Tests that the Parquet writers set FileMetaData::column_orders."""
+    source_table = "functional_parquet.alltypessmall"
+    target_table = "test_set_column_orders"
+    qualified_target_table = "{0}.{1}".format(unique_database, target_table)
+    hdfs_path = get_fs_path("/test-warehouse/{0}.db/{1}/".format(unique_database,
+        target_table))
+
+    # Create table
+    query = "create table {0} like {1} stored as parquet".format(qualified_target_table,
+        source_table)
+    self.execute_query(query)
+
+    # Insert data
+    query = ("insert into {0} partition(year, month) select * from {1}").format(
+        qualified_target_table, source_table)
+    self.execute_query(query)
+
+    # Download hdfs files and verify column orders
+    check_call(['hdfs', 'dfs', '-get', hdfs_path, tmpdir.strpath])
+
+    expected_col_orders = [ColumnOrder(TYPE_ORDER=TypeDefinedOrder())] * 11
+
+    for root, subdirs, files in os.walk(tmpdir.strpath):
+      for f in files:
+        parquet_file = os.path.join(root, str(f))
+        file_meta_data = get_parquet_metadata(parquet_file)
+        assert file_meta_data.column_orders == expected_col_orders
+
 
 @SkipIfIsilon.hive
 @SkipIfLocal.hive
@@ -275,13 +320,13 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
         decoded.append(None)
         continue
 
-      if stats.min is None and stats.max is None:
+      if stats.min_value is None and stats.max_value is None:
         decoded.append(None)
         continue
 
-      assert stats.min is not None and stats.max is not None
-      min_value = decode_stats_value(schema, stats.min)
-      max_value = decode_stats_value(schema, stats.max)
+      assert stats.min_value is not None and stats.max_value is not None
+      min_value = decode_stats_value(schema, stats.min_value)
+      max_value = decode_stats_value(schema, stats.max_value)
       decoded.append(ColumnStats(schema.name, min_value, max_value))
 
     assert len(decoded) == len(schemas)
@@ -347,32 +392,21 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
       assert stats == expected
 
   def _ctas_table_and_verify_stats(self, vector, unique_database, source_table,
-                                   expected_values, hive_skip_col_idx = None):
+                                   expected_values):
     """Copies 'source_table' into a parquet table and makes sure that the row group
-    statistics in the resulting parquet file match those in 'expected_values'. The
-    comparison is performed against both Hive and Impala. For Hive, columns indexed by
-    'hive_skip_col_idx' are excluded from the verification of the expected values.
+    statistics in the resulting parquet file match those in 'expected_values'.
     """
     table_name = "test_hdfs_parquet_table_writer"
     qualified_table_name = "{0}.{1}".format(unique_database, table_name)
     hdfs_path = get_fs_path('/test-warehouse/{0}.db/{1}/'.format(unique_database,
                                                                  table_name))
 
-    # Validate against Hive.
+    # Setting num_nodes = 1 ensures that the query is executed on the coordinator,
+    # resulting in a single parquet file being written.
     self.execute_query("drop table if exists {0}".format(qualified_table_name))
-    self.run_stmt_in_hive("create table {0} stored as parquet as select * from "
-                          "{1}".format(qualified_table_name, source_table))
-    self.execute_query("invalidate metadata {0}".format(qualified_table_name))
-    self._validate_min_max_stats(hdfs_path, expected_values, hive_skip_col_idx)
-
-    # Validate against Impala. Setting exec_single_node_rows_threshold and adding a limit
-    # clause ensures that the query is executed on the coordinator, resulting in a single
-    # parquet file being written.
-    num_rows = self.execute_scalar("select count(*) from {0}".format(source_table))
-    self.execute_query("drop table {0}".format(qualified_table_name))
-    query = ("create table {0} stored as parquet as select * from {1} limit "
-             "{2}").format(qualified_table_name, source_table, num_rows)
-    vector.get_value('exec_option')['EXEC_SINGLE_NODE_ROWS_THRESHOLD'] = num_rows
+    query = ("create table {0} stored as parquet as select * from {1}").format(
+        qualified_table_name, source_table)
+    vector.get_value('exec_option')['num_nodes'] = 1
     self.execute_query(query, vector.get_value('exec_option'))
     self._validate_min_max_stats(hdfs_path, expected_values)
 
@@ -390,29 +424,33 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
       ColumnStats('bigint_col', 0, 90),
       ColumnStats('float_col', 0, RoundFloat(9.9, 1)),
       ColumnStats('double_col', 0, RoundFloat(90.9, 1)),
-      None,
-      None,
-      None,
+      ColumnStats('date_string_col', '01/01/09', '12/31/10'),
+      ColumnStats('string_col', '0', '9'),
+      ColumnStats('timestamp_col', TimeStamp('2009-01-01 00:00:00.0'),
+                  TimeStamp('2010-12-31 05:09:13.860000')),
       ColumnStats('year', 2009, 2010),
       ColumnStats('month', 1, 12),
     ]
 
-    # Skip comparison of unsupported columns types with Hive.
-    hive_skip_col_idx = [8, 9, 10]
-
     self._ctas_table_and_verify_stats(vector, unique_database, "functional.alltypes",
-                                      expected_min_max_values, hive_skip_col_idx)
+                                      expected_min_max_values)
 
   def test_write_statistics_decimal(self, vector, unique_database):
-    """Test that Impala does not write statistics for decimal columns."""
+    """Test that writing a parquet file populates the rowgroup statistics with the correct
+    values for decimal columns.
+    """
     # Expected values for functional.decimal_tbl
-    expected_min_max_values = [None, None, None, None, None, None]
-
-    # Skip comparison of unsupported columns types with Hive.
-    hive_skip_col_idx = range(len(expected_min_max_values))
+    expected_min_max_values = [
+      ColumnStats('d1', 1234, 132842),
+      ColumnStats('d2', 111, 2222),
+      ColumnStats('d3', Decimal('1.23456789'), Decimal('12345.6789')),
+      ColumnStats('d4', Decimal('0.123456789'), Decimal('0.123456789')),
+      ColumnStats('d5', Decimal('0.1'), Decimal('12345.789')),
+      ColumnStats('d6', 1, 1)
+    ]
 
     self._ctas_table_and_verify_stats(vector, unique_database, "functional.decimal_tbl",
-                                      expected_min_max_values, hive_skip_col_idx)
+                                      expected_min_max_values)
 
   def test_write_statistics_multi_page(self, vector, unique_database):
     """Test that writing a parquet file populates the rowgroup statistics with the correct
@@ -421,40 +459,57 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
     # Expected values for tpch_parquet.customer
     expected_min_max_values = [
       ColumnStats('c_custkey', 1, 150000),
-      None,
-      None,
+      ColumnStats('c_name', 'Customer#000000001', 'Customer#000150000'),
+      ColumnStats('c_address', ' 2uZwVhQvwA', 'zzxGktzXTMKS1BxZlgQ9nqQ'),
       ColumnStats('c_nationkey', 0, 24),
-      None,
-      None,
-      None,
-      None,
+      ColumnStats('c_phone', '10-100-106-1617', '34-999-618-6881'),
+      ColumnStats('c_acctbal', Decimal('-999.99'), Decimal('9999.99')),
+      ColumnStats('c_mktsegment', 'AUTOMOBILE', 'MACHINERY'),
+      ColumnStats('c_comment', ' Tiresias according to the slyly blithe instructions '
+                  'detect quickly at the slyly express courts. express dinos wake ',
+                  'zzle. blithely regular instructions cajol'),
     ]
 
-    # Skip comparison of unsupported columns types with Hive.
-    hive_skip_col_idx = [1, 2, 4, 5, 6, 7]
-
     self._ctas_table_and_verify_stats(vector, unique_database, "tpch_parquet.customer",
-                                      expected_min_max_values, hive_skip_col_idx)
+                                      expected_min_max_values)
 
   def test_write_statistics_null(self, vector, unique_database):
     """Test that we don't write min/max statistics for null columns."""
-    expected_min_max_values = [None, None, None, None, None, None, None]
-
-    # Skip comparison of unsupported columns types with Hive.
-    hive_skip_col_idx = range(len(expected_min_max_values))
+    expected_min_max_values = [
+      ColumnStats('a', 'a', 'a'),
+      ColumnStats('b', '', ''),
+      None,
+      None,
+      None,
+      ColumnStats('f', 'a\x00b', 'a\x00b'),
+      ColumnStats('g', '\x00', '\x00')
+    ]
 
     self._ctas_table_and_verify_stats(vector, unique_database, "functional.nulltable",
-                                      expected_min_max_values, hive_skip_col_idx)
+                                      expected_min_max_values)
 
   def test_write_statistics_char_types(self, vector, unique_database):
-    """Test that Impala does not write statistics for char columns."""
-    expected_min_max_values = [None, None, None]
+    """Test that Impala correctly writes statistics for char columns."""
+    table_name = "test_char_types"
+    qualified_table_name = "{0}.{1}".format(unique_database, table_name)
 
-    # Skip comparison of unsupported columns types with Hive.
-    hive_skip_col_idx = range(len(expected_min_max_values))
+    create_table_stmt = "create table {0} (c3 char(3), vc varchar, st string);".format(
+        qualified_table_name)
+    self.execute_query(create_table_stmt)
 
-    self._ctas_table_and_verify_stats(vector, unique_database, "functional.chars_formats",
-                                      expected_min_max_values, hive_skip_col_idx)
+    insert_stmt = """insert into {0} values
+        (cast("def" as char(3)), "ghj xyz", "abc xyz"),
+        (cast("abc" as char(3)), "def 123 xyz", "lorem ipsum"),
+        (cast("xy" as char(3)), "abc banana", "dolor dis amet")""".format(qualified_table_name)
+    self.execute_query(insert_stmt)
 
+    expected_min_max_values = [
+      ColumnStats('c3', 'abc', 'xy'),
+      ColumnStats('vc', 'abc banana', 'ghj xyz'),
+      ColumnStats('st', 'abc xyz', 'lorem ipsum')
+    ]
+
+    self._ctas_table_and_verify_stats(vector, unique_database, qualified_table_name,
+                                      expected_min_max_values)
 
   def test_write_statistics_negative(self, vector, unique_database):
     """Test that Impala correctly writes statistics for negative values."""
@@ -496,14 +551,13 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
     # Insert a large amount of data on a single backend with a limited parquet file size.
     # This will result in several files being written, exercising code that tracks
    # statistics for row groups.
-    num_rows = self.execute_scalar("select count(*) from {0}".format(source_table))
     query = "create table {0} like {1} stored as parquet".format(qualified_target_table,
                                                                   source_table)
     self.execute_query(query, vector.get_value('exec_option'))
-    query = ("insert into {0} /* +sortby(o_orderkey) */ select * from {1} limit"
-             "{2}").format(qualified_target_table, source_table, num_rows)
-    vector.get_value('exec_option')['EXEC_SINGLE_NODE_ROWS_THRESHOLD'] = num_rows
-    vector.get_value('exec_option')['PARQUET_FILE_SIZE'] = 8 * 1024 * 1024
+    query = ("insert into {0} /* +sortby(o_orderkey) */ select * from {1}").format(
+        qualified_target_table, source_table)
+    vector.get_value('exec_option')['num_nodes'] = 1
+    vector.get_value('exec_option')['parquet_file_size'] = 8 * 1024 * 1024
     self.execute_query(query, vector.get_value('exec_option'))
 
     # Get all stats for the o_orderkey column
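Note: the TimeStamp helper added above leans on a documented datetime detail: datetime's __eq__ only returns NotImplemented for a non-datetime operand when that operand exposes a 'timetuple' attribute, which makes Python retry the comparison with the wrapper's own __eq__. A minimal standalone sketch of that behaviour follows; the TimeStampProbe name and the sample values are illustrative only and are not part of the patch.

from datetime import datetime

class TimeStampProbe(object):
  """Illustrative stand-in for the TimeStamp helper in the patch above."""
  def __init__(self, value, fmt='%Y-%m-%d %H:%M:%S.%f'):
    # Must be named 'timetuple' so that datetime.__eq__ returns NotImplemented
    # and Python falls back to this object's __eq__ for the comparison.
    self.timetuple = datetime.strptime(value, fmt)

  def __eq__(self, other):
    return self.timetuple == other

ts = datetime(2009, 1, 1)
print(TimeStampProbe('2009-01-01 00:00:00.0') == ts)  # True
print(ts == TimeStampProbe('2009-01-01 00:00:00.0'))  # True, via the NotImplemented fallback
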
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/92703468/tests/query_test/test_parquet_stats.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_parquet_stats.py b/tests/query_test/test_parquet_stats.py
index 9b9d6d7..93fc06d 100644
--- a/tests/query_test/test_parquet_stats.py
+++ b/tests/query_test/test_parquet_stats.py
@@ -15,10 +15,14 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import os
 import pytest
+import shlex
+from subprocess import check_call
 
 from tests.common.test_vector import ImpalaTestDimension
 from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.util.filesystem_utils import get_fs_path
 
 MT_DOP_VALUES = [0, 1, 2, 8]
 
@@ -43,3 +47,26 @@ class TestParquetStats(ImpalaTestSuite):
     # skipped inside a fragment, so we ensure that the tests run in a single fragment.
     vector.get_value('exec_option')['num_nodes'] = 1
     self.run_test_case('QueryTest/parquet_stats', vector, use_db=unique_database)
+
+  def test_deprecated_stats(self, vector, unique_database):
+    """Test that reading parquet files with statistics with deprecated 'min'/'max' fields
+    works correctly. The statistics will be used for known-good types (boolean, integral,
+    float) and will be ignored for all other types (string, decimal, timestamp)."""
+    table_name = 'deprecated_stats'
+    # We use CTAS instead of "create table like" to convert the partition columns into
+    # normal table columns.
+    self.client.execute('create table %s.%s stored as parquet as select * from '
+                        'functional.alltypessmall limit 0' %
+                        (unique_database, table_name))
+    table_location = get_fs_path('/test-warehouse/%s.db/%s' %
+                                 (unique_database, table_name))
+    local_file = os.path.join(os.environ['IMPALA_HOME'],
+                              'testdata/data/deprecated_statistics.parquet')
+    assert os.path.isfile(local_file)
+    check_call(['hdfs', 'dfs', '-copyFromLocal', local_file, table_location])
+    self.client.execute('invalidate metadata %s.%s' % (unique_database, table_name))
+    # The test makes assumptions about the number of row groups that are processed and
+    # skipped inside a fragment, so we ensure that the tests run in a single fragment.
+    vector.get_value('exec_option')['num_nodes'] = 1
+    self.run_test_case('QueryTest/parquet-deprecated-stats', vector, unique_database)
+
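Note: as background for the deprecated-stats coverage above, a reader of parquet column statistics prefers the newer 'min_value'/'max_value' fields and only falls back to the deprecated 'min'/'max' fields for types whose ordering can be trusted (boolean, integral, float), as the new test's docstring describes. Below is a rough sketch of that selection logic, assuming a thrift-generated Statistics object with the fields used by the test helpers; pick_stats is a hypothetical helper, not part of the patch.

def pick_stats(stats, deprecated_ok):
  """Return (min, max) raw values from a parquet Statistics object, or None.

  'deprecated_ok' should only be True for types whose deprecated 'min'/'max'
  ordering is known to be correct (boolean, integral, float).
  """
  if stats is None:
    return None
  # Prefer the new fields written by current writers.
  if stats.min_value is not None and stats.max_value is not None:
    return stats.min_value, stats.max_value
  # Fall back to the deprecated fields only when it is safe to do so.
  if deprecated_ok and stats.min is not None and stats.max is not None:
    return stats.min, stats.max
  return None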
