IMPALA-6709: Simplify tests that copy local files to tables

We had quite a few tests that created a table and used
"hdfs dfs -copyFromLocal" to copy data files to the
warehouse directory for this table.

This operation needs some boilerplate code, which I
refactored into two new helper functions:
create_table_from_parquet() and
create_table_and_copy_files().

Change-Id: Ie00a4561825facf8abe2e8e74a6b6e93194f416f
Reviewed-on: http://gerrit.cloudera.org:8080/11127
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/e27954a5
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/e27954a5
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/e27954a5

Branch: refs/heads/master
Commit: e27954a5aa585db23fe3c97726aa89305efa306d
Parents: da01f29
Author: Zoltan Borok-Nagy <[email protected]>
Authored: Thu Aug 2 15:13:04 2018 +0200
Committer: Impala Public Jenkins <[email protected]>
Committed: Wed Aug 22 18:08:20 2018 +0000

----------------------------------------------------------------------
 .../queries/QueryTest/parquet-def-levels.test   |   8 +-
 tests/common/file_utils.py                      |  58 ++++++
 tests/query_test/test_parquet_stats.py          |  32 ++--
 tests/query_test/test_scanners.py               | 178 +++++--------------
 4 files changed, 115 insertions(+), 161 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/e27954a5/testdata/workloads/functional-query/queries/QueryTest/parquet-def-levels.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/parquet-def-levels.test 
b/testdata/workloads/functional-query/queries/QueryTest/parquet-def-levels.test
index e55fc4d..0145fca 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/parquet-def-levels.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/parquet-def-levels.test
@@ -54,14 +54,14 @@ 
INT,BOOLEAN,TINYINT,SMALLINT,INT,BIGINT,FLOAT,DOUBLE,STRING,STRING,TIMESTAMP,INT
 ---- QUERY
 # IMPALA-6077: unsupported BIT_PACKED encoding fails when materializing 
columns.
 select id
-from alltypesagg_bitpacked
+from alltypes_agg_bitpacked_def_levels
 ---- CATCH
 deprecated BIT_PACKED encoding for rep or def levels.
 ====
 ---- QUERY
 # IMPALA-6077: do not need to decode BIT_PACKED encoding when not 
materializing columns.
 select count(*)
-from alltypesagg_bitpacked
+from alltypes_agg_bitpacked_def_levels
 ---- RESULTS
 11000
 ---- TYPES
@@ -72,7 +72,7 @@ BIGINT
 # case it should either work or fail gracefully. For now it still requires 
materialising
 # levels.
 select count(id)
-from alltypesagg_bitpacked
+from alltypes_agg_bitpacked_def_levels
 ---- CATCH
 deprecated BIT_PACKED encoding for rep or def levels.
 ====
@@ -81,7 +81,7 @@ deprecated BIT_PACKED encoding for rep or def levels.
 # case it should either work or fail gracefully. For now it still requires 
materialising
 # levels.
 select min(int_col)
-from alltypesagg_bitpacked
+from alltypes_agg_bitpacked_def_levels
 ---- CATCH
 deprecated BIT_PACKED encoding for rep or def levels.
 ====

