IMPALA-6372: Go parallel for Hive dataload

This changes generate-schema-statements.py to produce
separate SQL files for different file formats for Hive.
This changes load-data.py to go parallel on these
separate Hive SQL files. For correctness, the text
version of all tables must be loaded before any
of the other file formats.

load-data.py runs DDLs to create the tables in Impala
and goes parallel. Currently, there are some minor
dependencies so that text tables must be created
prior to creating the other table formats. This
changes the definitions of some tables in
testdata/datasets/functional/functional_schema_template.sql
to remove these dependencies. Now, the DDLs for the
text tables can run in parallel to the other file formats.

To unify the parallelism for Impala and Hive, load-data.py
now uses a single fixed-size pool of processes to run all
SQL files rather than spawning a thread per SQL file.

This also modifies the locations that do invalidate to
use refresh where possible and eliminate global
invalidates.

For debuggability, different SQL executions output to
different log files rather than to standard out. If an
error occurs, this will point out the relevant log
file.

This saves about 10-15 minutes on dataload (including
for GVO).

Change-Id: I34b71e6df3c8f23a5a31451280e35f4dc015a2fd
Reviewed-on: http://gerrit.cloudera.org:8080/8894
Reviewed-by: Joe McDonnell <joemcdonn...@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/d481cd48
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/d481cd48
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/d481cd48

Branch: refs/heads/master
Commit: d481cd4842e8d92dd77cdd7f70720ff0b696dfbb
Parents: bc6c3c7
Author: Joe McDonnell <joemcdonn...@cloudera.com>
Authored: Wed Dec 20 10:29:10 2017 -0800
Committer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Committed: Sat Apr 14 00:16:26 2018 +0000

----------------------------------------------------------------------
 bin/load-data.py                                | 395 +++++++++++++------
 testdata/bin/generate-schema-statements.py      | 140 +++++--
 testdata/bin/load_nested.py                     |  33 +-
 .../functional/functional_schema_template.sql   |  10 +-
 4 files changed, 398 insertions(+), 180 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/d481cd48/bin/load-data.py
----------------------------------------------------------------------
diff --git a/bin/load-data.py b/bin/load-data.py
index ed51487..28a504f 100755
--- a/bin/load-data.py
+++ b/bin/load-data.py
@@ -23,8 +23,10 @@
 import collections
 import getpass
 import logging
+import multiprocessing
 import os
 import re
+import shutil
 import sqlparse
 import subprocess
 import sys
@@ -32,15 +34,11 @@ import tempfile
 import time
 import traceback
 
-from itertools import product
 from optparse import OptionParser
-from Queue import Queue
 from tests.beeswax.impala_beeswax import *
-from threading import Thread
+from multiprocessing.pool import ThreadPool
 
-logging.basicConfig()
 LOG = logging.getLogger('load-data.py')
