This is an automated email from the ASF dual-hosted git repository.

dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 01b8b4525 IMPALA-13620: Refresh compute_table_stats.py script
01b8b4525 is described below

commit 01b8b45252d50e4887278ae60b6bcf37c68440bb
Author: Riza Suminto <[email protected]>
AuthorDate: Tue Dec 17 11:53:43 2024 -0800

    IMPALA-13620: Refresh compute_table_stats.py script
    
    This patch refreshes compute_table_stats.py script with the following
    changes:
    - Default parallelism to IMPALA_BUILD_THREADS (falling back to the CPU
      count if it is unset) when the --parallelism argument is not set.
    - Change its default connection to hs2, leveraging existing
      ImpylaHS2Connection.
    - Change OptionParser to ArgumentParser.
    - Use impala-python3 to run the script.
    - Add --exclude_table_names to skip running COMPUTE STATS on certain
      tables/views.
    - continue_on_error is False by default.
    
    This patch also improves query handle logging in ImpylaHS2Connection.
    collect_profile_and_log argument is added to control whether to pull
    logs and runtime profile at the end of __fetch_results(). The default
    behavior remains unchanged.
    
    Skip COMPUTE STATS for functional_kudu.alltypesagg and
    functional_kudu.manynulls because it is invalid to run COMPUTE STATS
    over a view.
    
    Customized hive-site.xml to set datanucleus.connectionPool.maxPoolSize
    to 30 and hikaricp.connectionTimeout to 60000 ms. Also set hive.log.dir
    to ${IMPALA_CLUSTER_LOGS_DIR}/hive.
    
    Testing:
    Repeatedly run compute-table-stats.sh from a cold state and confirm that
    no error occurs. This is the script to do so from an active minicluster:
    
    cd $IMPALA_HOME
    ./bin/start-impala-cluster.py --kill
    ./testdata/bin/kill-hive-server.sh
    ./testdata/bin/run-hive-server.sh
    ./bin/start-impala-cluster.py
    ./testdata/bin/compute-table-stats.sh > /tmp/compute-stats.txt 2>&1
    grep error /tmp/compute-stats.txt
    
    Core tests ran and passed.
    
    Change-Id: I1ebf02f95b957e7dda3a30622b87e8fca3197699
    Reviewed-on: http://gerrit.cloudera.org:8080/22231
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 fe/src/test/resources/hive-site.xml.py |   2 +
 testdata/bin/compute-table-stats.sh    |  15 ++-
 testdata/bin/run-hive-server.sh        |   6 +-
 tests/common/impala_connection.py      |  64 ++++++++-----
 tests/util/compute_table_stats.py      | 165 ++++++++++++++++++++++-----------
 5 files changed, 167 insertions(+), 85 deletions(-)