http://git-wip-us.apache.org/repos/asf/impala/blob/e27954a5/tests/common/file_utils.py
----------------------------------------------------------------------
diff --git a/tests/common/file_utils.py b/tests/common/file_utils.py
new file mode 100644
index 0000000..d3fa61c
--- /dev/null
+++ b/tests/common/file_utils.py
@@ -0,0 +1,58 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# This module contains utility functions for testing Parquet files
+
+import os
+from subprocess import check_call
+
+from tests.util.filesystem_utils import get_fs_path
+
+
+def create_table_from_parquet(impala_client, unique_database, table_name):
+  """Utility function to create a database table from a Parquet file. A 
Parquet file must
+  exist in $IMPALA_HOME/testdata/data with the name 'table_name'.parquet"""
+  filename = '{0}.parquet'.format(table_name)
+  local_file = os.path.join(os.environ['IMPALA_HOME'],
+                            'testdata/data/{0}'.format(filename))
+  assert os.path.isfile(local_file)
+  hdfs_file = get_fs_path('/test-warehouse/{0}.db/{1}'.format(unique_database, 
filename))
+  check_call(['hdfs', 'dfs', '-copyFromLocal', '-f', local_file, hdfs_file])
+
+  qualified_table_name = '{0}.{1}'.format(unique_database, table_name)
+  impala_client.execute('create table {0} like parquet "{1}" stored as 
parquet'.format(
+    qualified_table_name, hdfs_file))
+  impala_client.execute('load data inpath "{0}" into table {1}'.format(
+    hdfs_file, qualified_table_name))
+
+
+def create_table_and_copy_files(impala_client, create_stmt, unique_database, 
table_name,
+                                files):
+  create_stmt = create_stmt.format(db=unique_database, tbl=table_name)
+  impala_client.execute(create_stmt)
+  for local_file in files:
+    # Cut off leading '/' to make os.path.join() happy
+    local_file = local_file if local_file[0] != '/' else local_file[1:]
+    local_file = os.path.join(os.environ['IMPALA_HOME'], local_file)
+    assert os.path.isfile(local_file)
+    basename = os.path.basename(local_file)
+    hdfs_file = 
get_fs_path('/test-warehouse/{0}.db/{1}'.format(unique_database,
+                                                                basename))
+    check_call(['hdfs', 'dfs', '-copyFromLocal', '-f', local_file, hdfs_file])
+    qualified_table_name = '{0}.{1}'.format(unique_database, table_name)
+    impala_client.execute('load data inpath "{0}" into table {1}'.format(
+      hdfs_file, qualified_table_name))

http://git-wip-us.apache.org/repos/asf/impala/blob/e27954a5/tests/query_test/test_parquet_stats.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_parquet_stats.py 
b/tests/query_test/test_parquet_stats.py
index 3f8cd2f..cb35653 100644
--- a/tests/query_test/test_parquet_stats.py
+++ b/tests/query_test/test_parquet_stats.py
@@ -20,6 +20,8 @@ import pytest
 import shlex
 from subprocess import check_call
 
+from tests.common.file_utils import (
+  create_table_from_parquet, create_table_and_copy_files)
 from tests.common.test_vector import ImpalaTestDimension
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.util.filesystem_utils import get_fs_path
@@ -52,19 +54,14 @@ class TestParquetStats(ImpalaTestSuite):
     """Test that reading parquet files with statistics with deprecated 
'min'/'max' fields
     works correctly. The statistics will be used for known-good types 
(boolean, integral,
     float) and will be ignored for all other types (string, decimal, 
timestamp)."""
-    table_name = 'deprecated_stats'
+
     # We use CTAS instead of "create table like" to convert the partition 
columns into
     # normal table columns.
-    self.client.execute('create table %s.%s stored as parquet as select * from 
'
-                        'functional.alltypessmall limit 0' %
-                        (unique_database, table_name))
-    table_location = get_fs_path('/test-warehouse/%s.db/%s' %
-                                 (unique_database, table_name))
-    local_file = os.path.join(os.environ['IMPALA_HOME'],
-                              'testdata/data/deprecated_statistics.parquet')
-    assert os.path.isfile(local_file)
-    check_call(['hdfs', 'dfs', '-copyFromLocal', local_file, table_location])
-    self.client.execute('invalidate metadata %s.%s' % (unique_database, 
table_name))
+    create_table_and_copy_files(self.client, 'create table {db}.{tbl} stored 
as parquet '
+                                             'as select * from 
functional.alltypessmall '
+                                             'limit 0',
+                                unique_database, 'deprecated_stats',
+                                
['testdata/data/deprecated_statistics.parquet'])
     # The test makes assumptions about the number of row groups that are 
processed and
     # skipped inside a fragment, so we ensure that the tests run in a single 
fragment.
     vector.get_value('exec_option')['num_nodes'] = 1
