IMPALA-3980: qgen: re-enable Hive as a target database

Changes:
* Added Hive CLI options back in (removed in commit "Stress test: Various
  changes")
* Modifications so that if --use-hive is specified, a Hive connection is
  actually created
* A few minor bug fixes so that the RQG can be run locally
* Modified MiniCluster to use HADOOP_CONF_DIR and HIVE_CONF_DIR rather than a
  hard-coded path under IMPALA_HOME
* Fixed fe/src/test/resources/hive-default.xml so that it is a valid XML file;
  it was missing a few element terminators, which caused a parse exception in
  cluster.py

Testing:
* Hive integration tested locally by invoking the data generator via the
  command:

    ./data-generator.py \
      --db-name=functional \
      --use-hive \
      --min-row-count=50 \
      --max-row-count=100 \
      --storage-file-formats textfile \
      --use-postgresql \
      --postgresql-user stakiar

  and the discrepancy checker via the command:

    ./discrepancy-checker.py \
      --db-name=functional \
      --use-hive \
      --use-postgresql \
      --postgresql-user stakiar \
      --test-db-type HIVE \
      --timeout 300 \
      --query-count 50 \
      --profile hive

* The output of the above two commands is essentially the same as the Impala
  output; however, about 20% of the queries fail when the discrepancy checker
  is run
* Regression testing done by running Leopard in a local VM running Ubuntu
  14.04, and by running the discrepancy checker against Impala from inside an
  Impala Docker container

Change-Id: Ifb1199b50a5b65c21de7876fb70cc03bda1a9b46
Reviewed-on: http://gerrit.cloudera.org:8080/4011
Reviewed-by: Taras Bobrovytsky <[email protected]>
Tested-by: Taras Bobrovytsky <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/0780d2c8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/0780d2c8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/0780d2c8

Branch: refs/heads/master
Commit: 0780d2c8af56f5439ad482319e4dd7f106e38047
Parents: 4d9c261
Author: Sahil Takiar <[email protected]>
Authored: Mon Aug 15 17:16:26 2016 -0700
Committer: Taras Bobrovytsky <[email protected]>
Committed: Tue Sep 27 22:24:59 2016 +0000

----------------------------------------------------------------------
 fe/src/test/resources/hive-default.xml |  8 ++--
 tests/comparison/cli_options.py        | 23 ++++++++++-
 tests/comparison/cluster.py            | 62 ++++++++++++++++++++++++-----
 tests/comparison/data_generator.py     | 35 +++++++++++-----
 tests/comparison/db_connection.py      | 17 ++++++++
 5 files changed, 121 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0780d2c8/fe/src/test/resources/hive-default.xml
----------------------------------------------------------------------
diff --git a/fe/src/test/resources/hive-default.xml b/fe/src/test/resources/hive-default.xml
index 84a44e2..08e102b 100644
--- a/fe/src/test/resources/hive-default.xml
+++ b/fe/src/test/resources/hive-default.xml
@@ -479,6 +479,7 @@
     If the user has set hive.merge.mapfiles to true and hive.merge.mapredfiles to false,
     the idea was the number of reducers are few, so the number of files anyway are small.
     However, with this optimization, we are increasing the number of files possibly by a big margin.
     So, we merge aggresively.
+  </description>
 </property>
 <property>
@@ -487,6 +488,7 @@
   <description>Whether the version of hadoop which is running supports sub-directories for tables/partitions.
     Many hive optimizations can be applied if the hadoop version supports sub-directories for tables/partitions.
     It was added by MAPREDUCE-1501
+  </description>
 </property>
 <property>
@@ -552,9 +554,9 @@
     to know how to construct the canonical path. It just gives user choice if they
     want to change the default directory name.
     For example, there are 2 skewed column c1 and c2. 2 skewed value: (1,a) and (2,b). subdirectory:
-    <partition-dir>/c1=1/c2=a/
-    <partition-dir>/c1=2/c2=b/
-    <partition-dir>/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME/
+    &lt;partition-dir&gt;/c1=1/c2=a/
+    &lt;partition-dir&gt;/c1=2/c2=b/
+    &lt;partition-dir&gt;/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME/
     Note: This config won't impact users if they don't list bucketing.
   </description>
 </property>
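For reference: the malformed file mattered because cluster.py parses every
file in the local Hadoop conf dir with xml.etree.ElementTree. A minimal sketch
of that check against the file fixed by this patch (assumes IMPALA_HOME is
set, as elsewhere in these tests):

  # Reproduce the parse cluster.py performs on each config file; a missing
  # element terminator such as </description> raises a ParseError here.
  import os
  from xml.etree.ElementTree import parse as parse_xml

  conf_file = os.path.join(os.environ["IMPALA_HOME"], "fe", "src", "test",
      "resources", "hive-default.xml")
  parse_xml(conf_file)  # raises xml.etree.ElementTree.ParseError if invalid
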
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0780d2c8/tests/comparison/cli_options.py
----------------------------------------------------------------------
diff --git a/tests/comparison/cli_options.py b/tests/comparison/cli_options.py
index 4e28832..92901f4 100644
--- a/tests/comparison/cli_options.py
+++ b/tests/comparison/cli_options.py
@@ -24,7 +24,8 @@ from getpass import getuser
 from tempfile import gettempdir
 
 import db_connection
-from cluster import CmCluster, MiniCluster
+from cluster import CmCluster, DEFAULT_HIVE_HOST, DEFAULT_HIVE_PORT, MiniCluster, \
+    MiniHiveCluster
 from db_types import TYPES
 
 def add_logging_options(section, default_debug_log_file=None):
@@ -119,8 +120,10 @@ def create_cluster(args):
     cluster = CmCluster(args.cm_host, user=args.cm_user, password=args.cm_password,
         cluster_name=args.cm_cluster_name, ssh_user=args.ssh_user,
         ssh_port=args.ssh_port, ssh_key_file=args.ssh_key_file)
+  elif args.use_hive:
+    cluster = MiniHiveCluster(args.hive_host, args.hive_port)
   else:
-    cluster = MiniCluster(args.minicluster_num_impalads)
+    cluster = MiniCluster(args.hive_host, args.hive_port, args.minicluster_num_impalads)
   cluster.hadoop_user_name = args.hadoop_user_name
   return cluster
@@ -143,6 +146,20 @@ def add_timeout_option(section):
 
 def add_connection_option_groups(parser):
+
+  group = parser.add_argument_group("Hive Options")
+  group.add_argument('--use-hive', action='store_true', default=False,
+      help='Use Hive (Impala will be skipped)')
+  group.add_argument('--hive-host', default=DEFAULT_HIVE_HOST,
+      help="The name of the host running the HS2")
+  group.add_argument("--hive-port", default=DEFAULT_HIVE_PORT, type=int,
+      help="The port of HiveServer2")
+  group.add_argument('--hive-user', default='hive',
+      help="The user name to use when connecting to HiveServer2")
+  group.add_argument('--hive-password', default='hive',
+      help="The password to use when connecting to HiveServer2")
+  parser.add_argument_group(group)
+
   group = parser.add_argument_group('MySQL Options')
   group.add_argument('--use-mysql', action='store_true', help='Use MySQL')
@@ -208,6 +225,8 @@ def create_connection(args, db_type=None, db_name=None):
     conn_class = db_connection.MySQLConnection
   elif db_type == db_connection.ORACLE:
     conn_class = db_connection.OracleConnection
+  elif db_type == db_connection.HIVE:
+    conn_class = db_connection.HiveConnection
   else:
     raise Exception('Unexpected db_type: %s; expected one of %s.'
         % (db_type, ', '.join([db_connection.POSTGRESQL, db_connection.MYSQL,
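For reference, what the new options wire up: with --use-hive, create_cluster()
now returns a MiniHiveCluster instead of a MiniCluster. A minimal sketch of
the equivalent direct construction (assumes HADOOP_CONF_DIR and HIVE_CONF_DIR
are set; the SHOW DATABASES query is only illustrative):

  from cluster import DEFAULT_HIVE_HOST, DEFAULT_HIVE_PORT, MiniHiveCluster

  cluster = MiniHiveCluster(DEFAULT_HIVE_HOST, DEFAULT_HIVE_PORT)
  with cluster.hive.connect() as conn:  # HiveServer2, port 11050 by default
    with conn.cursor() as cursor:
      cursor.execute("SHOW DATABASES")  # illustrative query
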
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0780d2c8/tests/comparison/cluster.py
----------------------------------------------------------------------
diff --git a/tests/comparison/cluster.py b/tests/comparison/cluster.py
index c602df1..a009e92 100644
--- a/tests/comparison/cluster.py
+++ b/tests/comparison/cluster.py
@@ -40,6 +40,7 @@ from sys import maxint
 from tempfile import mkdtemp
 from threading import Lock
 from time import mktime, strptime
+from urlparse import urlparse
 from xml.etree.ElementTree import parse as parse_xml
 from zipfile import ZipFile
@@ -51,6 +52,8 @@ from tests.util.parse_util import parse_glog, parse_mem_to_mb
 
 LOG = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0])
 
+DEFAULT_HIVE_HOST = '127.0.0.1'
+DEFAULT_HIVE_PORT = 11050
 DEFAULT_TIMEOUT = 300
 
 class Cluster(object):
@@ -72,7 +75,7 @@ class Cluster(object):
     self._hive = None
     self._impala = None
 
-  def get_hadoop_config(self, key):
+  def _load_hadoop_config(self):
     if not self._hadoop_configs:
       self._hadoop_configs = dict()
       for file_name in os.listdir(self.local_hadoop_conf_dir):
@@ -87,7 +90,17 @@ class Cluster(object):
         if value is None or value.text is None:
           continue
         self._hadoop_configs[name.text] = value.text
-    return self._hadoop_configs[key]
+
+  def get_hadoop_config(self, key, default=None):
+    """Returns the Hadoop Configuration value mapped to the given key. If a default is
+    specified, it is returned if the key cannot be found. If no default is specified
+    and the key cannot be found, a 'No Such Key' error will be thrown.
+    """
+    self._load_hadoop_config()
+    result = self._hadoop_configs.get(key, default)
+    if result is None:
+      raise KeyError
+    return result
 
   @abstractproperty
   def shell(self, cmd, host_name, timeout_secs=DEFAULT_TIMEOUT):
@@ -152,8 +165,11 @@ class Cluster(object):
 
 class MiniCluster(Cluster):
 
-  def __init__(self, num_impalads=3):
+  def __init__(self, hive_host=DEFAULT_HIVE_HOST, hive_port=DEFAULT_HIVE_PORT,
+      num_impalads=3):
     Cluster.__init__(self)
+    self.hive_host = hive_host
+    self.hive_port = hive_port
     self.num_impalads = num_impalads
 
   def shell(self, cmd, unused_host_name, timeout_secs=DEFAULT_TIMEOUT):
@@ -162,21 +178,28 @@ class MiniCluster(Cluster):
 
   def _init_local_hadoop_conf_dir(self):
     self._local_hadoop_conf_dir = mkdtemp()
-    node_conf_dir = os.path.join(os.environ["IMPALA_HOME"], "testdata", "cluster",
-        "cdh%s" % os.environ["CDH_MAJOR_VERSION"], "node-1", "etc", "hadoop", "conf")
+    node_conf_dir = self._get_node_conf_dir()
     for file_name in os.listdir(node_conf_dir):
       shutil.copy(os.path.join(node_conf_dir, file_name), self._local_hadoop_conf_dir)
 
-    other_conf_dir = os.path.join(os.environ["IMPALA_HOME"], "fe", "src", "test",
-        "resources")
+    other_conf_dir = self._get_other_conf_dir()
     for file_name in ["hive-site.xml"]:
       shutil.copy(os.path.join(other_conf_dir, file_name), self._local_hadoop_conf_dir)
 
+  def _get_node_conf_dir(self):
+    return os.path.join(os.environ["IMPALA_HOME"], "testdata", "cluster",
+        "cdh%s" % os.environ["CDH_MAJOR_VERSION"], "node-1",
+        "etc", "hadoop", "conf")
+
+  def _get_other_conf_dir(self):
+    return os.path.join(os.environ["IMPALA_HOME"], "fe", "src", "test",
+        "resources")
+
   def _init_hdfs(self):
     self._hdfs = Hdfs(self, self.hadoop_user_name)
 
   def _init_hive(self):
-    self._hive = Hive(self, "127.0.0.1", 11050)
+    self._hive = Hive(self, self.hive_host, self.hive_port)
 
   def _init_impala(self):
     hs2_base_port = 21050
@@ -185,6 +208,23 @@ class MiniCluster(Cluster):
         for p in xrange(self.num_impalads)]
     self._impala = Impala(self, impalads)
 
+class MiniHiveCluster(MiniCluster):
+  """
+  A MiniCluster useful for running against Hive. It allows Hadoop configuration files
+  to be specified by HADOOP_CONF_DIR and Hive configuration files to be specified by
+  HIVE_CONF_DIR.
+  """
+
+  def __init__(self, hive_host=DEFAULT_HIVE_HOST, hive_port=DEFAULT_HIVE_PORT):
+    MiniCluster.__init__(self)
+    self.hive_host = hive_host
+    self.hive_port = hive_port
+
+  def _get_node_conf_dir(self):
+    return os.environ["HADOOP_CONF_DIR"]
+
+  def _get_other_conf_dir(self):
+    return os.environ["HIVE_CONF_DIR"]
 
 class CmCluster(Cluster):
@@ -315,7 +355,8 @@ class Hdfs(Service):
 
   def create_client(self, as_admin=False):
     """Returns an HdfsClient."""
-    endpoint = self.cluster.get_hadoop_config("dfs.namenode.http-address")
+    endpoint = self.cluster.get_hadoop_config("dfs.namenode.http-address",
+        "0.0.0.0:50070")
     if endpoint.startswith("0.0.0.0"):
       endpoint.replace("0.0.0.0", "127.0.0.1")
     return HdfsClient("http://%s" % endpoint, use_kerberos=False,
@@ -412,7 +453,8 @@ class Hive(Service):
 
   @property
   def warehouse_dir(self):
     if not self._warehouse_dir:
-      self._warehouse_dir = self.cluster.get_hadoop_config("hive.metastore.warehouse.dir")
+      self._warehouse_dir = urlparse(
+          self.cluster.get_hadoop_config("hive.metastore.warehouse.dir")).path
     return self._warehouse_dir
 
   def connect(self, db_name=None):
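Two behavioral changes above are easy to miss: get_hadoop_config() now accepts
a caller-supplied default (raising KeyError only when the key is absent and no
default is given), and warehouse_dir now reduces a fully qualified URI to its
path component. A minimal sketch (the hdfs:// value is an assumed example):

  from urlparse import urlparse  # Python 2, matching cluster.py's import

  # A fully qualified hive.metastore.warehouse.dir collapses to its path:
  print(urlparse("hdfs://localhost:20500/test-warehouse").path)
  # -> /test-warehouse

  # Lookup with a fallback, as Hdfs.create_client() now does:
  # endpoint = cluster.get_hadoop_config("dfs.namenode.http-address",
  #     "0.0.0.0:50070")
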
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0780d2c8/tests/comparison/data_generator.py
----------------------------------------------------------------------
diff --git a/tests/comparison/data_generator.py b/tests/comparison/data_generator.py
index e919f98..e1f0a01 100755
--- a/tests/comparison/data_generator.py
+++ b/tests/comparison/data_generator.py
@@ -49,6 +49,7 @@ from db_types import (
     Timestamp,
     TYPES,
     VarChar)
+from tests.comparison import db_connection
 
 LOG = getLogger(__name__)
@@ -87,9 +88,10 @@ class DbPopulator(object):
   '''
 
-  def __init__(self):
+  def __init__(self, db_engine=db_connection.IMPALA):
     self.cluster = None
     self.db_name = None
+    self.db_engine = db_engine
     self.min_col_count = None
     self.max_col_count = None
@@ -147,9 +149,17 @@ class DbPopulator(object):
         cursor.execute('INSERT INTO %s SELECT * FROM %s' % (table.name, text_table.name))
         cursor.drop_table(text_table.name)
-    with self.cluster.impala.cursor(db_name=self.db_name) as cursor:
-      cursor.invalidate_metadata()
-      cursor.compute_stats()
+    if self.db_engine is db_connection.IMPALA:
+      with self.cluster.impala.cursor(db_name=self.db_name) as cursor:
+        cursor.invalidate_metadata()
+        cursor.compute_stats()
+    elif self.db_engine is db_connection.HIVE:
+      with self.cluster.hive.cursor(db_name=self.db_name) as cursor:
+        cursor.invalidate_metadata()
+        cursor.compute_stats()
+    else:
+      raise ValueError("db_engine must be of type %s or %s", db_connection.IMPALA,
+          db_connection.HIVE)
     if postgresql_conn:
       with postgresql_conn.cursor() as postgresql_cursor:
         index_tables_in_db_if_possible(postgresql_cursor)
@@ -298,7 +308,7 @@ if __name__ == '__main__':
 
   cluster = cli_options.create_cluster(args)
-  populator = DbPopulator()
+  populator = DbPopulator(db_connection.HIVE if args.use_hive else db_connection.IMPALA)
   if command == 'populate':
     populator.randomization_seed = args.randomization_seed
     populator.cluster = cluster
@@ -308,10 +318,17 @@ if __name__ == '__main__':
     populator.min_row_count = args.min_row_count
     populator.max_row_count = args.max_row_count
     populator.allowed_storage_formats = args.storage_file_formats.split(',')
-    with cluster.impala.connect() as conn:
-      with conn.cursor() as cursor:
-        cursor.invalidate_metadata()
-        cursor.ensure_empty_db(args.db_name)
+
+    if args.use_hive:
+      with cluster.hive.connect() as conn:
+        with conn.cursor() as cursor:
+          cursor.ensure_empty_db(args.db_name)
+    else:
+      with cluster.impala.connect() as conn:
+        with conn.cursor() as cursor:
+          cursor.invalidate_metadata()
+          cursor.ensure_empty_db(args.db_name)
+
     if args.use_postgresql:
       with cli_options.create_connection(args) as postgresql_conn:
        with postgresql_conn.cursor() as cursor:
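For reference, the new db_engine argument in a minimal sketch (run from
tests/comparison so the imports resolve; populate settings elided):

  import db_connection
  from data_generator import DbPopulator

  # Stats collection now dispatches on db_engine: HIVE goes through the Hive
  # cursor's ANALYZE TABLE path, IMPALA keeps INVALIDATE METADATA plus
  # COMPUTE STATS.
  populator = DbPopulator(db_engine=db_connection.HIVE)
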
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0780d2c8/tests/comparison/db_connection.py
----------------------------------------------------------------------
diff --git a/tests/comparison/db_connection.py b/tests/comparison/db_connection.py
index 2fe8174..13125c2 100644
--- a/tests/comparison/db_connection.py
+++ b/tests/comparison/db_connection.py
@@ -789,11 +789,28 @@ class ImpalaConnection(DbConnection):
         auth_mechanism=('GSSAPI' if self._use_kerberos
             else self._NON_KERBEROS_AUTH_MECH))
 
+class HiveCursor(ImpalaCursor):
+
+  def invalidate_metadata(self, table_name=None):
+    # There is no equivalent of "INVALIDATE METADATA" in Hive
+    pass
+
+  def compute_stats(self, table_name=None):
+    if table_name:
+      self.execute("ANALYZE TABLE %s COMPUTE STATISTICS" % table_name)
+      self.execute("ANALYZE TABLE %s COMPUTE STATISTICS FOR COLUMNS" % table_name)
+    else:
+      for table_name in self.list_table_names():
+        self.execute("ANALYZE TABLE %s COMPUTE STATISTICS" % table_name)
+        self.execute("ANALYZE TABLE %s COMPUTE STATISTICS FOR COLUMNS" % table_name)
+
 
 class HiveConnection(ImpalaConnection):
 
   PORT = 11050
 
   _DB_TYPE = HIVE
+  _CURSOR_CLASS = HiveCursor
   _NON_KERBEROS_AUTH_MECH = 'PLAIN'
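HiveCursor keeps the Impala cursor interface: invalidate_metadata() becomes a
no-op (Hive reads the metastore directly), and compute_stats() maps onto
Hive's ANALYZE TABLE statements. A short usage sketch, reusing the cluster
from the earlier sketch (the db name matches the Testing section above):

  with cluster.hive.cursor(db_name="functional") as cursor:
    cursor.invalidate_metadata()  # no-op: Hive has no INVALIDATE METADATA
    cursor.compute_stats()  # ANALYZE TABLE ... COMPUTE STATISTICS [FOR COLUMNS]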
