This is an automated email from the ASF dual-hosted git repository.
michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 166b39547 IMPALA-14553: Run schema eval concurrently
166b39547 is described below
commit 166b39547e033956e3f5c941cb36165c59a18275
Author: Michael Smith <[email protected]>
AuthorDate: Mon Nov 3 16:46:05 2025 -0800
IMPALA-14553: Run schema eval concurrently
The majority of time spent in generate-schema-statements.py is in
eval_section, on schema operations that shell out, often to upload files
via the hadoop CLI or to generate data files. These operations are
independent of one another, so they can safely run in parallel.
Runs eval_section once up front so it isn't repeated for each row in
test_vectors, and executes the resulting shell commands in parallel via
a ThreadPool. Defaults to NUM_CONCURRENT_TESTS threads because the
underlying operations already have some concurrency of their own (such
as HDFS mirroring writes).
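Roughly, the pattern looks like the following (a simplified,
self-contained sketch of the code in the diff below; the pool size and
example section strings are illustrative):

    import subprocess
    from multiprocessing.pool import AsyncResult, ThreadPool

    pool = ThreadPool(processes=4)  # the script uses NUM_CONCURRENT_TESTS

    def eval_section(pool, section_str):
        # Literal sections pass through unchanged; backtick sections are
        # submitted to the pool as shell commands and return an AsyncResult.
        if not section_str.startswith('`'):
            return section_str
        return pool.apply_async(subprocess.check_output,
                                (['/bin/bash', '-c', section_str[1:]],),
                                {'universal_newlines': True})

    def unwrap(result):
        # Block on an async command and return its output ('\n' if empty);
        # pass literal sections through unchanged.
        if type(result) is AsyncResult:
            return result.get() or '\n'
        return result

    results = [eval_section(pool, s) for s in ['`echo hi`', 'literal']]
    pool.close()
    pool.join()
    print([unwrap(r) for r in results])  # ['hi\n', 'literal']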
Also collects existing tables into a set so membership lookups are
constant time instead of linear scans.
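For example (hypothetical table names), 'in' against a set is an average
O(1) hash lookup rather than an O(n) list scan:

    existing_tables = set(["alltypes", "alltypes_tiny"])  # was a list
    print("alltypes" in existing_tables)       # True, constant-time lookup
    print("no_such_table" in existing_tables)  # False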
Reduces generate-schema-statements runtime by ~60%, from 2m30s to 1m.
Confirmed that the contents of logs/data_loading/sql/functional are
identical.
Change-Id: I2a78d05fd6a0005c83561978713237da2dde6af2
Reviewed-on: http://gerrit.cloudera.org:8080/23627
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Michael Smith <[email protected]>
---
testdata/bin/generate-schema-statements.py | 185 +++++++++++++++++++++--------
1 file changed, 136 insertions(+), 49 deletions(-)
diff --git a/testdata/bin/generate-schema-statements.py b/testdata/bin/generate-schema-statements.py
index 229c49891..84e550d01 100755
--- a/testdata/bin/generate-schema-statements.py
+++ b/testdata/bin/generate-schema-statements.py
@@ -104,6 +104,8 @@ import shutil
import subprocess
import sys
import tempfile
+import time
+from multiprocessing.pool import AsyncResult, ThreadPool
from optparse import OptionParser
from tests.common.environ import HIVE_MAJOR_VERSION
from tests.util.test_file_parser import parse_table_constraints, parse_test_file
@@ -136,6 +138,9 @@ parser.add_option("--table_formats", dest="table_formats", default=None,
"formats. Ex. --table_formats=seq/snap/block,text/none")
parser.add_option("--hdfs_namenode", dest="hdfs_namenode",
default="localhost:20500",
help="HDFS name node for Avro schema URLs, default
localhost:20500")
+parser.add_option("--num_processes", type="int", dest="num_processes",
+ default=os.environ['NUM_CONCURRENT_TESTS'],
+ help="Number of parallel processes to use.")
(options, args) = parser.parse_args()
if options.workload is None:
@@ -699,13 +704,13 @@ def build_hbase_create_stmt(db_name, table_name, column_families, region_splits)
def get_hdfs_subdirs_with_data(path):
tmp_file = tempfile.TemporaryFile("w+")
cmd = "hadoop fs -du %s | grep -v '^0' | awk '{print $3}'" % path
- subprocess.call([cmd], shell=True, stderr=open('/dev/null'), stdout=tmp_file)
+ subprocess.check_call([cmd], shell=True, stderr=open('/dev/null'), stdout=tmp_file)
tmp_file.seek(0)
# Results look like:
# <acls> - <user> <group> <date> /directory/subdirectory
# So to get subdirectory names just return everything after the last '/'
- return [line[line.rfind('/') + 1:].strip() for line in tmp_file.readlines()]
+ return set([line[line.rfind('/') + 1:].strip() for line in tmp_file.readlines()])
class Statements(object):
@@ -727,20 +732,108 @@ class Statements(object):
return bool(self.create or self.load or self.load_base)
-def eval_section(section_str):
+def eval_section(pool, section_str):
"""section_str should be the contents of a section (i.e. a string). If
section_str
- starts with `, evaluates section_str as a shell command and returns the
- output. Otherwise returns section_str."""
+ starts with `, evaluates section_str as a shell command in pool and returns
an
+ AsyncResult to produce the output. Otherwise returns section_str. Results of
this
+ function should be passed to unwrap() to get the actual value."""
if not section_str.startswith('`'): return section_str
cmd = section_str[1:]
# Use bash explicitly instead of setting shell=True so we get more advanced shell
# features (e.g. "for i in {1..n}")
- p = subprocess.Popen(['/bin/bash', '-c', cmd], stdout=subprocess.PIPE,
- universal_newlines=True)
- stdout, stderr = p.communicate()
- if stderr: print(stderr)
- assert p.returncode == 0
- return stdout.strip()
+ return pool.apply_async(subprocess.check_output,
+ (['/bin/bash', '-c', cmd],), {'universal_newlines': True})
+
+
+def unwrap(result):
+ """If result is an AsyncResult, get the actual value from it. Otherwise
return
+ result."""
+ if type(result) is AsyncResult:
+ # If the command produced no output, return a newline so this section is still
+ # treated as having been set.
+ return result.get() or '\n'
+ return result
+
+
+def eval_sections(test_vectors, sections, fails_only_constraint,
+ fails_include_constraint, fails_exclude_constraint):
+ """Evaluates all sections that are shell commands in parallel using a thread
pool.
+ Returns a new list of sections with all shell commands evaluated."""
+ new_sections = list()
+ table_names = None
+ if options.table_names:
+ table_names = [name.lower() for name in options.table_names.split(',')]
+ file_formats = [row.file_format for row in test_vectors]
+ table_formats = [f"{row.file_format}/{row.compression_codec}/{row.compression_type}"
+ for row in test_vectors]
+
+ # Sections are re-used for multiple test vectors, but eval_section is only needed once.
+ # Use a threadpool to execute eval_section in parallel as they shell out.
+ pool = ThreadPool(processes=options.num_processes)
+ for section in sections:
+ table_name = section['BASE_TABLE_NAME'].strip().lower()
+
+ if table_names and (table_name.lower() not in table_names):
+ print(f"Skipping table '{table_name}': table is not in specified table
list")
+ continue
+
+ # Check Hive version requirement, if present.
+ if section['HIVE_MAJOR_VERSION'] and \
+ section['HIVE_MAJOR_VERSION'].strip() != \
+ os.environ['IMPALA_HIVE_MAJOR_VERSION'].strip():
+ print(f"Skipping table '{table_name}': wrong Hive major version")
+ continue
+
+ if all([fails_only_constraint(table_name, format) for format in table_formats]):
+ print(f"Skipping table '{table_name}': 'only' constraint for formats did not "
+ "include this table.")
+ continue
+
+ if all([fails_include_constraint(table_name, format) for format in table_formats]):
+ print(f"Skipping '{table_name}' due to include constraint matches.")
+ continue
+
+ if all([fails_exclude_constraint(table_name, format) for format in table_formats]):
+ print(f"Skipping '{table_name}' due to exclude constraint matches.")
+ continue
+
+ assert not (section['CREATE'] and section['CREATE_HIVE']), \
+ "Can't set both CREATE and CREATE_HIVE"
+
+ assert not (section['DEPENDENT_LOAD'] and section['DEPENDENT_LOAD_HIVE']), \
+ "Can't set both DEPENDENT_LOAD and DEPENDENT_LOAD_HIVE"
+
+ section['LOAD'] = eval_section(pool, section['LOAD'])
+
+ section['DEPENDENT_LOAD'] = eval_section(pool, section['DEPENDENT_LOAD'])
+ section['DEPENDENT_LOAD_HIVE'] = eval_section(pool, section['DEPENDENT_LOAD_HIVE'])
+
+ if 'kudu' in file_formats and section['DEPENDENT_LOAD_KUDU']:
+ section['DEPENDENT_LOAD_KUDU'] = eval_section(pool, section['DEPENDENT_LOAD_KUDU'])
+
+ if 'orc' in file_formats and section["DEPENDENT_LOAD_ACID"]:
+ section["DEPENDENT_LOAD_ACID"] = eval_section(pool,
section["DEPENDENT_LOAD_ACID"])
+
+ if 'json' in file_formats and section["DEPENDENT_LOAD_JSON"]:
+ section["DEPENDENT_LOAD_JSON"] = eval_section(pool,
section["DEPENDENT_LOAD_JSON"])
+
+ section['COLUMNS'] = eval_section(pool, section['COLUMNS'])
+ new_sections.append(section)
+
+ # Close the pool to new tasks and collect the results.
+ pool.close()
+ pool.join()
+ for section in new_sections:
+ # Ensure all async commands are done.
+ section['LOAD'] = unwrap(section['LOAD'])
+ section['DEPENDENT_LOAD'] = unwrap(section['DEPENDENT_LOAD'])
+ section['DEPENDENT_LOAD_HIVE'] = unwrap(section['DEPENDENT_LOAD_HIVE'])
+ section['DEPENDENT_LOAD_KUDU'] = unwrap(section['DEPENDENT_LOAD_KUDU'])
+ section["DEPENDENT_LOAD_ACID"] = unwrap(section["DEPENDENT_LOAD_ACID"])
+ section["DEPENDENT_LOAD_JSON"] = unwrap(section["DEPENDENT_LOAD_JSON"])
+ section['COLUMNS'] = unwrap(section['COLUMNS'])
+
+ return new_sections
def generate_statements(output_name, test_vectors, sections,
@@ -748,14 +841,27 @@ def generate_statements(output_name, test_vectors, sections,
schema_only_constraints, convert_orc_to_full_acid):
# TODO: This method has become very unwieldy. It has to be re-factored sooner than
# later.
- # Parquet statements to be executed separately by Impala
+ def fails_only_constraint(table_name, table_format):
+ constraint = schema_only_constraints.get(table_format)
+ return constraint is not None and table_name not in constraint
+
+ def fails_include_constraint(table_name, table_format):
+ constraint = schema_include_constraints.get(table_name)
+ return constraint is not None and table_format not in constraint
+
+ def fails_exclude_constraint(table_name, table_format):
+ constraint = schema_exclude_constraints.get(table_name)
+ return constraint is not None and table_format in constraint
+
+ start = time.time()
+ sections = eval_sections(test_vectors, sections, fails_only_constraint,
+ fails_include_constraint, fails_exclude_constraint)
+ print(f"Evaluating sections took {time.time() - start:.3f} seconds")
+
hbase_output = Statements()
hbase_post_load = Statements()
impala_invalidate = Statements()
- table_names = None
- if options.table_names:
- table_names = [name.lower() for name in options.table_names.split(',')]
existing_tables = get_hdfs_subdirs_with_data(options.hive_warehouse_dir)
for row in test_vectors:
impala_create = Statements()
@@ -772,62 +878,43 @@ def generate_statements(output_name, test_vectors, sections,
for section in sections:
table_name = section['BASE_TABLE_NAME'].strip()
- if table_names and (table_name.lower() not in table_names):
- print('Skipping table: %s.%s, table is not in specified table list' %
- (db, table_name))
+ if fails_only_constraint(table_name.lower(), table_format):
+ print(f"Skipping table: {db}.{table_name}, 'only' constraint for
format did not "
+ "include this table.")
continue
- # Check Hive version requirement, if present.
- if section['HIVE_MAJOR_VERSION'] and \
- section['HIVE_MAJOR_VERSION'].strip() != \
- os.environ['IMPALA_HIVE_MAJOR_VERSION'].strip():
- print("Skipping table '{0}.{1}': wrong Hive major version".format(db,
table_name))
+ if fails_include_constraint(table_name.lower(), table_format):
+ print(f"Skipping '{db}.{table_name}' due to include constraint match.")
continue
- if table_format in schema_only_constraints and \
- table_name.lower() not in schema_only_constraints[table_format]:
- print(('Skipping table: %s.%s, \'only\' constraint for format did not '
- 'include this table.') % (db, table_name))
+ if fails_exclude_constraint(table_name.lower(), table_format):
+ print(f"Skipping '{db}.{table_name}' due to exclude constraint match.")
continue
- if schema_include_constraints[table_name.lower()] and \
- table_format not in schema_include_constraints[table_name.lower()]:
- print('Skipping \'%s.%s\' due to include constraint match.' % (db, table_name))
- continue
-
- if schema_exclude_constraints[table_name.lower()] and\
- table_format in schema_exclude_constraints[table_name.lower()]:
- print('Skipping \'%s.%s\' due to exclude constraint match.' % (db, table_name))
- continue
-
- alter = section.get('ALTER')
+ alter = section['ALTER']
create = section['CREATE']
create_hive = section['CREATE_HIVE']
- assert not (create and create_hive), "Can't set both CREATE and CREATE_HIVE"
-
table_properties = section['TABLE_PROPERTIES']
- insert = eval_section(section['DEPENDENT_LOAD'])
- insert_hive = eval_section(section['DEPENDENT_LOAD_HIVE'])
- assert not (insert and insert_hive),\
- "Can't set both DEPENDENT_LOAD and DEPENDENT_LOAD_HIVE"
- load = eval_section(section['LOAD'])
+ insert = section['DEPENDENT_LOAD']
+ insert_hive = section['DEPENDENT_LOAD_HIVE']
+ load = section['LOAD']
if file_format == 'kudu':
create_kudu = section["CREATE_KUDU"]
if section['DEPENDENT_LOAD_KUDU']:
- insert = eval_section(section['DEPENDENT_LOAD_KUDU'])
+ insert = section['DEPENDENT_LOAD_KUDU']
else:
create_kudu = None
if file_format == 'orc' and section["DEPENDENT_LOAD_ACID"]:
insert = None
- insert_hive = eval_section(section["DEPENDENT_LOAD_ACID"])
+ insert_hive = section["DEPENDENT_LOAD_ACID"]
if file_format == 'json' and section["DEPENDENT_LOAD_JSON"]:
insert = None
- insert_hive = eval_section(section["DEPENDENT_LOAD_JSON"])
+ insert_hive = section["DEPENDENT_LOAD_JSON"]
- columns = eval_section(section['COLUMNS']).strip()
+ columns = section['COLUMNS'].strip()
partition_columns = section['PARTITION_COLUMNS'].strip()
row_format = section['ROW_FORMAT'].strip()
table_comment = section['COMMENT'].strip()