@@ -74,14 +71,5 @@ class TestParquetStats(ImpalaTestSuite):
     """IMPALA-6538" Test that reading parquet files with statistics with 
invalid
     'min_value'/'max_value' fields works correctly. 'min_value' and 
'max_value' are both
     NaNs, therefore we need to ignore them"""
-    table_name = 'min_max_is_nan'
-    self.client.execute('create table %s.%s (val double) stored as parquet' %
-                       (unique_database, table_name))
-    table_location = get_fs_path('/test-warehouse/%s.db/%s' %
-                                 (unique_database, table_name))
-    local_file = os.path.join(os.environ['IMPALA_HOME'],
-                              'testdata/data/min_max_is_nan.parquet')
-    assert os.path.isfile(local_file)
-    check_call(['hdfs', 'dfs', '-copyFromLocal', local_file, table_location])
-    self.client.execute('invalidate metadata %s.%s' % (unique_database, 
table_name))
-    self.run_test_case('QueryTest/parquet-invalid-minmax-stats', vector, 
unique_database)
\ No newline at end of file
+    create_table_from_parquet(self.client, unique_database, 'min_max_is_nan')
+    self.run_test_case('QueryTest/parquet-invalid-minmax-stats', vector, 
unique_database)

http://git-wip-us.apache.org/repos/asf/impala/blob/e27954a5/tests/query_test/test_scanners.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners.py 
b/tests/query_test/test_scanners.py
index 2e61d20..1cd883e 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -44,6 +44,9 @@ from tests.common.test_dimensions import (
     create_single_exec_option_dimension,
     create_exec_option_dimension,
     create_uncompressed_text_dimension)
