Repository: incubator-hawq Updated Branches: refs/heads/master 5e5dc3d4a -> 67400868c
Revert "HAWQ-955. Add runnable scripts for feature test running in parallel." This reverts commit 5e5dc3d4a83a70df742a56aac65093c1ee6ffac9. Revert for python27 support Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/67400868 Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/67400868 Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/67400868 Branch: refs/heads/master Commit: 67400868cc571a820576d79d6dc9a8a3f8f31144 Parents: 5e5dc3d Author: yaoj2 <[email protected]> Authored: Mon Sep 12 18:41:26 2016 +0800 Committer: yaoj2 <[email protected]> Committed: Mon Sep 12 18:41:26 2016 +0800 ---------------------------------------------------------------------- src/test/feature/README.md | 4 +- src/test/feature/gtest-parallel | 411 --------------------- src/test/feature/parallel-run-feature-test.sh | 54 --- src/test/feature/test_main.cpp | 74 ++++ 4 files changed, 75 insertions(+), 468 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/67400868/src/test/feature/README.md ---------------------------------------------------------------------- diff --git a/src/test/feature/README.md b/src/test/feature/README.md index 404a713..229c7c4 100644 --- a/src/test/feature/README.md +++ b/src/test/feature/README.md @@ -15,7 +15,7 @@ Before building the code of feature tests part, just make sure your compiler sup 1. Make sure HAWQ is running correctly. If not, `init` or `start` HAWQ at first. 2. Load environment configuration by running `source $INSTALL_PREFIX/greenplum_path.sh`. 3. Load hdfs configuration. For example, `export HADOOP_HOME=/Users/wuhong/hadoop-2.7.2 && export PATH=${PATH}:${HADOOP_HOME}/bin`. Since some test cases need `hdfs` and `hadoop` command, just ensure these commands work before running. Otherwise you will get failure. -4. Run the cases with`./parallel-run-feature-test.sh 8 ./feature-test`(in this case 8 threads in parallel), you could use `--gtest_filter` option to filter test cases(both positive and negative patterns are supported). Please see more options by running `./feature-test --help`. +4. Run `./feature-test`, you could use `--gtest_filter` option to filter test cases(both positive and negative patterns are supported). Please see more options by running `./feature-test --help`. # Development In contribution to HAWQ, we suggest developers submitting feature tests related to your feature development. In writting a featurte test, you need to write a cpp file inside corresponding folders. There are two recommended way to write this cpp file: @@ -32,8 +32,6 @@ If some cases failed in your environment, check it out with the generated `.diff There are some cases expected to be fail in specific environment which need to be fixed later on. Don't worry about that. -To run feature tests in parallel, make sure your python version is equal to or greater than 2.7. - # Reference [HAWQ-832](https://issues.apache.org/jira/browse/HAWQ-832) http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/67400868/src/test/feature/gtest-parallel ---------------------------------------------------------------------- diff --git a/src/test/feature/gtest-parallel b/src/test/feature/gtest-parallel deleted file mode 100755 index 9b1f9ee..0000000 --- a/src/test/feature/gtest-parallel +++ /dev/null @@ -1,411 +0,0 @@ -#!/usr/bin/env python2 -# Copyright 2013 Google Inc. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import cPickle -import errno -import gzip -import multiprocessing -import optparse -import os -import signal -import subprocess -import sys -import tempfile -import thread -import threading -import time -import zlib - -# An object that catches SIGINT sent to the Python process and notices -# if processes passed to wait() die by SIGINT (we need to look for -# both of those cases, because pressing Ctrl+C can result in either -# the main process or one of the subprocesses getting the signal). -# -# Before a SIGINT is seen, wait(p) will simply call p.wait() and -# return the result. Once a SIGINT has been seen (in the main process -# or a subprocess, including the one the current call is waiting for), -# wait(p) will call p.terminate() and raise ProcessWasInterrupted. -class SigintHandler(object): - class ProcessWasInterrupted(Exception): pass - sigint_returncodes = {-signal.SIGINT, # Unix - -1073741510, # Windows - } - def __init__(self): - self.__lock = threading.Lock() - self.__processes = set() - self.__got_sigint = False - signal.signal(signal.SIGINT, self.__sigint_handler) - def __on_sigint(self): - self.__got_sigint = True - while self.__processes: - try: - self.__processes.pop().terminate() - except OSError: - pass - def __sigint_handler(self, signal_num, frame): - with self.__lock: - self.__on_sigint() - def got_sigint(self): - with self.__lock: - return self.__got_sigint - def wait(self, p): - with self.__lock: - if self.__got_sigint: - p.terminate() - self.__processes.add(p) - code = p.wait() - with self.__lock: - self.__processes.discard(p) - if code in self.sigint_returncodes: - self.__on_sigint() - if self.__got_sigint: - raise self.ProcessWasInterrupted - return code -sigint_handler = SigintHandler() - -# Return the width of the terminal, or None if it couldn't be -# determined (e.g. because we're not being run interactively). -def term_width(out): - if not out.isatty(): - return None - try: - p = subprocess.Popen(["stty", "size"], - stdout=subprocess.PIPE, stderr=subprocess.PIPE) - (out, err) = p.communicate() - if p.returncode != 0 or err: - return None - return int(out.split()[1]) - except (IndexError, OSError, ValueError): - return None - -# Output transient and permanent lines of text. If several transient -# lines are written in sequence, the new will overwrite the old. We -# use this to ensure that lots of unimportant info (tests passing) -# won't drown out important info (tests failing). -class Outputter(object): - def __init__(self, out_file): - self.__out_file = out_file - self.__previous_line_was_transient = False - self.__width = term_width(out_file) # Line width, or None if not a tty. - def transient_line(self, msg): - if self.__width is None: - self.__out_file.write(msg + "\n") - else: - self.__out_file.write("\r" + msg[:self.__width].ljust(self.__width)) - self.__previous_line_was_transient = True - def flush_transient_output(self): - if self.__previous_line_was_transient: - self.__out_file.write("\n") - self.__previous_line_was_transient = False - def permanent_line(self, msg): - self.flush_transient_output() - self.__out_file.write(msg + "\n") - -stdout_lock = threading.Lock() - -class FilterFormat: - if sys.stdout.isatty(): - # stdout needs to be unbuffered since the output is interactive. - sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0) - - out = Outputter(sys.stdout) - total_tests = 0 - finished_tests = 0 - - tests = {} - outputs = {} - failures = [] - - def print_test_status(self, last_finished_test, time_ms): - self.out.transient_line("[%d/%d] %s (%d ms)" - % (self.finished_tests, self.total_tests, - last_finished_test, time_ms)) - - def handle_meta(self, job_id, args): - (command, arg) = args.split(' ', 1) - if command == "TEST": - (binary, test) = arg.split(' ', 1) - self.tests[job_id] = (binary, test.strip()) - elif command == "EXIT": - (exit_code, time_ms) = [int(x) for x in arg.split(' ', 1)] - self.finished_tests += 1 - (binary, test) = self.tests[job_id] - self.print_test_status(test, time_ms) - if exit_code != 0: - self.failures.append(self.tests[job_id]) - with open(self.outputs[job_id]) as f: - for line in f.readlines(): - self.out.permanent_line(line.rstrip()) - self.out.permanent_line( - "[%d/%d] %s returned/aborted with exit code %d (%d ms)" - % (self.finished_tests, self.total_tests, test, exit_code, time_ms)) - elif command == "TESTCNT": - self.total_tests = int(arg.split(' ', 1)[1]) - self.out.transient_line("[0/%d] Running tests..." % self.total_tests) - - def logfile(self, job_id, name): - self.outputs[job_id] = name - - def log(self, line): - stdout_lock.acquire() - (prefix, output) = line.split(' ', 1) - - assert prefix[-1] == ':' - self.handle_meta(int(prefix[:-1]), output) - stdout_lock.release() - - def end(self): - if self.failures: - self.out.permanent_line("FAILED TESTS (%d/%d):" - % (len(self.failures), self.total_tests)) - for (binary, test) in self.failures: - self.out.permanent_line(" " + binary + ": " + test) - self.out.flush_transient_output() - -class RawFormat: - def log(self, line): - stdout_lock.acquire() - sys.stdout.write(line + "\n") - sys.stdout.flush() - stdout_lock.release() - def logfile(self, job_id, name): - with open(self.outputs[job_id]) as f: - for line in f.readlines(): - self.log(str(job_id) + '> ' + line.rstrip()) - def end(self): - pass - -# Record of test runtimes. Has built-in locking. -class TestTimes(object): - def __init__(self, save_file): - "Create new object seeded with saved test times from the given file." - self.__times = {} # (test binary, test name) -> runtime in ms - - # Protects calls to record_test_time(); other calls are not - # expected to be made concurrently. - self.__lock = threading.Lock() - - try: - with gzip.GzipFile(save_file, "rb") as f: - times = cPickle.load(f) - except (EOFError, IOError, cPickle.UnpicklingError, zlib.error): - # File doesn't exist, isn't readable, is malformed---whatever. - # Just ignore it. - return - - # Discard saved times if the format isn't right. - if type(times) is not dict: - return - for ((test_binary, test_name), runtime) in times.items(): - if (type(test_binary) is not str or type(test_name) is not str - or type(runtime) not in {int, long, type(None)}): - return - - self.__times = times - - def get_test_time(self, binary, testname): - """Return the last duration for the given test as an integer number of - milliseconds, or None if the test failed or if there's no record for it.""" - return self.__times.get((binary, testname), None) - - def record_test_time(self, binary, testname, runtime_ms): - """Record that the given test ran in the specified number of - milliseconds. If the test failed, runtime_ms should be None.""" - with self.__lock: - self.__times[(binary, testname)] = runtime_ms - - def write_to_file(self, save_file): - "Write all the times to file." - try: - with open(save_file, "wb") as f: - with gzip.GzipFile("", "wb", 9, f) as gzf: - cPickle.dump(self.__times, gzf, cPickle.HIGHEST_PROTOCOL) - except IOError: - pass # ignore errors---saving the times isn't that important - -# Remove additional arguments (anything after --). -additional_args = [] - -for i in range(len(sys.argv)): - if sys.argv[i] == '--': - additional_args = sys.argv[i+1:] - sys.argv = sys.argv[:i] - break - -parser = optparse.OptionParser( - usage = 'usage: %prog [options] binary [binary ...] -- [additional args]') - -parser.add_option('-d', '--output_dir', type='string', - default=os.path.join(tempfile.gettempdir(), "gtest-parallel"), - help='output directory for test logs') -parser.add_option('-r', '--repeat', type='int', default=1, - help='repeat tests') -parser.add_option('--failed', action='store_true', default=False, - help='run only failed and new tests') -parser.add_option('-w', '--workers', type='int', - default=multiprocessing.cpu_count(), - help='number of workers to spawn') -parser.add_option('--gtest_color', type='string', default='yes', - help='color output') -parser.add_option('--gtest_filter', type='string', default='', - help='test filter') -parser.add_option('--gtest_also_run_disabled_tests', action='store_true', - default=False, help='run disabled tests too') -parser.add_option('--format', type='string', default='filter', - help='output format (raw,filter)') -parser.add_option('--print_test_times', action='store_true', default=False, - help='When done, list the run time of each test') - -(options, binaries) = parser.parse_args() - -if binaries == []: - parser.print_usage() - sys.exit(1) - -logger = RawFormat() -if options.format == 'raw': - pass -elif options.format == 'filter': - logger = FilterFormat() -else: - sys.exit("Unknown output format: " + options.format) - -# Find tests. -save_file = os.path.join(os.path.expanduser("~"), ".gtest-parallel-times") -times = TestTimes(save_file) -tests = [] -for test_binary in binaries: - command = [test_binary] - if options.gtest_also_run_disabled_tests: - command += ['--gtest_also_run_disabled_tests'] - - list_command = list(command) - if options.gtest_filter != '': - list_command += ['--gtest_filter=' + options.gtest_filter] - - try: - test_list = subprocess.Popen(list_command + ['--gtest_list_tests'], - stdout=subprocess.PIPE).communicate()[0] - except OSError as e: - sys.exit("%s: %s" % (test_binary, str(e))) - - command += additional_args - - test_group = '' - for line in test_list.split('\n'): - if not line.strip(): - continue - if line[0] != " ": - # Remove comments for typed tests and strip whitespace. - test_group = line.split('#')[0].strip() - continue - # Remove comments for parameterized tests and strip whitespace. - line = line.split('#')[0].strip() - if not line: - continue - - test = test_group + line - if not options.gtest_also_run_disabled_tests and 'DISABLED_' in test: - continue - tests.append((times.get_test_time(test_binary, test), - test_binary, test, command)) - -if options.failed: - # The first element of each entry is the runtime of the most recent - # run if it was successful, or None if the test is new or the most - # recent run failed. - tests = [x for x in tests if x[0] is None] - -# Sort tests by falling runtime (with None, which is what we get for -# new and failing tests, being considered larger than any real -# runtime). -tests.sort(reverse=True, key=lambda x: ((1 if x[0] is None else 0), x)) - -# Repeat tests (-r flag). -tests *= options.repeat -test_lock = threading.Lock() -job_id = 0 -logger.log(str(-1) + ': TESTCNT ' + ' ' + str(len(tests))) - -exit_code = 0 - -# Create directory for test log output. -try: - os.makedirs(options.output_dir) -except OSError as e: - # Ignore errors if this directory already exists. - if e.errno != errno.EEXIST or not os.path.isdir(options.output_dir): - raise e -# Remove files from old test runs. -for logfile in os.listdir(options.output_dir): - os.remove(os.path.join(options.output_dir, logfile)) - -# Run the specified job. Return the elapsed time in milliseconds if -# the job succeeds, or None if the job fails. (This ensures that -# failing tests will run first the next time.) -def run_job((command, job_id, test)): - begin = time.time() - - with tempfile.NamedTemporaryFile(dir=options.output_dir, delete=False) as log: - sub = subprocess.Popen(command + ['--gtest_filter=' + test] + - ['--gtest_color=' + options.gtest_color], - stdout=log.file, - stderr=log.file) - try: - code = sigint_handler.wait(sub) - except sigint_handler.ProcessWasInterrupted: - thread.exit() - runtime_ms = int(1000 * (time.time() - begin)) - logger.logfile(job_id, log.name) - - logger.log("%s: EXIT %s %d" % (job_id, code, runtime_ms)) - if code == 0: - return runtime_ms - global exit_code - exit_code = code - return None - -def worker(): - global job_id - while True: - job = None - test_lock.acquire() - if job_id < len(tests): - (_, test_binary, test, command) = tests[job_id] - logger.log(str(job_id) + ': TEST ' + test_binary + ' ' + test) - job = (command, job_id, test) - job_id += 1 - test_lock.release() - if job is None: - return - times.record_test_time(test_binary, test, run_job(job)) - -def start_daemon(func): - t = threading.Thread(target=func) - t.daemon = True - t.start() - return t - -workers = [start_daemon(worker) for i in range(options.workers)] - -[t.join() for t in workers] -logger.end() -times.write_to_file(save_file) -if options.print_test_times: - ts = sorted((times.get_test_time(test_binary, test), test_binary, test) - for (_, test_binary, test, _) in tests - if times.get_test_time(test_binary, test) is not None) - for (time_ms, test_binary, test) in ts: - print "%8s %s" % ("%dms" % time_ms, test) -sys.exit(-signal.SIGINT if sigint_handler.got_sigint() else exit_code) http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/67400868/src/test/feature/parallel-run-feature-test.sh ---------------------------------------------------------------------- diff --git a/src/test/feature/parallel-run-feature-test.sh b/src/test/feature/parallel-run-feature-test.sh deleted file mode 100755 index c099036..0000000 --- a/src/test/feature/parallel-run-feature-test.sh +++ /dev/null @@ -1,54 +0,0 @@ -#! /bin/bash - -if [ x$GPHOME == 'x' ]; then - echo "Please source greenplum_path.sh before running feature tests." - exit 0 -fi - -PSQL=${GPHOME}/bin/psql -HAWQ_DB=${PGDATABASE:-"postgres"} -HAWQ_HOST=${PGHOST:-"localhost"} -HAWQ_PORT=${PGPORT:-"5432"} -HAWQ_USER=${PGUSER:-} -HAWQ_PASSWORD=${PGPASSWORD:-} - -run_sql() { - suffix="" - if [ x$HAWQ_USER != 'x' ]; then - suffix=$suffix:" -U $HAWQ_USER" - if [ x$HAWQ_PASSWORD != 'x' ]; then - suffix=$suffix:" -W $HAWQ_PASSWORD" - fi - fi - $PSQL -d $HAWQ_DB -h $HAWQ_HOST -p $HAWQ_PORT -c "$1" $suffix > /dev/null 2>&1 - if [ $? -ne 0 ]; then - echo "$1 failed." - exit 1 - fi -} - -init_hawq_test() { - TEST_DB_NAME="hawq_feature_test_db" - - $PSQL -d $HAWQ_DB -h $HAWQ_HOST -p $HAWQ_PORT \ - -c "create database $TEST_DB_NAME;" > /dev/null 2>&1 - run_sql "alter database $TEST_DB_NAME set lc_messages to 'C';" - run_sql "alter database $TEST_DB_NAME set lc_monetary to 'C';" - run_sql "alter database $TEST_DB_NAME set lc_numeric to 'C';" - run_sql "alter database $TEST_DB_NAME set lc_time to 'C';" - run_sql "alter database $TEST_DB_NAME set timezone_abbreviations to 'Default';" - run_sql "alter database $TEST_DB_NAME set timezone to 'PST8PDT';" - run_sql "alter database $TEST_DB_NAME set datestyle to 'postgres,MDY';" - export PGDATABASE=$TEST_DB_NAME -} - -run_feature_test() { - if [ $# -lt 2 ]; then - echo "Usage: parallel-run-feature-test.sh workers [number of workers] binary [binary ...] -- [additional args]" - exit 0 - fi - init_hawq_test - $(dirname $0)/gtest-parallel --worker=$1 ${@:2} -} - -run_feature_test $1 ${@:2} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/67400868/src/test/feature/test_main.cpp ---------------------------------------------------------------------- diff --git a/src/test/feature/test_main.cpp b/src/test/feature/test_main.cpp old mode 100755 new mode 100644 index 443e2db..41142ec --- a/src/test/feature/test_main.cpp +++ b/src/test/feature/test_main.cpp @@ -1,6 +1,80 @@ +#include <sys/types.h> +#include <pwd.h> #include "gtest/gtest.h" +#include "psql.h" +#include "sql_util.h" + +using std::string; + +class TestPrepare +{ + private: + const string testDbName = "hawq_feature_test"; + std::unique_ptr<hawq::test::PSQL> conn; + void init_hawq_test(); + public: + TestPrepare(); + ~TestPrepare(); +}; + +#define PSQL_RUN_AND_ASSERT() \ + conn->runSQLCommand(cmd); \ + ASSERT_EQ(0, conn->getLastStatus()) << conn->getLastResult(); + +void TestPrepare::init_hawq_test() +{ + string user = HAWQ_USER; + if(user.empty()) { + struct passwd *pw; + uid_t uid = geteuid(); + pw = getpwuid(uid); + user.assign(pw->pw_name); + } + + conn.reset(new hawq::test::PSQL(HAWQ_DB, HAWQ_HOST, HAWQ_PORT, user, HAWQ_PASSWORD)); + + // Create the test db and set some default guc values so that test outputs + // could be consistent. We do not drop the database in advance since keeping the + // previous environment could probably help reproducing and resolving some failing + // test issues, so you need to drop the database yourself when necessary, before + // running the tests. + string cmd; + cmd = "create database " + testDbName; + // Do not check return value since probably the db has existed. + conn->runSQLCommand(cmd); + cmd = "alter database " + testDbName + " set lc_messages to 'C'"; + PSQL_RUN_AND_ASSERT(); + cmd = "alter database " + testDbName + " set lc_monetary to 'C'"; + PSQL_RUN_AND_ASSERT(); + cmd = "alter database " + testDbName + " set lc_numeric to 'C'"; + PSQL_RUN_AND_ASSERT(); + cmd = "alter database " + testDbName + " set lc_time to 'C'"; + PSQL_RUN_AND_ASSERT(); + cmd = "alter database " + testDbName + " set timezone_abbreviations to 'Default'"; + PSQL_RUN_AND_ASSERT(); + cmd = "alter database " + testDbName + " set timezone to 'PST8PDT'"; + PSQL_RUN_AND_ASSERT(); + cmd = "alter database " + testDbName + " set datestyle to 'postgres,MDY'"; + PSQL_RUN_AND_ASSERT(); +} + +TestPrepare::TestPrepare() +{ + init_hawq_test(); + + // The test will use the newly created database. + setenv("PGDATABASE", testDbName.c_str(), 1); +} + +TestPrepare::~TestPrepare() +{ +} int main(int argc, char** argv) { + + TestPrepare tp; + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); }