diff --git a/fe/src/test/resources/hive-site.xml.py 
b/fe/src/test/resources/hive-site.xml.py
index 9b9c2dbfd..137b55d28 100644
--- a/fe/src/test/resources/hive-site.xml.py
+++ b/fe/src/test/resources/hive-site.xml.py
@@ -220,8 +220,10 @@ CONFIG.update({
   'datanucleus.autoCreateSchema': 'false',
   'datanucleus.fixedDatastore': 'false',
   'datanucleus.metadata.validate': 'false',
+  'datanucleus.connectionPool.maxPoolSize': 30,
   'javax.jdo.option.ConnectionUserName': 'hiveuser',
   'javax.jdo.option.ConnectionPassword': 'password',
+  'hikaricp.connectionTimeout': 60000,
 })
 if db_type == 'postgres':
   CONFIG.update({
diff --git a/testdata/bin/compute-table-stats.sh 
b/testdata/bin/compute-table-stats.sh
index 40001eaa4..76e1ee757 100755
--- a/testdata/bin/compute-table-stats.sh
+++ b/testdata/bin/compute-table-stats.sh
@@ -26,9 +26,10 @@ setup_report_build_error
 . ${IMPALA_HOME}/bin/impala-config.sh > /dev/null 2>&1
 
 # TODO: We need a better way of managing how these get set. See IMPALA-4346
-IMPALAD=${IMPALAD:-localhost:21000}
+IMPALAD_HS2=${IMPALAD_HS2:-localhost:21050}
 
-COMPUTE_STATS_SCRIPT="${IMPALA_HOME}/tests/util/compute_table_stats.py 
--impalad=${IMPALAD}"
+COMPUTE_STATS_SCRIPT="${IMPALA_HOME}/tests/util/compute_table_stats.py\
+    --impalad=${IMPALAD_HS2}"
 
 # Run compute stats over as many of the tables used in the Planner tests as 
possible.
 ${COMPUTE_STATS_SCRIPT} --db_names=functional\
@@ -46,10 +47,8 @@ if [ "${TARGET_FILESYSTEM}" = "hdfs" ]; then
 fi
 ${COMPUTE_STATS_SCRIPT} --db_names=tpch,tpch_parquet,tpch_orc_def \
     --table_names=customer,lineitem,nation,orders,part,partsupp,region,supplier
-${COMPUTE_STATS_SCRIPT} --db_names=tpch_nested_parquet,tpcds,tpcds_parquet
-${COMPUTE_STATS_SCRIPT} --db_names=functional_kudu,tpch_kudu
+${COMPUTE_STATS_SCRIPT} 
--db_names="tpch_nested_parquet,tpch_kudu,tpcds,tpcds_parquet,\
+    tpcds_partitioned_parquet_snap"
+${COMPUTE_STATS_SCRIPT} --db_names=functional_kudu \
+    --exclude_table_names="alltypesagg,manynulls"
 
-# Compute tables of tpcds_partitioned_parquet_snap serially
-# due to large number of partitions in some of the fact tables.
-${COMPUTE_STATS_SCRIPT} --db_names=tpcds_partitioned_parquet_snap \
-    --parallelism=1
diff --git a/testdata/bin/run-hive-server.sh b/testdata/bin/run-hive-server.sh
index f64d4cdd5..dccf4e6a2 100755
--- a/testdata/bin/run-hive-server.sh
+++ b/testdata/bin/run-hive-server.sh
@@ -159,7 +159,8 @@ export 
KUDU_SKIP_HMS_PLUGIN_VALIDATION=${KUDU_SKIP_HMS_PLUGIN_VALIDATION:-1}
 # To debug log4j2 loading issues, add to HADOOP_CLIENT_OPTS:
 #   -Dorg.apache.logging.log4j.simplelog.StatusLogger.level=TRACE
 if [[ ${START_METASTORE} -eq 1 && -z $HMS_PID ]]; then
-  HADOOP_CLIENT_OPTS="-Xmx2024m -Dhive.log.file=hive-metastore.log" hive \
+  HADOOP_CLIENT_OPTS="-Xmx2024m  -Dhive.log.dir=${LOGDIR} \
+      -Dhive.log.file=hive-metastore.log" hive \
       --service metastore -p $HIVE_METASTORE_PORT >> 
${LOGDIR}/hive-metastore.out 2>&1 &
 
   # Wait for the Metastore to come up because HiveServer2 relies on it being 
live.
@@ -194,7 +195,8 @@ if [[ ${START_HIVESERVER} -eq 1 && -z $HS2_PID ]]; then
   # Starts a HiveServer2 instance on the port specified by the 
HIVE_SERVER2_THRIFT_PORT
   # environment variable. HADOOP_HEAPSIZE should be set to at least 2048 to 
avoid OOM
   # when loading ORC tables like widerow.
-  HADOOP_CLIENT_OPTS="-Xmx2048m -Dhive.log.file=hive-server2.log" hive \
+  HADOOP_CLIENT_OPTS="-Xmx2048m -Dhive.log.dir=${LOGDIR} \
+      -Dhive.log.file=hive-server2.log" hive \
       --service hiveserver2 >> ${LOGDIR}/hive-server2.out 2>&1 &
 
   # Wait for the HiveServer2 service to come up because callers of this script
diff --git a/tests/common/impala_connection.py 
b/tests/common/impala_connection.py
index 2ebaacc2a..0bdf2a8ed 100644
--- a/tests/common/impala_connection.py
+++ b/tests/common/impala_connection.py
@@ -32,7 +32,7 @@ from RuntimeProfile.ttypes import TRuntimeProfileFormat
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxClient
 
 
-LOG = logging.getLogger('impala_connection')
+LOG = logging.getLogger(__name__)
 console_handler = logging.StreamHandler()
 console_handler.setLevel(logging.INFO)
 # All logging needs to be either executable SQL or a SQL comment (prefix with 
--).
@@ -274,10 +274,12 @@ class ImpylaHS2Connection(ImpalaConnection):
   TODO: implement support for kerberos, SSL, etc.
   """
   def __init__(self, host_port, use_kerberos=False, is_hive=False,
-               use_http_transport=False, http_path=""):
+               use_http_transport=False, http_path="", use_ssl=False,
+               collect_profile_and_log=True):
     self.__host_port = host_port
     self.__use_http_transport = use_http_transport
     self.__http_path = http_path
+    self.__use_ssl = use_ssl
     if use_kerberos:
       raise NotImplementedError("Kerberos support not yet implemented")
     # Impyla connection and cursor is initialised in connect(). We need to 
reuse the same
@@ -289,6 +291,7 @@ class ImpylaHS2Connection(ImpalaConnection):
     # Query options to send along with each query.
     self.__query_options = {}
     self._is_hive = is_hive
+    self._collect_profile_and_log = not is_hive and collect_profile_and_log
 
   def set_configuration_option(self, name, value):
     self.__query_options[name] = str(value)
@@ -309,7 +312,8 @@ class ImpylaHS2Connection(ImpalaConnection):
       conn_kwargs['auth_mechanism'] = 'PLAIN'
     self.__impyla_conn = impyla.connect(host=host, port=int(port),
                                         
use_http_transport=self.__use_http_transport,
-                                        http_path=self.__http_path, 
**conn_kwargs)
+                                        http_path=self.__http_path,
+                                        use_ssl=self.__use_ssl, **conn_kwargs)
     # Get the default query options for the session before any modifications 
are made.
     self.__cursor = self.__impyla_conn.cursor(convert_types=False)
     self.__default_query_options = {}
@@ -345,7 +349,7 @@ class ImpylaHS2Connection(ImpalaConnection):
     return self.__cursor.fetchall()
 
   def close_query(self, operation_handle):
-    LOG.info("-- closing query for operation handle: 
{0}".format(operation_handle))
+    self.log_handle(operation_handle, 'closing query for operation')
     operation_handle.get_handle().close_operation()
 
   def execute(self, sql_stmt, user=None, 
profile_format=TRuntimeProfileFormat.STRING,
@@ -392,57 +396,68 @@ class ImpylaHS2Connection(ImpalaConnection):
       raise
 
   def cancel(self, operation_handle):
-    LOG.info("-- canceling operation: {0}".format(operation_handle))
+    self.log_handle(operation_handle, 'canceling operation')
     cursor = operation_handle.get_handle()
     return cursor.cancel_operation(reset_state=False)
 
   def get_query_id(self, operation_handle):
-    """Return the string representation of the query id."""
-    guid_bytes = \
-        operation_handle.get_handle()._last_operation.handle.operationId.guid
+    """Return the string representation of the query id.
+    Return empty string if handle is already canceled or closed."""
+    last_op = operation_handle.get_handle()._last_operation
+    if last_op is None:
+      return ""
+    guid_bytes = last_op.handle.operationId.guid
     # hex_codec works on bytes, so this needs to a decode() to get back to a 
string
     hi_str = codecs.encode(guid_bytes[7::-1], 'hex_codec').decode()
     lo_str = codecs.encode(guid_bytes[16:7:-1], 'hex_codec').decode()
     return "{0}:{1}".format(hi_str, lo_str)
 
+  def handle_id_for_logging(self, operation_handle):
+    query_id = self.get_query_id(operation_handle)
+    return query_id if query_id else str(operation_handle)
+
+  def log_handle(self, handle, message):
+    handle_id = self.handle_id_for_logging(handle)
+    LOG.info("-- {0}: {1}".format(handle_id, message))
+
   def get_state(self, operation_handle):
-    LOG.info("-- getting state for operation: {0}".format(operation_handle))
+    handle_id = self.handle_id_for_logging(operation_handle)
+    LOG.info("-- getting state for operation: {0}".format(handle_id))
     cursor = operation_handle.get_handle()
     return cursor.status()
 
   def state_is_finished(self, operation_handle):
-    LOG.info("-- checking finished state for operation: 
{0}".format(operation_handle))
+    self.log_handle(operation_handle, 'checking finished state for operation')
     cursor = operation_handle.get_handle()
     # cursor.status contains a string representation of one of
     # TCLIService.TOperationState.
     return cursor.status() == "FINISHED_STATE"
 
   def get_exec_summary(self, operation_handle):
-    LOG.info("-- getting exec summary operation: {0}".format(operation_handle))
+    self.log_handle(operation_handle, 'getting exec summary operation')
     cursor = operation_handle.get_handle()
     # summary returned is thrift, not string.
     return cursor.get_summary()
 
   def get_runtime_profile(self, operation_handle, profile_format):
-    LOG.info("-- getting runtime profile operation: 
{0}".format(operation_handle))
+    self.log_handle(operation_handle, 'getting runtime profile operation')
     cursor = operation_handle.get_handle()
     return cursor.get_profile(profile_format=profile_format)
 
   def wait_for_finished_timeout(self, operation_handle, timeout):
-    LOG.info("-- waiting for query to reach FINISHED state: 
{0}".format(operation_handle))
+    self.log_handle(operation_handle, 'waiting for query to reach FINISHED 
state')
     raise NotImplementedError("Not yet implemented for HS2 - states differ 
from beeswax")
 
   def wait_for_admission_control(self, operation_handle):
-    LOG.info("-- waiting for completion of the admission control processing of 
the "
-        "query: {0}".format(operation_handle))
+    self.log_handle(operation_handle, 'waiting for completion of the admission 
control')
     raise NotImplementedError("Not yet implemented for HS2 - states differ 
from beeswax")
 
   def get_admission_result(self, operation_handle):
-    LOG.info("-- getting the admission result: {0}".format(operation_handle))
+    self.log_handle(operation_handle, 'getting the admission result')
     raise NotImplementedError("Not yet implemented for HS2 - states differ 
from beeswax")
 
   def get_log(self, operation_handle):
-    LOG.info("-- getting log for operation: {0}".format(operation_handle))
+    self.log_handle(operation_handle, 'getting log for operation')
     # HS2 includes non-error log messages that we need to filter out.
     cursor = operation_handle.get_handle()
     lines = [line for line in cursor.get_log().split('\n')
@@ -450,7 +465,7 @@ class ImpylaHS2Connection(ImpalaConnection):
     return '\n'.join(lines)
 
   def fetch(self, sql_stmt, handle, max_rows=-1):
-    LOG.info("-- fetching results from: {0}".format(handle))
+    self.log_handle(handle, 'fetching results')
     return self.__fetch_results(handle, max_rows)
 
   def __fetch_results(self, handle, max_rows=-1,
@@ -471,7 +486,7 @@ class ImpylaHS2Connection(ImpalaConnection):
       else:
         result_tuples = cursor.fetchmany(max_rows)
 
-    if not self._is_hive:
+    if not self._is_hive and self._collect_profile_and_log:
       log = self.get_log(handle)
       profile = self.get_runtime_profile(handle, profile_format=profile_format)
     else:
@@ -520,16 +535,19 @@ class ImpylaHS2ResultSet(object):
 
 
 def create_connection(host_port, use_kerberos=False, protocol='beeswax',
-    is_hive=False):
+    is_hive=False, use_ssl=False, collect_profile_and_log=True):
   if protocol == 'beeswax':
-    c = BeeswaxConnection(host_port=host_port, use_kerberos=use_kerberos)
+    c = BeeswaxConnection(host_port=host_port, use_kerberos=use_kerberos,
+                          use_ssl=use_ssl)
   elif protocol == 'hs2':
     c = ImpylaHS2Connection(host_port=host_port, use_kerberos=use_kerberos,
-        is_hive=is_hive)
+        is_hive=is_hive, use_ssl=use_ssl,
+        collect_profile_and_log=collect_profile_and_log)
   else:
     assert protocol == 'hs2-http'
     c = ImpylaHS2Connection(host_port=host_port, use_kerberos=use_kerberos,
-        is_hive=is_hive, use_http_transport=True, http_path='cliservice')
+        is_hive=is_hive, use_http_transport=True, http_path='cliservice',
+        use_ssl=use_ssl, collect_profile_and_log=collect_profile_and_log)
 
   # A hook in conftest sets tests.common.current_node. Skip for Hive 
connections since
   # Hive cannot modify client_identifier at runtime.
diff --git a/tests/util/compute_table_stats.py 
b/tests/util/compute_table_stats.py
index bb25e6c67..206a70479 100755
--- a/tests/util/compute_table_stats.py
+++ b/tests/util/compute_table_stats.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env impala-python
+#!/usr/bin/env impala-python3
 #
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
@@ -21,14 +21,22 @@
 
 from __future__ import absolute_import, division, print_function
 from contextlib import contextmanager
-from optparse import OptionParser
+from argparse import ArgumentParser
 import logging
 import multiprocessing
 import multiprocessing.pool
+import os
 
-from tests.beeswax.impala_beeswax import ImpalaBeeswaxClient
+from tests.common.impala_connection import create_connection
 
-def compute_stats_table(client_factory, db, table, continue_on_error):
+logging.basicConfig(level=logging.INFO,
+                    format='%(asctime)s %(threadName)s: %(message)s')
+LOG = logging.getLogger(__name__)
+DEFAULT_PARALLELISM = int(
+    os.environ.get('IMPALA_BUILD_THREADS', multiprocessing.cpu_count()))
+
+
+def compute_stats_table(client_factory, db, table):
   """
   Runs 'compute stats' on a given table. If continue_on_error is
   True, exceptions computing statistics are swallowed.
@@ -36,17 +44,27 @@ def compute_stats_table(client_factory, db, table, 
continue_on_error):
   with client_factory() as impala_client:
     db_table = "%s.%s" % (db, table)
     statement = "compute stats %s" % (db_table,)
-    logging.info('Executing: %s', statement)
+    LOG.info('Executing: %s', statement)
     try:
       result = impala_client.execute(statement)
-      logging.info(" %s -> %s", db_table, ' '.join(result.data).strip())
-    except:
-      logging.exception(' Failed on table %s', db_table)
-      if not continue_on_error:
-        raise
+      LOG.info(" %s -> %s", db_table, ' '.join(result.data).strip())
+      return db_table
+    except Exception as e:
+      LOG.exception(' Failed on table %s', db_table)
+      raise e
+
+
+def log_completion(completed, total_tables, error=None):
+    if error:
+      LOG.error("Completed COMPUTE STATS for %d/%d tables with error.",
+                completed, total_tables, exc_info=error)
+    else:
+      LOG.info("Completed COMPUTE STATS for %d/%d tables.", completed, 
total_tables)
+
 
 def compute_stats(client_factory, db_names=None, table_names=None,
-    continue_on_error=False, parallelism=multiprocessing.cpu_count()):
+                  exclude_table_names=None, continue_on_error=False,
+                  parallelism=DEFAULT_PARALLELISM):
   """
   Runs COMPUTE STATS over the selected tables. The target tables can be 
filtered by
   specifying a list of databases and/or table names. If no filters are 
specified this will
@@ -55,68 +73,111 @@ def compute_stats(client_factory, db_names=None, 
table_names=None,
   parallelism controls the size of the thread pool to which compute_stats
   is sent.
   """