+from tests.common.file_utils import (
+    create_table_from_parquet,
+    create_table_and_copy_files)
 from tests.common.test_result_verifier import (
     parse_column_types,
     parse_column_labels,
@@ -290,21 +293,6 @@ class TestParquet(ImpalaTestSuite):
     cls.ImpalaTestMatrix.add_constraint(
       lambda v: v.get_value('table_format').file_format == 'parquet')
 
-  def _create_table_from_file(self, table_name, unique_database):
-    filename = '%s.parquet' % table_name
-    local_file = os.path.join(os.environ['IMPALA_HOME'],
-                              'testdata/data/%s' % filename)
-    assert os.path.isfile(local_file)
-    hdfs_file = get_fs_path('/test-warehouse/{0}.db/{1}'.format(
-        unique_database, filename))
-    check_call(['hdfs', 'dfs', '-copyFromLocal', '-f', local_file, hdfs_file])
-
-    qualified_table_name = '%s.%s' % (unique_database, table_name)
-    self.client.execute('create table %s like parquet "%s" stored as parquet' %
-                        (qualified_table_name, hdfs_file))
-    self.client.execute('load data inpath "%s" into table %s' %
-                        (hdfs_file, qualified_table_name))
-
   def test_parquet(self, vector):
     self.run_test_case('QueryTest/parquet', vector)
 
@@ -316,13 +304,7 @@ class TestParquet(ImpalaTestSuite):
 
   def test_timestamp_out_of_range(self, vector, unique_database):
     """IMPALA-4363: Test scanning parquet files with an out of range 
timestamp."""
-    self.client.execute(("create table {0}.out_of_range_timestamp (ts 
timestamp) "
-        "stored as parquet").format(unique_database))
-    out_of_range_timestamp_loc = get_fs_path(
-        "/test-warehouse/{0}.db/{1}".format(unique_database, 
"out_of_range_timestamp"))
-    check_call(['hdfs', 'dfs', '-copyFromLocal',
-        os.environ['IMPALA_HOME'] + 
"/testdata/data/out_of_range_timestamp.parquet",
-        out_of_range_timestamp_loc])
+    create_table_from_parquet(self.client, unique_database, 
"out_of_range_timestamp")
 
     vector.get_value('exec_option')['abort_on_error'] = 0
     self.run_test_case('QueryTest/out-of-range-timestamp-continue-on-error',
@@ -335,21 +317,9 @@ class TestParquet(ImpalaTestSuite):
     """IMPALA-3943: Tests that scanning files with num_rows=0 in the file 
footer
     succeeds without errors."""
     # Create test table with a file that has 0 rows and 0 row groups.
-    self.client.execute("create table %s.zero_rows_zero_row_groups (c int) "
-        "stored as parquet" % unique_database)
-    zero_rows_zero_row_groups_loc = get_fs_path(
-        "/test-warehouse/%s.db/%s" % (unique_database, 
"zero_rows_zero_row_groups"))
-    check_call(['hdfs', 'dfs', '-copyFromLocal',
-        os.environ['IMPALA_HOME'] + 
"/testdata/data/zero_rows_zero_row_groups.parquet",
-        zero_rows_zero_row_groups_loc])
+    create_table_from_parquet(self.client, unique_database, 
"zero_rows_zero_row_groups")
     # Create test table with a file that has 0 rows and 1 row group.
-    self.client.execute("create table %s.zero_rows_one_row_group (c int) "
-        "stored as parquet" % unique_database)
-    zero_rows_one_row_group_loc = get_fs_path(
-        "/test-warehouse/%s.db/%s" % (unique_database, 
"zero_rows_one_row_group"))
-    check_call(['hdfs', 'dfs', '-copyFromLocal',
-        os.environ['IMPALA_HOME'] + 
"/testdata/data/zero_rows_one_row_group.parquet",
-        zero_rows_one_row_group_loc])
+    create_table_from_parquet(self.client, unique_database, 
"zero_rows_one_row_group")
 
     vector.get_value('exec_option')['abort_on_error'] = 0
     self.run_test_case('QueryTest/parquet-zero-rows', vector, unique_database)
@@ -359,13 +329,7 @@ class TestParquet(ImpalaTestSuite):
   def test_repeated_root_schema(self, vector, unique_database):
     """IMPALA-4826: Tests that running a scan on a schema where the root 
schema's
        repetetion level is set to REPEATED succeeds without errors."""
-    self.client.execute("create table %s.repeated_root_schema (i int) "
-        "stored as parquet" % unique_database)
-    repeated_root_schema_loc = get_fs_path(
-        "/test-warehouse/%s.db/%s" % (unique_database, "repeated_root_schema"))
-    check_call(['hdfs', 'dfs', '-copyFromLocal',
-        os.environ['IMPALA_HOME'] + 
"/testdata/data/repeated_root_schema.parquet",
-        repeated_root_schema_loc])
+    create_table_from_parquet(self.client, unique_database, 
"repeated_root_schema")
 
     result = self.client.execute("select * from %s.repeated_root_schema" % 
unique_database)
     assert len(result.data) == 300
@@ -373,13 +337,7 @@ class TestParquet(ImpalaTestSuite):
   def test_huge_num_rows(self, vector, unique_database):
     """IMPALA-5021: Tests that a zero-slot scan on a file with a huge num_rows 
in the
     footer succeeds without errors."""
-    self.client.execute("create table %s.huge_num_rows (i int) stored as 
parquet"
-      % unique_database)
-    huge_num_rows_loc = get_fs_path(
-        "/test-warehouse/%s.db/%s" % (unique_database, "huge_num_rows"))
-    check_call(['hdfs', 'dfs', '-copyFromLocal',
-        os.environ['IMPALA_HOME'] + "/testdata/data/huge_num_rows.parquet",
-        huge_num_rows_loc])
+    create_table_from_parquet(self.client, unique_database, "huge_num_rows")
     result = self.client.execute("select count(*) from %s.huge_num_rows"
       % unique_database)
     assert len(result.data) == 1
@@ -407,16 +365,12 @@ class TestParquet(ImpalaTestSuite):
     check_call(['hive', '-e', hql_format.format(codec="snappy", year=2010, 
month=1)])
     check_call(['hive', '-e', hql_format.format(codec="gzip", year=2010, 
month=2)])
 
-    self.client.execute("create table %s.multi_compression (a string, b 
string)"
-        " stored as parquet" % unique_database)
-    multi_compression_tbl_loc =\
-        get_fs_path("/test-warehouse/%s.db/%s" % (unique_database, 
"multi_compression"))
-    check_call(['hdfs', 'dfs', '-copyFromLocal', os.environ['IMPALA_HOME'] +
-        
"/testdata/multi_compression_parquet_data/tinytable_0_gzip_snappy.parq",
-        multi_compression_tbl_loc])
-    check_call(['hdfs', 'dfs', '-copyFromLocal', os.environ['IMPALA_HOME'] +
-        
"/testdata/multi_compression_parquet_data/tinytable_1_snappy_gzip.parq",
-        multi_compression_tbl_loc])
+    test_files = 
["testdata/multi_compression_parquet_data/tinytable_0_gzip_snappy.parq",
+                  
"testdata/multi_compression_parquet_data/tinytable_1_snappy_gzip.parq"]
+    create_table_and_copy_files(self.client, "create table {db}.{tbl} "
+                                             "(a string, b string) stored as 
parquet",
+                                unique_database, "multi_compression",
+                                test_files)
 
     vector.get_value('exec_option')['num_nodes'] = 1
     self.run_test_case('QueryTest/hdfs_parquet_scan_node_profile',
@@ -429,16 +383,11 @@ class TestParquet(ImpalaTestSuite):
     - incorrect repeat count of 0 for the RLE encoded dictionary indexes
     """
     # Create test table and copy the corrupt files into it.
-    self.client.execute(
-        "create table %s.bad_rle_counts (c bigint) stored as parquet" % 
unique_database)
-    bad_rle_counts_tbl_loc =\
-        get_fs_path("/test-warehouse/%s.db/%s" % (unique_database, 
"bad_rle_counts"))
-    check_call(['hdfs', 'dfs', '-copyFromLocal',
-        os.environ['IMPALA_HOME'] + 
"/testdata/data/bad_rle_literal_count.parquet",
-        bad_rle_counts_tbl_loc])
-    check_call(['hdfs', 'dfs', '-copyFromLocal',
-        os.environ['IMPALA_HOME'] + 
"/testdata/data/bad_rle_repeat_count.parquet",
-        bad_rle_counts_tbl_loc])
+    test_files = ["testdata/data/bad_rle_literal_count.parquet",
+                  "testdata/data/bad_rle_repeat_count.parquet"]
+    create_table_and_copy_files(self.client,
+                                "create table {db}.{tbl} (c bigint) stored as 
parquet",
+                                unique_database, "bad_rle_counts", test_files)
     # Querying the corrupted files should not DCHECK or crash.
     vector.get_value('exec_option')['abort_on_error'] = 0
     self.run_test_case('QueryTest/parquet-corrupt-rle-counts', vector, 
unique_database)
@@ -449,53 +398,34 @@ class TestParquet(ImpalaTestSuite):
   def test_bad_compressed_page_size(self, vector, unique_database):
     """IMPALA-6353: Tests that a parquet dict page with 0 compressed_page_size 
is
     gracefully handled. """
-    self.client.execute(
-        "create table %s.bad_compressed_dict_page_size (col string) stored as 
parquet"
-        % unique_database)
-    tbl_loc = get_fs_path("/test-warehouse/%s.db/%s" % (unique_database,
-        "bad_compressed_dict_page_size"))
-    check_call(['hdfs', 'dfs', '-copyFromLocal', os.environ['IMPALA_HOME'] +
-        "/testdata/data/bad_compressed_dict_page_size.parquet", tbl_loc])
+    create_table_from_parquet(self.client, unique_database,
+                              "bad_compressed_dict_page_size")
     self.run_test_case('QueryTest/parquet-bad-compressed-dict-page-size', 
vector,
         unique_database)
 
   def test_def_levels(self, vector, unique_database):
     """Test that Impala behaves as expected when decoding def levels with 
different
        encodings - RLE, BIT_PACKED, etc."""
-    self.client.execute(("""CREATE TABLE {0}.alltypesagg_bitpacked (
-          id INT, bool_col BOOLEAN, tinyint_col TINYINT, smallint_col SMALLINT,
-          int_col INT, bigint_col BIGINT, float_col FLOAT, double_col DOUBLE,
-          date_string_col STRING, string_col STRING, timestamp_col TIMESTAMP,
-          year INT, month INT, day INT) STORED AS 
PARQUET""").format(unique_database))
-    alltypesagg_loc = get_fs_path(
-        "/test-warehouse/{0}.db/{1}".format(unique_database, 
"alltypesagg_bitpacked"))
-    check_call(['hdfs', 'dfs', '-copyFromLocal', os.environ['IMPALA_HOME'] +
-        "/testdata/data/alltypes_agg_bitpacked_def_levels.parquet", 
alltypesagg_loc])
-    self.client.execute("refresh 
{0}.alltypesagg_bitpacked".format(unique_database));
-
+    create_table_from_parquet(self.client, unique_database,
+                              "alltypes_agg_bitpacked_def_levels")
     self.run_test_case('QueryTest/parquet-def-levels', vector, unique_database)
 
   def test_bad_compression_codec(self, vector, unique_database):
     """IMPALA-6593: test the bad compression codec is handled gracefully. """
-    self.client.execute(("""CREATE TABLE {0}.bad_codec (
+    test_files = ["testdata/data/bad_codec.parquet"]
+    create_table_and_copy_files(self.client, """CREATE TABLE {db}.{tbl} (
           id INT, bool_col BOOLEAN, tinyint_col TINYINT, smallint_col SMALLINT,
           int_col INT, bigint_col BIGINT, float_col FLOAT, double_col DOUBLE,
           date_string_col STRING, string_col STRING, timestamp_col TIMESTAMP,
-          year INT, month INT) STORED AS PARQUET""").format(unique_database))
-    tbl_loc = get_fs_path("/test-warehouse/%s.db/%s" % (unique_database,
-        "bad_codec"))
-    check_call(['hdfs', 'dfs', '-copyFromLocal', os.environ['IMPALA_HOME'] +
-        "/testdata/data/bad_codec.parquet", tbl_loc])
+          year INT, month INT) STORED AS PARQUET""",
+                                unique_database, "bad_codec",
+                                test_files)
     self.run_test_case('QueryTest/parquet-bad-codec', vector, unique_database)
 
   def test_num_values_def_levels_mismatch(self, vector, unique_database):
     """IMPALA-6589: test the bad num_values handled correctly. """
-    self.client.execute(("""CREATE TABLE {0}.num_values_def_levels_mismatch 
(_c0 BOOLEAN)
-        STORED AS PARQUET""").format(unique_database))
-    tbl_loc = get_fs_path("/test-warehouse/%s.db/%s" % (unique_database,
-        "num_values_def_levels_mismatch"))
-    check_call(['hdfs', 'dfs', '-copyFromLocal', os.environ['IMPALA_HOME'] +
-        "/testdata/data/num_values_def_levels_mismatch.parquet", tbl_loc])
+    create_table_from_parquet(self.client, unique_database,
+                              "num_values_def_levels_mismatch")
     self.run_test_case('QueryTest/parquet-num-values-def-levels-mismatch',
         vector, unique_database)
 
@@ -706,33 +636,20 @@ class TestParquet(ImpalaTestSuite):
   def test_decimal_encodings(self, vector, unique_database):
     # Create a table using an existing data file with dictionary-encoded, 
variable-length
     # physical encodings for decimals.
-    TABLE_NAME = "decimal_encodings"
-    self.client.execute('''create table if not exists %s.%s
-    (small_dec decimal(9,2), med_dec decimal(18,2), large_dec decimal(38,2))
-    STORED AS PARQUET''' % (unique_database, TABLE_NAME))
-
-    table_loc = get_fs_path(
-      "/test-warehouse/%s.db/%s" % (unique_database, TABLE_NAME))
-    for file_name in ["binary_decimal_dictionary.parquet",
-                      "binary_decimal_no_dictionary.parquet"]:
-      data_file_path = os.path.join(os.environ['IMPALA_HOME'],
-                                    "testdata/data/", file_name)
-      check_call(['hdfs', 'dfs', '-copyFromLocal', data_file_path, table_loc])
-
-    self._create_table_from_file('decimal_stored_as_int32', unique_database)
-    self._create_table_from_file('decimal_stored_as_int64', unique_database)
+    test_files = ["testdata/data/binary_decimal_dictionary.parquet",
+                  "testdata/data/binary_decimal_no_dictionary.parquet"]
+    create_table_and_copy_files(self.client, """create table if not exists 
{db}.{tbl}
+        (small_dec decimal(9,2), med_dec decimal(18,2), large_dec 
decimal(38,2))
+         STORED AS PARQUET""", unique_database, "decimal_encodings", 
test_files)
+
+    create_table_from_parquet(self.client, unique_database, 
'decimal_stored_as_int32')
+    create_table_from_parquet(self.client, unique_database, 
'decimal_stored_as_int64')
 
     self.run_test_case('QueryTest/parquet-decimal-formats', vector, 
unique_database)
 
   def test_rle_encoded_bools(self, vector, unique_database):
     """IMPALA-6324: Test that Impala decodes RLE encoded booleans correctly."""
-    self.client.execute(("""CREATE TABLE {0}.rle_encoded_bool (b boolean, i 
int)
-        STORED AS PARQUET""").format(unique_database))
-    table_loc = get_fs_path(
-        "/test-warehouse/{0}.db/{1}".format(unique_database, 
"rle_encoded_bool"))
-    check_call(['hdfs', 'dfs', '-copyFromLocal', os.environ['IMPALA_HOME'] +
-        "/testdata/data/rle_encoded_bool.parquet", table_loc])
-
+    create_table_from_parquet(self.client, unique_database, "rle_encoded_bool")
     self.run_test_case(
         'QueryTest/parquet-rle-encoded-bool', vector, unique_database)
 
@@ -741,13 +658,7 @@ class TestParquet(ImpalaTestSuite):
        dictionary index bit width is larger than the encoded byte's bit width.
     """
     TABLE_NAME = "dict_encoding_with_large_bit_width"
