Repository: incubator-impala
Updated Branches:
  refs/heads/master 2bd010781 -> ca55b5926


IMPALA-6070: Parallel compute_table_stats.py

Uses a thread pool to issue many compute stats commands in parallel to
Impala, rather than doing it serially. Where it was obvious, I combined
multiple stats commands into fewer invocations, to reduce the number
of "show databases" and serialized "show tables" commands.

This speeds up the compute stats step in data loading significantly. My
measurements for testdata/bin/compute-table-stats.sh running before and
after this change, with the Impala daemons restarted (cold) or not
restarted (warm) on an 8-core, 32GB RAM machine were:

old, cold: 7m44s
new, cold: 1m42s

old, warm: 1m23s
new, warm:   48s

The data load in the full test build behaves in a cold fashion. It's
typical for https://jenkins.impala.io/job/ubuntu-16.04-from-scratch/ to
run this compute stats step for 9 or 10 minutes. With this change, this
will come down to about 2 minutes.

Change-Id: Ifb080f2552b9dbe304ecadd6e52429214094237d
Reviewed-on: http://gerrit.cloudera.org:8080/8354
Reviewed-by: David Knupp <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/77e010ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/77e010ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/77e010ae

Branch: refs/heads/master
Commit: 77e010ae4ee07c02a7a945fcb5898db50166c767
Parents: 2bd0107
Author: Philip Zeyliger <[email protected]>
Authored: Sat Oct 21 21:27:00 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Tue Oct 24 23:54:15 2017 +0000

----------------------------------------------------------------------
 testdata/bin/compute-table-stats.sh |  6 +--
 tests/util/compute_table_stats.py   | 81 ++++++++++++++++++++++----------
 2 files changed, 58 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/77e010ae/testdata/bin/compute-table-stats.sh
----------------------------------------------------------------------
diff --git a/testdata/bin/compute-table-stats.sh b/testdata/bin/compute-table-stats.sh
index f27972c..98434ee 100755
--- a/testdata/bin/compute-table-stats.sh
+++ b/testdata/bin/compute-table-stats.sh
@@ -41,10 +41,8 @@ if [ "${TARGET_FILESYSTEM}" = "hdfs" ]; then
 fi
 ${COMPUTE_STATS_SCRIPT} --db_names=tpch,tpch_parquet \
     --table_names=customer,lineitem,nation,orders,part,partsupp,region,supplier
-${COMPUTE_STATS_SCRIPT} --db_names=tpch_nested_parquet
-${COMPUTE_STATS_SCRIPT} --db_names=tpcds,tpcds_parquet
+${COMPUTE_STATS_SCRIPT} --db_names=tpch_nested_parquet,tpcds,tpcds_parquet
 
 if "$KUDU_IS_SUPPORTED"; then
-  ${COMPUTE_STATS_SCRIPT} --db_names=functional_kudu
-  ${COMPUTE_STATS_SCRIPT} --db_names=tpch_kudu
+  ${COMPUTE_STATS_SCRIPT} --db_names=functional_kudu,tpch_kudu
 fi

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/77e010ae/tests/util/compute_table_stats.py
----------------------------------------------------------------------
diff --git a/tests/util/compute_table_stats.py b/tests/util/compute_table_stats.py
index 924d33d..f982802 100755
--- a/tests/util/compute_table_stats.py
+++ b/tests/util/compute_table_stats.py
@@ -19,36 +19,63 @@
 #
 # Utility for computing table statistics of tables in the Hive Metastore
 
+from contextlib import contextmanager
 from optparse import OptionParser
+import logging
+import multiprocessing
+import multiprocessing.pool
 
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxClient
 