-  logging.info("Enumerating databases and tables for compute stats.")
+  LOG.info("Enumerating databases and tables for compute stats. "
+           "db_names={} table_names={} exclude_table_names={} 
parallelism={}.".format(
+             str(db_names), str(table_names), str(exclude_table_names), 
parallelism
+           ))
 
   pool = multiprocessing.pool.ThreadPool(processes=parallelism)
   futures = []
+
   with client_factory() as impala_client:
+    db_table_map = {}
+    total_tables = 0
     all_dbs = set(name.split('\t')[0].lower() for name
         in impala_client.execute("show databases").data)
     selected_dbs = all_dbs if db_names is None else set(db_names)
     for db in all_dbs.intersection(selected_dbs):
-      all_tables =\
-          set([t.lower() for t in impala_client.execute("show tables in %s" % 
db).data])
+      all_tables = set(
+        [t.lower() for t in impala_client.execute("show tables in %s" % 
db).data])
       selected_tables = all_tables if table_names is None else set(table_names)
-      for table in all_tables.intersection(selected_tables):
+      excluded_tables = (set() if exclude_table_names is None
+                         else set(exclude_table_names))
+      tables_to_compute = (all_tables.intersection(selected_tables)
+                           - excluded_tables)
+      db_table_map[db] = tables_to_compute
+      total_tables += len(tables_to_compute)
+
+    for db, tables in db_table_map.items():
+      for table in tables:
         # Submit command to threadpool