-LOG.setLevel(logging.DEBUG)
 
 parser = OptionParser()
 parser.add_option("-e", "--exploration_strategy", dest="exploration_strategy",
@@ -80,6 +78,8 @@ parser.add_option("--use_kerberos", action="store_true", 
default=False,
                   help="Load data on a kerberized cluster.")
 parser.add_option("--principal", default=None, dest="principal",
                   help="Kerberos service principal, required if --use_kerberos 
is set")
+parser.add_option("--num_processes", default=multiprocessing.cpu_count(),
+                  dest="num_processes", help="Number of parallel processes to 
use.")
 
 options, args = parser.parse_args()
 
@@ -111,21 +111,6 @@ if options.use_kerberos:
 HIVE_ARGS = '-n %s -u "jdbc:hive2://%s/default;%s" --verbose=true'\
     % (getpass.getuser(), options.hive_hs2_hostport, hive_auth)
 
-# When HiveServer2 is configured to use "local" mode (i.e., MR jobs are run
-# in-process rather than on YARN), Hadoop's LocalDistributedCacheManager has a
-# race, wherein it tires to localize jars into
-# /tmp/hadoop-$USER/mapred/local/<millis>. Two simultaneous Hive queries
-# against HS2 can conflict here. Weirdly LocalJobRunner handles a similar issue
-# (with the staging directory) by appending a random number. To over come this,
-# in the case that HS2 is on the local machine (which we conflate with also
-# running MR jobs locally), we move the temporary directory into a unique
-# directory via configuration. This block can be removed when
-# https://issues.apache.org/jira/browse/MAPREDUCE-6441 is resolved.
-# A similar workaround is used in tests/common/impala_test_suite.py.
-if options.hive_hs2_hostport.startswith("localhost:"):
-  HIVE_ARGS += ' --hiveconf "mapreduce.cluster.local.dir=%s"' % 
(tempfile.mkdtemp(
-    prefix="impala-data-load-"))
-
 HADOOP_CMD = os.path.join(os.environ['HADOOP_HOME'], 'bin/hadoop')
 
 def available_workloads(workload_dir):
@@ -135,70 +120,112 @@ def available_workloads(workload_dir):
 def validate_workloads(all_workloads, workloads):
   for workload in workloads:
     if workload not in all_workloads:
-      print 'Workload \'%s\' not found in workload directory' % workload
-      print 'Available workloads: ' + ', '.join(all_workloads)
+      LOG.error('Workload \'%s\' not found in workload directory' % workload)
+      LOG.error('Available workloads: ' + ', '.join(all_workloads))
       sys.exit(1)
 
-def exec_cmd(cmd, error_msg, exit_on_error=True):
-  ret_val = -1
-  try:
+def exec_cmd(cmd, error_msg=None, exit_on_error=True, out_file=None):
+  """Run the given command in the shell returning whether the command
+     succeeded. If 'error_msg' is set, log the error message on failure.
+     If 'exit_on_error' is True, exit the program on failure.
+     If 'out_file' is specified, log all output to that file."""
+  success = True
+  if out_file:
+    with open(out_file, 'w') as f:
+      ret_val = subprocess.call(cmd, shell=True, stderr=f, stdout=f)
+  else:
     ret_val = subprocess.call(cmd, shell=True)
-  except Exception as e:
-    error_msg = "%s: %s" % (error_msg, str(e))
-  finally:
-    if ret_val != 0:
-      print error_msg
-      if exit_on_error: sys.exit(ret_val)
-  return ret_val
-
-def exec_hive_query_from_file(file_name):
-  if not os.path.exists(file_name): return
-  hive_cmd = "%s %s -f %s" % (HIVE_CMD, HIVE_ARGS, file_name)
-  print 'Executing Hive Command: %s' % hive_cmd
-  exec_cmd(hive_cmd,  'Error executing file from Hive: ' + file_name)
+  if ret_val != 0:
+    if error_msg: LOG.info(error_msg)
+    if exit_on_error: sys.exit(ret_val)
+    success = False
+  return success
+
+def exec_hive_query_from_file_beeline(file_name):
+  if not os.path.exists(file_name):
+    LOG.info("Error: File {0} not found".format(file_name))
+    return False
+
+  LOG.info("Beginning execution of hive SQL: {0}".format(file_name))
+
+  # When HiveServer2 is configured to use "local" mode (i.e., MR jobs are run
+  # in-process rather than on YARN), Hadoop's LocalDistributedCacheManager has 
a
+  # race, wherein it tires to localize jars into
+  # /tmp/hadoop-$USER/mapred/local/<millis>. Two simultaneous Hive queries
+  # against HS2 can conflict here. Weirdly LocalJobRunner handles a similar 
issue
+  # (with the staging directory) by appending a random number. To over come 
this,
+  # in the case that HS2 is on the local machine (which we conflate with also
+  # running MR jobs locally), we move the temporary directory into a unique
+  # directory via configuration. This block can be removed when
+  # https://issues.apache.org/jira/browse/MAPREDUCE-6441 is resolved.
+  hive_args = HIVE_ARGS
+  unique_dir = None
+  if options.hive_hs2_hostport.startswith("localhost:"):
+    unique_dir = tempfile.mkdtemp(prefix="hive-data-load-")
+    hive_args += ' --hiveconf "mapreduce.cluster.local.dir=%s"' % unique_dir
+
+  output_file = file_name + ".log"
+  hive_cmd = "{0} {1} -f {2}".format(HIVE_CMD, hive_args, file_name)
+  is_success = exec_cmd(hive_cmd, exit_on_error=False, out_file=output_file)
+  shutil.rmtree(unique_dir)
+
+  if is_success:
+    LOG.info("Finished execution of hive SQL: {0}".format(file_name))
+  else:
+    LOG.info("Error executing hive SQL: {0} See: {1}".format(file_name, \
+             output_file))
+
+  return is_success
 
 def exec_hbase_query_from_file(file_name):
   if not os.path.exists(file_name): return
   hbase_cmd = "hbase shell %s" % file_name
-  print 'Executing HBase Command: %s' % hbase_cmd
-  exec_cmd(hbase_cmd, 'Error executing hbase create commands')
+  LOG.info('Executing HBase Command: %s' % hbase_cmd)
+  exec_cmd(hbase_cmd, error_msg='Error executing hbase create commands')
 
 # KERBEROS TODO: fails when kerberized and impalad principal isn't "impala"
 def exec_impala_query_from_file(file_name):
   """Execute each query in an Impala query file individually"""
+  if not os.path.exists(file_name):
+    LOG.info("Error: File {0} not found".format(file_name))
+    return False
+
+  LOG.info("Beginning execution of impala SQL: {0}".format(file_name))
   is_success = True
   impala_client = ImpalaBeeswaxClient(options.impalad, 
use_kerberos=options.use_kerberos)
-  try:
-    impala_client.connect()
-    with open(file_name, 'r+') as query_file:
-      queries = sqlparse.split(query_file.read())
-    for query in queries:
-      query = sqlparse.format(query.rstrip(';'), strip_comments=True)
-      print '(%s):\n%s\n' % (file_name, query.strip())
-      if query.strip() != "":
-        result = impala_client.execute(query)
-  except Exception as e:
-    print "Data Loading from Impala failed with error: %s" % str(e)
-    traceback.print_exc()
-    is_success = False
-  finally:
-    impala_client.close_connection()
-  return is_success
+  output_file = file_name + ".log"
+  with open(output_file, 'w') as out_file:
+    try:
+      impala_client.connect()
+      with open(file_name, 'r+') as query_file:
+        queries = sqlparse.split(query_file.read())
+        for query in queries:
+          query = sqlparse.format(query.rstrip(';'), strip_comments=True)
+          if query.strip() != "":
+            result = impala_client.execute(query)
+            out_file.write("{0}\n{1}\n".format(query, result))
+    except Exception as e:
+      out_file.write("ERROR: {0}\n".format(query))
+      traceback.print_exc(file=out_file)
+      is_success = False
 
-def exec_bash_script(file_name):
-  bash_cmd = "bash %s" % file_name
-  print 'Executing Bash Command: ' + bash_cmd
-  exec_cmd(bash_cmd, 'Error bash script: ' + file_name)
+  if is_success:
+    LOG.info("Finished execution of impala SQL: {0}".format(file_name))
+  else:
+    LOG.info("Error executing impala SQL: {0} See: {1}".format(file_name, \
+             output_file))
+
+  return is_success
 
 def run_dataset_preload(dataset):
   """Execute a preload script if present in dataset directory. E.g. to 
generate data
   before loading"""
   dataset_preload_script = os.path.join(DATASET_DIR, dataset, "preload")
   if os.path.exists(dataset_preload_script):
-    print("Running preload script for " + dataset)
+    LOG.info("Running preload script for " + dataset)
     if options.scale_factor > 1:
       dataset_preload_script += " " + str(options.scale_factor)
-    exec_cmd(dataset_preload_script, "Error executing preload script for " + 
dataset,
+    exec_cmd(dataset_preload_script, error_msg="Error executing preload script 
for " + dataset,
         exit_on_error=True)
 
 def generate_schema_statements(workload):
@@ -215,29 +242,29 @@ def generate_schema_statements(workload):
   if options.hdfs_namenode is not None:
     generate_cmd += " --hdfs_namenode=%s" % options.hdfs_namenode
   generate_cmd += " --backend=%s" % options.impalad
-  print 'Executing Generate Schema Command: ' + generate_cmd
+  LOG.info('Executing Generate Schema Command: ' + generate_cmd)
   schema_cmd = os.path.join(TESTDATA_BIN_DIR, generate_cmd)
   error_msg = 'Error generating schema statements for workload: ' + workload
-  exec_cmd(schema_cmd, error_msg)
+  exec_cmd(schema_cmd, error_msg=error_msg)
 
 def get_dataset_for_workload(workload):
   dimension_file_name = os.path.join(WORKLOAD_DIR, workload,
                                      '%s_dimensions.csv' % workload)
   if not os.path.isfile(dimension_file_name):
-    print 'Dimension file not found: ' + dimension_file_name
+    LOG.error('Dimension file not found: ' + dimension_file_name)
     sys.exit(1)
   with open(dimension_file_name, 'rb') as input_file:
     match = re.search('dataset:\s*([\w\-\.]+)', input_file.read())
     if match:
       return match.group(1)
     else:
-      print 'Dimension file does not contain dataset for workload \'%s\'' % 
(workload)
+      LOG.error('Dimension file does not contain dataset for workload \'%s\'' 
% (workload))
       sys.exit(1)
 
 def copy_avro_schemas_to_hdfs(schemas_dir):
   """Recursively copies all of schemas_dir to the test warehouse."""
   if not os.path.exists(schemas_dir):
-    print 'Avro schema dir (%s) does not exist. Skipping copy to HDFS.' % 
schemas_dir
+    LOG.info('Avro schema dir (%s) does not exist. Skipping copy to HDFS.' % 
schemas_dir)
     return
 
   exec_hadoop_fs_cmd("-mkdir -p " + options.hive_warehouse_dir)
@@ -245,41 +272,36 @@ def copy_avro_schemas_to_hdfs(schemas_dir):
 
 def exec_hadoop_fs_cmd(args, exit_on_error=True):
   cmd = "%s fs %s" % (HADOOP_CMD, args)
-  print "Executing Hadoop command: " + cmd
-  exec_cmd(cmd, "Error executing Hadoop command, exiting",
+  LOG.info("Executing Hadoop command: " + cmd)
+  exec_cmd(cmd, error_msg="Error executing Hadoop command, exiting",
       exit_on_error=exit_on_error)
 
-def exec_impala_query_from_file_parallel(query_files):
-  # Get the name of the query file that loads the base tables, if it exists.
-  # TODO: Find a better way to detect the file that loads the base tables.
-  create_base_table_file = next((q for q in query_files if 'text' in q), None)
-  if create_base_table_file:
-    is_success = exec_impala_query_from_file(create_base_table_file)
-    query_files.remove(create_base_table_file)
-    # If loading the base tables failed, exit with a non zero error code.
-    if not is_success: sys.exit(1)
-  if not query_files: return
-  threads = []
-  result_queue = Queue()
-  for query_file in query_files:
-    thread = Thread(target=lambda x: 
result_queue.put(exec_impala_query_from_file(x)),
-        args=[query_file])
-    thread.daemon = True
-    threads.append(thread)
-    thread.start()
-  # Keep looping until the number of results retrieved is the same as the 
number of
-  # threads spawned, or until a data loading query fails. result_queue.get() 
will
-  # block until a result is available in the queue.
-  num_fetched_results = 0
-  while num_fetched_results < len(threads):
-    success = result_queue.get()
-    num_fetched_results += 1
-    if not success: sys.exit(1)
-  # There is a small window where a thread may still be alive even if all the 
threads have
-  # finished putting their results in the queue.
-  for thread in threads: thread.join()
-
-if __name__ == "__main__":
+def exec_query_files_parallel(thread_pool, query_files, execution_type):
+  """Executes the query files provided using the execution engine specified
+     in parallel using the given thread pool. Aborts immediately if any 
execution
+     encounters an error."""
+  assert(execution_type == 'impala' or execution_type == 'hive')
+  if len(query_files) == 0: return
+  if execution_type == 'impala':
+    execution_function = exec_impala_query_from_file
+  elif execution_type == 'hive':
+    execution_function = exec_hive_query_from_file_beeline
+
+  for result in thread_pool.imap_unordered(execution_function, query_files):
+    if not result:
+      thread_pool.terminate()
+      sys.exit(1)
+
+def impala_exec_query_files_parallel(thread_pool, query_files):
+  exec_query_files_parallel(thread_pool, query_files, 'impala')
+
+def hive_exec_query_files_parallel(thread_pool, query_files):
+  exec_query_files_parallel(thread_pool, query_files, 'hive')
+
+def main():
+  logging.basicConfig(format='%(asctime)s %(message)s', datefmt='%H:%M:%S')
+  LOG.setLevel(logging.DEBUG)
+
   # Having the actual command line at the top of each data-load-* log can help
   # when debugging dataload issues.
   #
@@ -288,62 +310,185 @@ if __name__ == "__main__":
   all_workloads = available_workloads(WORKLOAD_DIR)
   workloads = []
   if options.workloads is None:
-    print "At least one workload name must be specified."
+    LOG.error("At least one workload name must be specified.")
     parser.print_help()
     sys.exit(1)
   elif options.workloads == 'all':
-    print 'Loading data for all workloads.'
+    LOG.info('Loading data for all workloads.')
     workloads = all_workloads
   else:
     workloads = options.workloads.split(",")
     validate_workloads(all_workloads, workloads)
 
-  print 'Starting data load for the following workloads: ' + ', 
'.join(workloads)
+  LOG.info('Starting data load for the following workloads: ' + ', 
'.join(workloads))
+  LOG.info('Running with {0} threads'.format(options.num_processes))
 
+  # Note: The processes are in whatever the caller's directory is, so all paths
+  #       passed to the pool need to be absolute paths. This will allow the 
pool
+  #       to be used for different workloads (and thus different directories)
+  #       simultaneously.
+  thread_pool = ThreadPool(processes=options.num_processes)
   loading_time_map = collections.defaultdict(float)
   for workload in workloads:
     start_time = time.time()
     dataset = get_dataset_for_workload(workload)
     run_dataset_preload(dataset)
+    # This script is tightly coupled with 
testdata/bin/generate-schema-statements.py
+    # Specifically, this script is expecting the following:
+    # 1. generate-schema-statements.py generates files and puts them in the
+    #    directory ${IMPALA_DATA_LOADING_SQL_DIR}/${workload}
+    #    (e.g. ${IMPALA_HOME}/logs/data_loading/sql/tpch)
+    # 2. generate-schema-statements.py populates the subdirectory
+    #    avro_schemas/${workload} with JSON files specifying the Avro schema 
for the
+    #    tables being loaded.
+    # 3. generate-schema-statements.py uses a particular naming scheme to 
distinguish
+    #    between SQL files of different load phases.
+    #
+    #    Using the following variables:
+    #    workload_exploration = ${workload}-${exploration_strategy} and
+    #    file_format_suffix = ${file_format}-${codec}-${compression_type}
+    #
+    #    A. Impala table creation scripts run in Impala to create tables, 
partitions,
+    #       and views. There is one for each file format. They take the form:
+    #       
create-${workload_exploration}-impala-generated-${file_format_suffix}.sql
+    #
+    #    B. Hive creation/load scripts run in Hive to load data into tables 
and create
+    #       tables or views that Impala does not support. There is one for each
+    #       file format. They take the form:
+    #       
load-${workload_exploration}-hive-generated-${file_format_suffix}.sql
+    #
+    #    C. HBase creation script runs through the hbase commandline to create
+    #       HBase tables. (Only generated if loading HBase table.) It takes 
the form:
+    #       load-${workload_exploration}-hbase-generated.create
+    #
+    #    D. HBase postload script runs through the hbase commandline to flush
+    #       HBase tables. (Only generated if loading HBase table.) It takes 
the form:
+    #       post-load-${workload_exploration}-hbase-generated.sql
+    #
+    #    E. Impala load scripts run in Impala to load data. Only Parquet and 
Kudu
+    #       are loaded through Impala. There is one for each of those formats 
loaded.
+    #       They take the form:
+    #       
load-${workload_exploration}-impala-generated-${file_format_suffix}.sql
+    #
+    #    F. Invalidation script runs through Impala to invalidate/refresh 
metadata
+    #       for tables. It takes the form:
+    #       invalidate-${workload_exploration}-impala-generated.sql
     generate_schema_statements(workload)
+
+    # Determine the directory from #1
     sql_dir = os.path.join(SQL_OUTPUT_DIR, dataset)
     assert os.path.isdir(sql_dir),\
       ("Could not find the generated SQL files for loading dataset '%s'.\
         \nExpected to find the SQL files in: %s" % (dataset, sql_dir))
-    os.chdir(os.path.join(SQL_OUTPUT_DIR, dataset))
-    copy_avro_schemas_to_hdfs(AVRO_SCHEMA_DIR)
-    dataset_dir_contents = os.listdir(os.getcwd())
-    load_file_substr = "%s-%s" % (workload, options.exploration_strategy)
-    # Data loading with Impala is done in parallel, each file format has a 
separate query
-    # file.
-    create_filename = 'create-%s-impala-generated' % load_file_substr
-    load_filename = 'load-%s-impala-generated' % load_file_substr
-    impala_create_files = [f for f in dataset_dir_contents if create_filename 
in f]
-    impala_load_files = [f for f in dataset_dir_contents if load_filename in f]
+
+    # Copy the avro schemas (see #2) into HDFS
+    avro_schemas_path = os.path.join(sql_dir, AVRO_SCHEMA_DIR)
+    copy_avro_schemas_to_hdfs(avro_schemas_path)
+
+    # List all of the files in the sql directory to sort out the various types 
of
+    # files (see #3).
+    dataset_dir_contents = [os.path.join(sql_dir, f) for f in 
os.listdir(sql_dir)]
+    workload_exploration = "%s-%s" % (workload, options.exploration_strategy)
+
+    # Remove the AVRO_SCHEMA_DIR from the list of files
+    if os.path.exists(avro_schemas_path):
+      dataset_dir_contents.remove(avro_schemas_path)
+
+    # Match for Impala create files (3.A)
+    impala_create_match = 'create-%s-impala-generated' % workload_exploration
+    # Match for Hive create/load files (3.B)
+    hive_load_match = 'load-%s-hive-generated' % workload_exploration
+    # Match for HBase creation script (3.C)
+    hbase_create_match = 'load-%s-hbase-generated.create' % 
workload_exploration
+    # Match for HBase post-load script (3.D)
+    hbase_postload_match = 'post-load-%s-hbase-generated.sql' % 
workload_exploration
+    # Match for Impala load scripts (3.E)
+    impala_load_match = 'load-%s-impala-generated' % workload_exploration
+    # Match for Impala invalidate script (3.F)
+    invalidate_match = 'invalidate-%s-impala-generated' % workload_exploration
+
+    impala_create_files = []
+    hive_load_text_files = []
+    hive_load_nontext_files = []
+    hbase_create_files = []
+    hbase_postload_files = []
+    impala_load_files = []
+    invalidate_files = []
+    for filename in dataset_dir_contents:
+      if impala_create_match in filename:
+        impala_create_files.append(filename)
+      elif hive_load_match in filename:
+        if 'text-none-none' in filename:
+          hive_load_text_files.append(filename)
+        else:
+          hive_load_nontext_files.append(filename)
+      elif hbase_create_match in filename:
+        hbase_create_files.append(filename)
+      elif hbase_postload_match in filename:
+        hbase_postload_files.append(filename)
+      elif impala_load_match in filename:
+        impala_load_files.append(filename)
+      elif invalidate_match in filename:
+        invalidate_files.append(filename)
+      else:
+        assert False, "Unexpected input file {0}".format(filename)
+
+    # Simple helper function to dump a header followed by the filenames
+    def log_file_list(header, file_list):
+      if (len(file_list) == 0): return
+      LOG.debug(header)
+      map(LOG.debug, map(os.path.basename, file_list))
+      LOG.debug("\n")
+
+    log_file_list("Impala Create Files:", impala_create_files)
+    log_file_list("Hive Load Text Files:", hive_load_text_files)
+    log_file_list("Hive Load Non-Text Files:", hive_load_nontext_files)
+    log_file_list("HBase Create Files:", hbase_create_files)
+    log_file_list("HBase Post-Load Files:", hbase_postload_files)
+    log_file_list("Impala Load Files:", impala_load_files)
+    log_file_list("Impala Invalidate Files:", invalidate_files)
 
     # Execute the data loading scripts.
     # Creating tables in Impala has no dependencies, so we execute them first.
     # HBase table inserts are done via hive, so the hbase tables need to be 
created before
-    # running the hive script. Some of the Impala inserts depend on hive 
tables,
+    # running the hive scripts. Some of the Impala inserts depend on hive 
tables,
     # so they're done at the end. Finally, the Hbase Tables that have been 
filled with data
     # need to be flushed.
-    exec_impala_query_from_file_parallel(impala_create_files)
-    exec_hbase_query_from_file('load-%s-hbase-generated.create' % 
load_file_substr)
-    exec_hive_query_from_file('load-%s-hive-generated.sql' % load_file_substr)
-    exec_hbase_query_from_file('post-load-%s-hbase-generated.sql' % 
load_file_substr)
+
+    impala_exec_query_files_parallel(thread_pool, impala_create_files)
+
+    # There should be at most one hbase creation script
+    assert(len(hbase_create_files) <= 1)
+    for hbase_create in hbase_create_files:
+      exec_hbase_query_from_file(hbase_create)
+
+    # If this is loading text tables plus multiple other formats, the text 
tables
+    # need to be loaded first
+    assert(len(hive_load_text_files) <= 1)
+    hive_exec_query_files_parallel(thread_pool, hive_load_text_files)
+    hive_exec_query_files_parallel(thread_pool, hive_load_nontext_files)
+
+    assert(len(hbase_postload_files) <= 1)
+    for hbase_postload in hbase_postload_files:
+      exec_hbase_query_from_file(hbase_postload)
 
     # Invalidate so that Impala sees the loads done by Hive before loading 
Parquet/Kudu
     # Note: This only invalidates tables for this workload.
-    invalidate_sql_file = 
'invalidate-{0}-impala-generated.sql'.format(load_file_substr)
-    if impala_load_files: exec_impala_query_from_file(invalidate_sql_file)
-    exec_impala_query_from_file_parallel(impala_load_files)
+    assert(len(invalidate_files) <= 1)
+    if impala_load_files:
+      impala_exec_query_files_parallel(thread_pool, invalidate_files)
+      impala_exec_query_files_parallel(thread_pool, impala_load_files)
     # Final invalidate for this workload
-    exec_impala_query_from_file(invalidate_sql_file)
+    impala_exec_query_files_parallel(thread_pool, invalidate_files)
     loading_time_map[workload] = time.time() - start_time
 
   total_time = 0.0
+  thread_pool.close()
+  thread_pool.join()
   for workload, load_time in loading_time_map.iteritems():
     total_time += load_time
-    print 'Data loading for workload \'%s\' completed in: %.2fs'\
-        % (workload, load_time)
-  print 'Total load time: %.2fs\n' % total_time
+    LOG.info('Data loading for workload \'%s\' completed in: %.2fs'\
+        % (workload, load_time))
+  LOG.info('Total load time: %.2fs\n' % total_time)
+
+if __name__ == "__main__": main()

http://git-wip-us.apache.org/repos/asf/impala/blob/d481cd48/testdata/bin/generate-schema-statements.py
----------------------------------------------------------------------
diff --git a/testdata/bin/generate-schema-statements.py 
b/testdata/bin/generate-schema-statements.py
index 3f730e6..e039c48 100755
--- a/testdata/bin/generate-schema-statements.py
+++ b/testdata/bin/generate-schema-statements.py
@@ -16,30 +16,84 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
-# This script generates the "CREATE TABLE", "INSERT", and "LOAD" statements 
for loading
-# test data and writes them to create-*-generated.sql and
-# load-*-generated.sql. These files are then executed by hive or impala, 
depending
-# on their contents. Additionally, for hbase, the file is of the form
-# create-*hbase*-generated.create.
 #
-# The statements that are generated are based on an input test vector
-# (read from a file) that describes the coverage desired. For example, 
currently
-# we want to run benchmarks with different data sets, across different file 
types, and
-# with different compression algorithms set. To improve data loading 
performance this
-# script will generate an INSERT INTO statement to generate the data if the 
file does
-# not already exist in HDFS. If the file does already exist in HDFS then we 
simply issue a
-# LOAD statement which is much faster.
+# This script generates statements to create and populate
+# tables in a variety of formats. The tables and formats are
+# defined through a combination of files:
+# 1. Workload format specifics specify for each workload
+#    which formats are part of core, exhaustive, etc.
+#    This operates via the normal test dimensions.
+#    (see tests/common/test_dimension.py and
+#     testdata/workloads/*/*.csv)
+# 2. Workload table availability constraints specify which
+#    tables exist for which formats.
+#    (see testdata/datasets/*/schema_constraints.csv)
+# The arguments to this script specify the workload and
+# exploration strategy and can optionally restrict it
+# further to individual tables.
+#
+# This script is generating several SQL scripts to be
+# executed by bin/load-data.py. The two scripts are tightly
+# coupled and any change in files generated must be
+# reflected in bin/load-data.py. Currently, this script
+# generates three things:
+# 1. It creates the directory (destroying the existing
+#    directory if necessary)
+#    ${IMPALA_DATA_LOADING_SQL_DIR}/${workload}
+# 2. It creates and populates a subdirectory
+#    avro_schemas/${workload} with JSON files specifying
+#    the Avro schema for each table.
+# 3. It generates SQL files with the following naming schema:
+#
+#    Using the following variables:
+#    workload_exploration = ${workload}-${exploration_strategy} and
+#    file_format_suffix = ${file_format}-${codec}-${compression_type}
+#
+#    A. Impala table creation scripts run in Impala to create tables, 
partitions,
+#       and views. There is one for each file format. They take the form:
+#       
create-${workload_exploration}-impala-generated-${file_format_suffix}.sql
+#
+#    B. Hive creation/load scripts run in Hive to load data into tables and 
create
+#       tables or views that Impala does not support. There is one for each
+#       file format. They take the form:
+#       load-${workload_exploration}-hive-generated-${file_format_suffix}.sql
+#
+#    C. HBase creation script runs through the hbase commandline to create
+#       HBase tables. (Only generated if loading HBase table.) It takes the 
form:
+#       load-${workload_exploration}-hbase-generated.create
+#
+#    D. HBase postload script runs through the hbase commandline to flush
+#       HBase tables. (Only generated if loading HBase table.) It takes the 
form:
+#       post-load-${workload_exploration}-hbase-generated.sql
 #
-# The input test vectors are generated via the generate_test_vectors.py so
-# ensure that script has been run (or the test vector files already exist) 
before
-# running this script.
+#    E. Impala load scripts run in Impala to load data. Only Parquet and Kudu
+#       are loaded through Impala. There is one for each of those formats 
loaded.
+#       They take the form:
+#       load-${workload_exploration}-impala-generated-${file_format_suffix}.sql
+#
+#    F. Invalidation script runs through Impala to invalidate/refresh metadata
+#       for tables. It takes the form:
+#       invalidate-${workload_exploration}-impala-generated.sql
+#
+# In summary, table "CREATE" statements are mostly done by Impala. Any "CREATE"
+# statements that Impala does not support are done through Hive. Loading data
+# into tables mostly runs in Hive except for Parquet and Kudu tables.
+# Loading proceeds in two parts: First, data is loaded into text tables.
+# Second, almost all other formats are populated by inserts from the text
+# table. Since data loaded in Hive may not be visible in Impala, all tables
+# need to have metadata refreshed or invalidated before access in Impala.
+# This means that loading Parquet or Kudu requires invalidating source
+# tables. It also means that invalidate needs to happen at the end of dataload.
+#
+# For tables requiring customized actions to create schemas or place data,
+# this script allows the table specification to include commands that
+# this will execute as part of generating the SQL for table. If the command
+# generates output, that output is used for that section. This is useful
+# for custom tables that rely on loading specific files into HDFS or
+# for tables where specifying the schema is tedious (e.g. wide tables).
+# This should be used sparingly, because these commands are executed
+# serially.
 #
-# Note: This statement generation is assuming the following data loading 
workflow:
-# 1) Load all the data in the specified source table
-# 2) Create tables for the new file formats and compression types
-# 3) Run INSERT OVERWRITE TABLE SELECT * from the source table into the new 
tables
-#    or LOAD directly if the file already exists in HDFS.
 import collections
 import csv
 import glob
@@ -171,7 +225,7 @@ KNOWN_EXPLORATION_STRATEGIES = ['core', 'pairwise', 
'exhaustive', 'lzo']
 def build_create_statement(table_template, table_name, db_name, db_suffix,
                            file_format, compression, hdfs_location,
                            force_reload):
-  create_stmt = 'CREATE DATABASE IF NOT EXISTS %s%s;\n' % (db_name, db_suffix)
+  create_stmt = ''
   if (force_reload):
     create_stmt += 'DROP TABLE IF EXISTS %s%s.%s;\n' % (db_name, db_suffix, 
table_name)
   if compression == 'lzo':
@@ -453,13 +507,13 @@ class Statements(object):
 
   def write_to_file(self, filename):
     # If there is no content to write, skip
-    if self.__is_empty(): return
+    if not self: return
     output = self.create + self.load_base + self.load
     with open(filename, 'w') as f:
       f.write('\n\n'.join(output))
 
-  def __is_empty(self):
-    return not (self.create or self.load or self.load_base)
+  def __nonzero__(self):
+    return bool(self.create or self.load or self.load_base)
 
 def eval_section(section_str):
   """section_str should be the contents of a section (i.e. a string). If 
section_str
@@ -481,7 +535,6 @@ def generate_statements(output_name, test_vectors, sections,
   # TODO: This method has become very unwieldy. It has to be re-factored 
sooner than
   # later.
   # Parquet statements to be executed separately by Impala
-  hive_output = Statements()
   hbase_output = Statements()
   hbase_post_load = Statements()
   impala_invalidate = Statements()
@@ -492,16 +545,18 @@ def generate_statements(output_name, test_vectors, 
sections,
   existing_tables = get_hdfs_subdirs_with_data(options.hive_warehouse_dir)
   for row in test_vectors:
     impala_create = Statements()
+    hive_output = Statements()
     impala_load = Statements()
     file_format, data_set, codec, compression_type =\
         [row.file_format, row.dataset, row.compression_codec, 
row.compression_type]
     table_format = '%s/%s/%s' % (file_format, codec, compression_type)
+    db_suffix = row.db_suffix()
+    db_name = '{0}{1}'.format(data_set, options.scale_factor)
+    db = '{0}{1}'.format(db_name, db_suffix)
+    create_db_stmt = 'CREATE DATABASE IF NOT EXISTS {0};\n'.format(db)
+    impala_create.create.append(create_db_stmt)
     for section in sections:
       table_name = section['BASE_TABLE_NAME'].strip()
-      db_suffix = row.db_suffix()
-      db_name = '{0}{1}'.format(data_set, options.scale_factor)
-      db = '{0}{1}'.format(db_name, db_suffix)
-
 
       if table_names and (table_name.lower() not in table_names):
         print 'Skipping table: %s.%s, table is not in specified table list' % 
(db, table_name)
@@ -640,8 +695,13 @@ def generate_statements(output_name, test_vectors, 
sections,
             column_families))
         hbase_post_load.load.append("flush '%s_hbase.%s'\n" % (db_name, 
table_name))
 
-      # Need to emit an "invalidate metadata" for each individual table
-      invalidate_table_stmt = "INVALIDATE METADATA {0}.{1};\n".format(db, 
table_name)
+      # Need to make sure that tables created and/or data loaded in Hive is 
seen
+      # in Impala. We only need to do a full invalidate if the table was 
created in Hive
+      # and Impala doesn't know about it. Otherwise, do a refresh.
+      if output == hive_output:
+        invalidate_table_stmt = "INVALIDATE METADATA {0}.{1};\n".format(db, 
table_name)
+      else:
+        invalidate_table_stmt = "REFRESH {0}.{1};\n".format(db, table_name)
       impala_invalidate.create.append(invalidate_table_stmt)
 
       # The ALTER statement in hive does not accept fully qualified table 
names so
@@ -701,16 +761,18 @@ def generate_statements(output_name, test_vectors, 
sections,
 
     impala_create.write_to_file("create-%s-impala-generated-%s-%s-%s.sql" %
         (output_name, file_format, codec, compression_type))
+    hive_output.write_to_file("load-%s-hive-generated-%s-%s-%s.sql" %
+        (output_name, file_format, codec, compression_type))
     impala_load.write_to_file("load-%s-impala-generated-%s-%s-%s.sql" %
         (output_name, file_format, codec, compression_type))
 
-
-  hive_output.write_to_file('load-' + output_name + '-hive-generated.sql')
-  hbase_output.create.append("exit")
-  hbase_output.write_to_file('load-' + output_name + '-hbase-generated.create')
-  hbase_post_load.load.append("exit")
-  hbase_post_load.write_to_file('post-load-' + output_name + 
'-hbase-generated.sql')
-  impala_invalidate.write_to_file('invalidate-' + output_name + 
'-impala-generated.sql')
+  if hbase_output:
+    hbase_output.create.append("exit")
+    hbase_output.write_to_file('load-' + output_name + 
'-hbase-generated.create')
+  if hbase_post_load:
+    hbase_post_load.load.append("exit")
+    hbase_post_load.write_to_file('post-load-' + output_name + 
'-hbase-generated.sql')
+  impala_invalidate.write_to_file("invalidate-" + output_name + 
"-impala-generated.sql")
 
 def parse_schema_template_file(file_name):
   VALID_SECTION_NAMES = ['DATASET', 'BASE_TABLE_NAME', 'COLUMNS', 
'PARTITION_COLUMNS',

http://git-wip-us.apache.org/repos/asf/impala/blob/d481cd48/testdata/bin/load_nested.py
----------------------------------------------------------------------
diff --git a/testdata/bin/load_nested.py b/testdata/bin/load_nested.py
index 146c0ff..d391fdb 100755
--- a/testdata/bin/load_nested.py
+++ b/testdata/bin/load_nested.py
@@ -263,32 +263,43 @@ def load():
         TBLPROPERTIES('parquet.compression'='SNAPPY')
         AS SELECT * FROM tmp_customer;
 
-        DROP TABLE tmp_orders_string;
-        DROP TABLE tmp_customer_string;
-        DROP TABLE tmp_customer;
-
         CREATE TABLE region
         STORED AS PARQUET
         TBLPROPERTIES('parquet.compression'='SNAPPY')
         AS SELECT * FROM tmp_region;
 
-        DROP TABLE tmp_region_string;
-        DROP TABLE tmp_region;
-
         CREATE TABLE supplier
         STORED AS PARQUET
         TBLPROPERTIES('parquet.compression'='SNAPPY')
-        AS SELECT * FROM tmp_supplier;
+        AS SELECT * FROM tmp_supplier;""".split(";"):
+      if not stmt.strip():
+        continue
+      LOG.info("Executing: {0}".format(stmt))
+      hive.execute(stmt)
+
+  with cluster.impala.cursor(db_name=target_db) as impala:
+    # Drop the temporary tables. These temporary tables were created
+    # in Impala, so they exist in Impala's metadata. This drop is executed by
+    # Impala so that the metadata is automatically updated.
+    for stmt in """
+        DROP TABLE tmp_orders_string;
+        DROP TABLE tmp_customer_string;
+        DROP TABLE tmp_customer;
+
+        DROP TABLE tmp_region_string;
+        DROP TABLE tmp_region;
 
         DROP TABLE tmp_supplier;
         DROP TABLE tmp_supplier_string;""".split(";"):
       if not stmt.strip():
         continue
       LOG.info("Executing: {0}".format(stmt))
-      hive.execute(stmt)
+      impala.execute(stmt)
 
-  with cluster.impala.cursor(db_name=target_db) as impala:
-    impala.invalidate_metadata()
+    impala.invalidate_metadata(table_name="customer")
+    impala.invalidate_metadata(table_name="part")
+    impala.invalidate_metadata(table_name="region")
+    impala.invalidate_metadata(table_name="supplier")
     impala.compute_stats()
 
   LOG.info("Done loading nested TPCH data")

http://git-wip-us.apache.org/repos/asf/impala/blob/d481cd48/testdata/datasets/functional/functional_schema_template.sql
----------------------------------------------------------------------
diff --git a/testdata/datasets/functional/functional_schema_template.sql 
b/testdata/datasets/functional/functional_schema_template.sql
index a7a5eac..be666ee 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -242,16 +242,16 @@ functional
 ---- BASE_TABLE_NAME
 alltypesinsert
 ---- CREATE
-CREATE TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name} LIKE 
{db_name}.alltypes
-STORED AS {file_format};
+CREATE TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name}
+LIKE {db_name}{db_suffix}.alltypes STORED AS {file_format};
 ====
 ---- DATASET
 functional
 ---- BASE_TABLE_NAME
 alltypesnopart_insert
 ---- CREATE
-CREATE TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name} like 
{db_name}.alltypesnopart
-STORED AS {file_format};
+CREATE TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name}
+LIKE {db_name}{db_suffix}.alltypesnopart STORED AS {file_format};
 ====
 ---- DATASET
 functional
@@ -2009,7 +2009,7 @@ functional
 ---- BASE_TABLE_NAME
 avro_unicode_nulls
 ---- CREATE_HIVE
-create external table if not exists {db_name}{db_suffix}.{table_name} like 
{db_name}.liketbl stored as avro LOCATION '/test-warehouse/avro_null_char';
+create external table if not exists {db_name}{db_suffix}.{table_name} like 
{db_name}{db_suffix}.liketbl stored as avro LOCATION 
'/test-warehouse/avro_null_char';
 ---- LOAD
 `hdfs dfs -mkdir -p /test-warehouse/avro_null_char && \
 hdfs dfs -put -f ${IMPALA_HOME}/testdata/avro_null_char/000000_0 
/test-warehouse/avro_null_char/

Reply via email to