-    self.client.execute("CREATE TABLE {0}.{1} (i tinyint) STORED AS 
PARQUET".format(
-        unique_database, TABLE_NAME))
-    table_loc = get_fs_path(
-        "/test-warehouse/{0}.db/{1}".format(unique_database, TABLE_NAME))
-    check_call(['hdfs', 'dfs', '-copyFromLocal', os.environ['IMPALA_HOME'] +
-        "/testdata/data/{0}.parquet".format(TABLE_NAME), table_loc])
-
+    create_table_from_parquet(self.client, unique_database, TABLE_NAME)
     result = self.execute_query(
         "select * from {0}.{1}".format(unique_database, TABLE_NAME))
     assert(len(result.data) == 33)
@@ -1002,12 +913,9 @@ class TestUncompressedText(ImpalaTestSuite):
 
   # IMPALA-5315: Test support for date/time in unpadded format
   def test_scan_lazy_timestamp(self, vector, unique_database):
-    self.client.execute(("""CREATE TABLE {0}.lazy_ts (ts TIMESTAMP)""").format
-          (unique_database))
-    tbl_loc = get_fs_path("/test-warehouse/%s.db/%s" % (unique_database,
-          "lazy_ts"))
-    check_call(['hdfs', 'dfs', '-copyFromLocal', os.environ['IMPALA_HOME'] +
-          "/testdata/data/lazy_timestamp.csv", tbl_loc])
+    test_files = ["testdata/data/lazy_timestamp.csv"]
+    create_table_and_copy_files(self.client, """CREATE TABLE {db}.{tbl} (ts 
TIMESTAMP)""",
+                                unique_database, "lazy_ts", test_files)
     self.run_test_case('QueryTest/select-lazy-timestamp', vector, 
unique_database)
 
 class TestOrc(ImpalaTestSuite):

Reply via email to