-        futures.append(pool.apply_async(compute_stats_table,
-            (client_factory, db, table, continue_on_error,)))
+        futures.append(
+          pool.apply_async(compute_stats_table, (client_factory, db, table,)))
+
     # Wait for all stats commands to finish
+    completed = 0
     for f in futures:
-      f.get()
+      try:
+        f.get()
+        completed += 1
+      except Exception as e:
+        if not continue_on_error:
+          log_completion(completed, total_tables, e)
+          raise e
+    log_completion(completed, total_tables)
+
 
 if __name__ == "__main__":
-  logging.basicConfig(level=logging.INFO, format='%(asctime)s %(threadName)s: 
%(message)s')
-  parser = OptionParser()
-  parser.add_option("--continue_on_error", dest="continue_on_error",
-                    action="store_true", default=True, help="If True, continue 
"\
-                    "if there is an error executing the compute stats 
statement.")
-  parser.add_option("--stop_on_error", dest="continue_on_error",
-                    action="store_false", default=True, help="If True, stop "\
-                    "if there is an error executing the compute stats 
statement.")
-  parser.add_option("--impalad", dest="impalad", default="localhost:21000",
-                    help="Impala daemon <host:port> to connect to.")
-  parser.add_option("--use_kerberos", action="store_true", default=False,
-                    help="Compute stats on a kerberized cluster.")
-  parser.add_option("--use_ssl", action="store_true", default=False,
-                    help="Compute stats on a cluster with SSL enabled.")
-  parser.add_option("--parallelism", type=int, 
default=multiprocessing.cpu_count(),
-                    help="Number of parallel compute stats commands.")
-  parser.add_option("--db_names", dest="db_names", default=None,
-                    help="Comma-separated list of database names for which to 
compute "\
-                    "stats. Can be used in conjunction with the --table_names 
flag. "\
-                    "If not specified, compute stats will run on tables from 
all "\
-                    "databases.")
-  parser.add_option("--table_names", dest="table_names", default=None,
-                    help="Comma-separated list of table names to compute stats 
over. A"\
-                    " substring comparison is done. If no tables are specified 
stats "\
-                    "are computed across all tables.")
-  options, args = parser.parse_args()
+  parser = ArgumentParser()
+  group_continuation_opt = parser.add_mutually_exclusive_group()
+  group_continuation_opt.add_argument(
+    "--continue_on_error", dest="continue_on_error", action="store_true",
+    help="If True, continue if there is an error executing the compute stats 
statement.")
+  group_continuation_opt.add_argument(
+    "--stop_on_error", dest="continue_on_error", action="store_false",
+    help="If True, stop if there is an error executing the compute stats 
statement.")
+  parser.add_argument(
+    "--impalad", dest="impalad", default="localhost:21050",
+    help="Impala daemon <host:hs2_port> to connect to.")
+  parser.add_argument(
+    "--use_kerberos", action="store_true", default=False,
+    help="Compute stats on a kerberized cluster.")
+  parser.add_argument(
+    "--use_ssl", action="store_true", default=False,
+    help="Compute stats on a cluster with SSL enabled.")
+  parser.add_argument(
+    "--parallelism", type=int, default=DEFAULT_PARALLELISM,
+    help="Number of parallel compute stats commands.")
+  parser.add_argument(
+    "--db_names", dest="db_names", default=None, help=(
+      "Comma-separated list of database names for which to compute stats. "
+      "Can be used in conjunction with the --table_names or 
--exclude_table_names flag. "
+      "If not specified, compute stats will run on tables from all 
databases."))
+  group_selection_opt = parser.add_mutually_exclusive_group()
+  group_selection_opt.add_argument(
+    "--table_names", dest="table_names", default=None, help=(
+      "Comma-separated list of table names to compute stats over. "
+      "A substring comparison is done. If no tables are specified stats are 
computed "
+      "across all tables. Can not be used in conjunction with 
--exclude_table_names."))
+  group_selection_opt.add_argument(
+    "--exclude_table_names", dest="exclude_table_names", default=None, help=(
+      "Comma-separated list of table names to exclude compute stats. "
+      "A substring comparison is done. If no tables are specified stats are 
computed "
+      "across all tables. Can not be used in conjunction with --table_names."))
+  args = parser.parse_args()
+
   table_names = None
-  if options.table_names is not None:
-    table_names = [name.lower().strip() for name in 
options.table_names.split(',')]
+  if args.table_names is not None:
+    table_names = [name.lower().strip() for name in 
args.table_names.split(',')]
+
+  exclude_table_names = None
+  if args.exclude_table_names is not None:
+    exclude_table_names = [name.lower().strip()
+                           for name in args.exclude_table_names.split(',')]
 
   db_names = None
-  if options.db_names is not None:
-    db_names = [name.lower().strip() for name in options.db_names.split(',')]
+  if args.db_names is not None:
+    db_names = [name.lower().strip() for name in args.db_names.split(',')]
 
   @contextmanager
   def client_factory():
-    impala_client = ImpalaBeeswaxClient(options.impalad,
-        use_kerberos=options.use_kerberos, use_ssl=options.use_ssl)
+    impala_client = create_connection(args.impalad,
+        use_kerberos=args.use_kerberos, use_ssl=args.use_ssl, protocol='hs2',
+        collect_profile_and_log=False)
     impala_client.connect()
     yield impala_client
-    impala_client.close_connection()
+    impala_client.close()
 
   compute_stats(client_factory, db_names=db_names, table_names=table_names,
-      continue_on_error=options.continue_on_error, 
parallelism=options.parallelism)
+                exclude_table_names=exclude_table_names,
+                continue_on_error=args.continue_on_error, 
parallelism=args.parallelism)

Reply via email to