-def compute_stats(impala_client, db_names=None, table_names=None,
-    continue_on_error=False):
+def compute_stats_table(client_factory, db, table, continue_on_error):
+  """
+  Runs 'compute stats' on a given table. If continue_on_error is
+  True, exceptions computing statistics are swallowed.
+  """
+  with client_factory() as impala_client:
+    db_table = "%s.%s" % (db, table)
+    statement = "compute stats %s" % (db_table,)
+    logging.info('Executing: %s', statement)
+    try:
+      result = impala_client.execute(statement)
+      logging.info(" %s -> %s", db_table, ' '.join(result.data).strip())
+    except:
+      logging.exception(' Failed on table %s', db_table)
+      if not continue_on_error:
+        raise
+
+def compute_stats(client_factory, db_names=None, table_names=None,
+    continue_on_error=False, parallelism=multiprocessing.cpu_count()):
   """
  Runs COMPUTE STATS over the selected tables. The target tables can be filtered by
  specifying a list of databases and/or table names. If no filters are specified this will
   run COMPUTE STATS on all tables in all databases.
+
+  parallelism controls the size of the thread pool to which compute_stats
+  is sent.
   """
-  print "Enumerating databases and tables for compute stats."
+  logging.info("Enumerating databases and tables for compute stats.")
 
-  all_dbs = set(name.split('\t')[0].lower() for name in impala_client.execute("show databases").data)
-  selected_dbs = all_dbs if db_names is None else set(db_names)
-  for db in all_dbs.intersection(selected_dbs):
-    all_tables =\
-        set([t.lower() for t in impala_client.execute("show tables in %s" % db).data])
-    selected_tables = all_tables if table_names is None else set(table_names)
-    for table in all_tables.intersection(selected_tables):
-      statement = "compute stats %s.%s" % (db, table)
-      print 'Executing: %s' % statement
-      try:
-        result = impala_client.execute(statement)
-        print "  -> %s\n" % '\n'.join(result.data)
-      except Exception, e:
-        print "  -> Error: %s\n" % str(e)
-        if not continue_on_error: raise e
+  pool = multiprocessing.pool.ThreadPool(processes=parallelism)
+  futures = []
+  with client_factory() as impala_client:
+    all_dbs = set(name.split('\t')[0].lower() for name
+        in impala_client.execute("show databases").data)
+    selected_dbs = all_dbs if db_names is None else set(db_names)
+    for db in all_dbs.intersection(selected_dbs):
+      all_tables =\
+          set([t.lower() for t in impala_client.execute("show tables in %s" % db).data])
+      selected_tables = all_tables if table_names is None else set(table_names)
+      for table in all_tables.intersection(selected_tables):
+        # Submit command to threadpool
+        futures.append(pool.apply_async(compute_stats_table,
+            (client_factory, db, table, continue_on_error,)))
+    # Wait for all stats commands to finish
+    for f in futures:
+      f.get()
 
 if __name__ == "__main__":
+  logging.basicConfig(level=logging.INFO, format='%(asctime)s %(threadName)s: %(message)s')
   parser = OptionParser()
   parser.add_option("--continue_on_error", dest="continue_on_error",
                     action="store_true", default=True, help="If True, continue "\
@@ -62,6 +89,8 @@ if __name__ == "__main__":
                     help="Compute stats on a kerberized cluster.")
   parser.add_option("--use_ssl", action="store_true", default=False,
                     help="Compute stats on a cluster with SSL enabled.")
+  parser.add_option("--parallelism", type=int, default=multiprocessing.cpu_count(),
+                    help="Number of parallel compute stats commands.")
   parser.add_option("--db_names", dest="db_names", default=None,
                     help="Comma-separated list of database names for which to compute "\
                     "stats. Can be used in conjunction with the --table_names flag. "\
@@ -80,11 +109,13 @@ if __name__ == "__main__":
   if options.db_names is not None:
     db_names = [name.lower().strip() for name in options.db_names.split(',')]
 
-  impala_client = ImpalaBeeswaxClient(options.impalad, use_kerberos=options.use_kerberos,
-      use_ssl=options.use_ssl)
-  impala_client.connect()
-  try:
-    compute_stats(impala_client, db_names=db_names,
-        table_names=table_names, continue_on_error=options.continue_on_error)
-  finally:
+  @contextmanager
+  def client_factory():
+    impala_client = ImpalaBeeswaxClient(options.impalad,
+        use_kerberos=options.use_kerberos, use_ssl=options.use_ssl)
+    impala_client.connect()
+    yield impala_client
     impala_client.close_connection()
+
+  compute_stats(client_factory, db_names=db_names, table_names=table_names,
+      continue_on_error=options.continue_on_error, parallelism=options.parallelism)

Reply via email to