Repository: impala
Updated Branches:
  refs/heads/master 25422c74b -> 2e6a63e31

IMPALA-6070: Further improvements to test-with-docker.

This commit tackles a few additions and improvements to
test-with-docker. In general, I'm adding workloads (e.g., exhaustive,
rat-check), tuning memory settings and parallelism, and trying to speed
things up.

Bug fixes:
* Embarrassingly, I was still skipping thrift-server-test in the backend
  tests. This was a mistake in handling feedback from my last review.
* I made the timeline a little bit taller to clip less.

Adding workloads:
* I added the RAT licensing check.
* I added exhaustive runs. This led me to model the suites a little bit
  more in Python, with a class representing a suite with a bunch of data
  about the suite. It's not perfect and still coupled with the
  entrypoint.sh shell script, but it feels workable. As part of adding
  exhaustive tests, I had to re-work the timeout handling, since now
  different suites meaningfully have different timeouts.

Speed ups:
* To speed up test runs, I added a mechanism to split py.test suites into
  multiple shards with a py.test argument. This involved a little bit of
  work in conftest.py, and exposing $RUN_CUSTOM_CLUSTER_TESTS_ARGS in
  run-all-tests.sh. Furthermore, I moved a bit more logic about managing
  the list of suites into Python.
* I now do the full build with "-notests" and only build the backend
  tests in the relevant target that needs them. This speeds up
  "docker commit" significantly by removing about 20GB from the
  container. I had to indicate that expr-codegen-test depends on
  expr-codegen-test-ir, which was missing.
* I sped up copying the Kudu data: previously I did both a move and a
  copy; now I'm doing a move followed by a move. One of the moves is
  cross-filesystem so is slow, but this does half the amount of copying.

Memory usage:
* I tweaked the memlimit_gb settings to have a higher default. I've been
  fighting empirically to have the tests run well on c4.8xlarge and
  m4.10xlarge.
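
As an aside on the py.test sharding described under "Speed ups": the
sketch below shows one way an N/M shard could be selected. It is only an
illustration. The helper names (parse_shard_spec, select_shard,
shard_args) are hypothetical, and the round-robin assignment over sorted
test names is an assumption; the commit's own selection logic is in
tests/conftest.py. Only the "--shard_tests=N/M" argument format and the
1-based shard numbering are taken from this commit.

```python
def parse_shard_spec(spec):
    """Parse an "N/M" string into (n, m), requiring 1 <= n <= m."""
    n_str, m_str = spec.split("/")
    n, m = int(n_str), int(m_str)
    if not 1 <= n <= m:
        raise ValueError("invalid shard spec: %r" % (spec,))
    return n, m


def select_shard(test_ids, spec):
    """Return the tests that belong to shard N of M.

    Assumption: tests are sorted by name and dealt out round-robin, so the
    split is deterministic regardless of collection order.
    """
    n, m = parse_shard_spec(spec)
    ordered = sorted(test_ids)
    return [t for i, t in enumerate(ordered) if i % m == n - 1]


def shard_args(base_args, shards):
    """Build one argument string per shard (hypothetical helper).

    Appends "--shard_tests=i/shards" to the base arguments, with i
    running from 1 to shards.
    """
    return ["%s --shard_tests=%d/%d" % (base_args, i, shards)
            for i in range(1, shards + 1)]
```

With two shards, select_shard(tests, "1/2") and select_shard(tests, "2/2")
together cover every test exactly once, which is the property the
sharding needs.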

The more memory a minicluster and test suite run uses, the fewer parallel
suites we can run. By observing the peak processes at the tail of a run
(with a new "memory_usage" function that uses a ps/sort/awk trick) and by
observing peak container total_rss, I found that we had several JVMs that
didn't have Xmx settings set. I added Xms/Xmx settings in a few places:

* The non-first Impalad does very little JVM work, so having an Xmx keeps
  it small, even in the parallel tests.
* Datanodes do work, but they essentially were never garbage collecting,
  because JVM defaults let them use up to 1/4th the machine memory. (I
  observed this based on RSS at the end of the run; nothing fancier.)
  Adding Xms/Xmx settings helped.
* Similarly, I piped the settings through to HBase.

A few daemons still run without resource limitations, but they don't seem
to be a problem.

Change-Id: I43fe124f00340afa21ad1eeb6432d6d50151ca7c
Reviewed-on: http://gerrit.cloudera.org:8080/10123
Reviewed-by: Joe McDonnell <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/2e6a63e3
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/2e6a63e3
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/2e6a63e3

Branch: refs/heads/master
Commit: 2e6a63e31e22657173ad2f03a8185f8a4a8f074e
Parents: 25422c7
Author: Philip Zeyliger <[email protected]>
Authored: Fri Apr 6 10:16:39 2018 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Thu Apr 26 20:47:29 2018 +0000

----------------------------------------------------------------------
 be/src/exprs/CMakeLists.txt | 1 +
 bin/run-all-tests.sh | 5 +-
 docker/entrypoint.sh | 173 +++++++----
 docker/monitor.py | 29 +-
 docker/test-with-docker.py | 296 ++++++++++++++-----
 docker/timeline.html.template | 9 +-
 testdata/bin/run-hbase.sh | 1 +
 .../common/etc/init.d/hdfs-common | 7 +
 tests/conftest.py | 35 +++
 tests/run-tests.py | 19 +-
 10 files changed, 426 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/2e6a63e3/be/src/exprs/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/exprs/CMakeLists.txt b/be/src/exprs/CMakeLists.txt
index cff391c..755c166 100644
--- a/be/src/exprs/CMakeLists.txt
+++ b/be/src/exprs/CMakeLists.txt
@@ -72,5 +72,6 @@ ADD_BE_TEST(expr-codegen-test)
 # expr-codegen-test includes test IR functions
 COMPILE_TO_IR(expr-codegen-test.cc)
 add_dependencies(expr-codegen-test-ir gen-deps)
+add_dependencies(expr-codegen-test expr-codegen-test-ir)

 ADD_UDF_TEST(aggregate-functions-test)

http://git-wip-us.apache.org/repos/asf/impala/blob/2e6a63e3/bin/run-all-tests.sh
----------------------------------------------------------------------
diff --git a/bin/run-all-tests.sh b/bin/run-all-tests.sh
index 4743a4f..7702134 100755
--- a/bin/run-all-tests.sh
+++ b/bin/run-all-tests.sh
@@ -57,6 +57,8 @@ fi
 : ${TEST_START_CLUSTER_ARGS:=}
 # Extra args to pass to run-tests.py
 : ${RUN_TESTS_ARGS:=}
+# Extra args to pass to run-custom-cluster-tests.sh
+: ${RUN_CUSTOM_CLUSTER_TESTS_ARGS:=}
 if [[ "${TARGET_FILESYSTEM}" == "local" ]]; then
   # TODO: Remove abort_on_config_error flag from here and create-load-data.sh once
   # checkConfiguration() accepts the local filesystem (see IMPALA-1850).
@@ -223,7 +225,8 @@ do
     # Run the custom-cluster tests after all other tests, since they will restart the
     # cluster repeatedly and lose state.
     # TODO: Consider moving in to run-tests.py.
-    if ! "${IMPALA_HOME}/tests/run-custom-cluster-tests.sh" ${COMMON_PYTEST_ARGS}; then
+    if ! "${IMPALA_HOME}/tests/run-custom-cluster-tests.sh" ${COMMON_PYTEST_ARGS} \
+        ${RUN_CUSTOM_CLUSTER_TESTS_ARGS}; then
       TEST_RET_CODE=1
     fi
     export IMPALA_MAX_LOG_FILES="${IMPALA_MAX_LOG_FILES_SAVE}"

http://git-wip-us.apache.org/repos/asf/impala/blob/2e6a63e3/docker/entrypoint.sh
----------------------------------------------------------------------
diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh
index d371d25..00b2ee6 100755
--- a/docker/entrypoint.sh
+++ b/docker/entrypoint.sh
@@ -90,12 +90,6 @@ function impala_environment() {
 function boot_container() {
   pushd /home/impdev/Impala

-  # Required for metastore
-  sudo service postgresql start
-
-  # Required for starting HBase
-  sudo service ssh start
-
   # Make log directories. This is typically done in buildall.sh.
   mkdir -p logs/be_tests logs/fe_tests/coverage logs/ee_tests logs/custom_cluster_tests

@@ -112,17 +106,37 @@ function boot_container() {
   echo Hosts file:
   cat /etc/hosts

-  # Make a copy of Kudu's WALs to avoid isue with Docker filesystems (aufs and
+  popd
+}
+
+function start_minicluster {
+  # The subshell here avoids the verbose output from set -x.
+  (echo ">>> Starting PostgreSQL and SSH") 2> /dev/null
+  pushd /home/impdev/Impala
+
+  # Required for metastore
+  sudo service postgresql start
+
+  # Required for starting HBase
+  sudo service ssh start
+
+  (echo ">>> Copying Kudu Data") 2> /dev/null
+  # Move around Kudu's WALs to avoid issue with Docker filesystems (aufs and
   # overlayfs) that don't support os.rename(2) on directories, which Kudu
   # requires. We make a fresh copy of the data, in which case rename(2) works
   # presumably because there's only one layer involved. See
   # https://issues.apache.org/jira/browse/KUDU-1419.
-  cd /home/impdev/Impala/testdata
+  set -x
+  pushd /home/impdev/Impala/testdata
   for x in cluster/cdh*/node-*/var/lib/kudu/*/wal; do
+    echo $x
+    # This mv takes time, as it's actually copying into the latest layer.
     mv $x $x-orig
-    cp -r $x-orig $x
-    rm -r $x-orig
+    mkdir $x
+    mv $x-orig/* $x
+    rmdir $x-orig
   done
+  popd

   # Wait for postgresql to really start; if it doesn't, Hive Metastore will fail to start.
   for i in {1..120}; do
@@ -135,6 +149,9 @@ function boot_container() {
   done
   sudo -u postgres psql -c "select 1"

+  (echo ">>> Starting mini cluster") 2> /dev/null
+  testdata/bin/run-all.sh
+
   popd
 }

@@ -164,8 +181,13 @@ function build_impdev() {

   # Builds Impala and loads test data.
   # Note that IMPALA-6494 prevents us from using shared library builds,
-  # which are smaller and thereby speed things up.
-  ./buildall.sh -noclean -format -testdata -skiptests
+  # which are smaller and thereby speed things up. We use "-notests"
+  # to avoid building backend tests, which are sizable, and
+  # can be built when executing those tests.
+  ./buildall.sh -noclean -format -testdata -notests
+
+  # Dump current memory usage to logs, before shutting things down.
+  memory_usage

   # Shut down things cleanly.
   testdata/bin/kill-all.sh
@@ -176,9 +198,34 @@ function build_impdev() {
   # Clean up things we don't need to reduce image size
   find be -name '*.o' -execdir rm '{}' + # ~1.6GB

+  # Clean up dangling symlinks. These (typically "cluster/cdh*-node-*")
+  # may point to something inside a container that no longer exists
+  # and can confuse Jenkins.
+  find /logs -xtype l -execdir rm '{}' ';'
+
   popd
 }

+# Prints top 20 RSS consumers (and other, total), in megabytes Common culprits
+# are Java processes without Xmx set. Since most things don't reclaim memory,
+# this is a decent proxy for peak memory usage by long-lived processes.
+function memory_usage() {
+  (
+  echo "Top 20 memory consumers (RSS in MBs)"
+  sudo ps -axho rss,args | \
+    sed -e 's/^ *//' | \
+    sed -e 's, ,\t,' | \
+    sort -nr | \
+    awk -F'\t' '
+    FNR < 20 { print $1/1024.0, $2; total += $1/1024.0 }
+    FNR >= 20 { other+= $1/1024.0; total += $1/1024.0 }
+    END {
+      if (other) { print other, "-- other --" };
+      print total, "-- total --"
+    }'
+  ) >& /logs/memory_usage.txt
+}
+
 # Runs a suite passed in as the first argument. Tightly
 # coupled with Impala's run-all-tests and the suite names.
 # from test-with-docker.py.
@@ -189,6 +236,8 @@ function test_suite() {

   # These test suites are for testing.
   if [[ $1 == NOOP ]]; then
+    # Sleep busily for 10 seconds.
+    bash -c 'while [[ $SECONDS -lt 10 ]]; do :; done'
     return 0
   fi
   if [[ $1 == NOOP_FAIL ]]; then
@@ -208,31 +257,9 @@ function test_suite() {
   boot_container
   impala_environment

-  # By default, the JVM will use 1/4 of your OS memory for its heap size. For a
-  # long-running test, this will delay GC inside of impalad's leading to
-  # unnecessarily large process RSS footprints. We cap the heap size at
-  # a more reasonable size. Note that "test_insert_large_string" fails
-  # at 2g and 3g, so the suite that includes it (EE_TEST_PARALLEL) gets
-  # additional memory.
-  #
-  # Similarly, bin/start-impala-cluster typically configures the memlimit
-  # to be 80% of the machine memory, divided by the number of daemons.
-  # If multiple containers are to be run simultaneously, this is scaled
-  # down in test-with-docker.py (and further configurable with --impalad-mem-limit-bytes)
-  # and passed in via $IMPALAD_MEM_LIMIT_BYTES to the container. There is a
-  # relationship between the number of parallel tests that can be run by py.test and this
-  # limit.
-  JVM_HEAP_GB=2
-  if [[ $1 = EE_TEST_PARALLEL ]]; then
-    JVM_HEAP_GB=4
-  fi
-  export TEST_START_CLUSTER_ARGS="--jvm_args=-Xmx${JVM_HEAP_GB}g \
-    --impalad_args=--mem_limit=$IMPALAD_MEM_LIMIT_BYTES"
-
   # BE tests don't require the minicluster, so we can run them directly.
   if [[ $1 = BE_TEST ]]; then
-    # IMPALA-6494: thrift-server-test fails in Ubuntu16.04 for the moment; skip it.
-    export SKIP_BE_TEST_PATTERN='thrift-server-test*'
+    make -j$(nproc) --load-average=$(nproc) be-test be-benchmarks
     if ! bin/run-backend-tests.sh; then
       echo "Tests $1 failed!"
       return 1
@@ -242,37 +269,73 @@ function test_suite() {
     fi
   fi

+  if [[ $1 == RAT_CHECK ]]; then
+    # Runs Apache RAT (a license checker)
+    git archive --prefix=rat/ -o rat-impala.zip HEAD
+    wget --quiet https://archive.apache.org/dist/creadur/apache-rat-0.12/apache-rat-0.12-bin.tar.gz
+    tar xzf apache-rat-0.12-bin.tar.gz
+    java -jar apache-rat-0.12/apache-rat-0.12.jar -x rat-impala.zip > logs/rat.xml
+    bin/check-rat-report.py bin/rat_exclude_files.txt logs/rat.xml
+    return $?
+  fi
+
   # Start the minicluster
-  testdata/bin/run-all.sh
+  start_minicluster

-  export MAX_PYTEST_FAILURES=0
-  # Choose which suite to run; this is how run-all.sh chooses between them.
-  export FE_TEST=false
-  export BE_TEST=false
-  export EE_TEST=false
-  export JDBC_TEST=false
-  export CLUSTER_TEST=false
-
-  eval "export ${1}=true"
-
-  if [[ ${1} = "EE_TEST_SERIAL" ]]; then
-    # We bucket the stress tests with the parallel tests.
-    export RUN_TESTS_ARGS="--skip-parallel --skip-stress"
-    export EE_TEST=true
-  elif [[ ${1} = "EE_TEST_PARALLEL" ]]; then
-    export RUN_TESTS_ARGS="--skip-serial"
-    export EE_TEST=true
+  # By default, the JVM will use 1/4 of your OS memory for its heap size. For a
+  # long-running test, this will delay GC inside of impalad's leading to
+  # unnecessarily large process RSS footprints. To combat this, we
+  # set a small initial heap size, and then cap it at a more reasonable
+  # size. The small initial heap sizes help for daemons that do little
+  # in the way of JVM work (e.g., the 2nd and 3rd impalad's).
+  # Note that "test_insert_large_string" fails at 2g and 3g, so the suite that
+  # includes it (EE_TEST_PARALLEL) gets additional memory.
+
+  # Note that we avoid using TEST_START_CLUSTER_ARGS="--jvm-args=..."
+  # because it gets flattened along the way if we need to provide
+  # more than one Java argument. We use JAVA_TOOL_OPTIONS instead.
+  JVM_HEAP_MAX_GB=2
+  if [[ $1 = EE_TEST_PARALLEL ]]; then
+    JVM_HEAP_MAX_GB=4
+  elif [[ $1 = EE_TEST_PARALLEL_EXHAUSTIVE ]]; then
+    JVM_HEAP_MAX_GB=8
   fi
+  JAVA_TOOL_OPTIONS="-Xms512M -Xmx${JVM_HEAP_MAX_GB}G"
+
+  # Similarly, bin/start-impala-cluster typically configures the memlimit
+  # to be 80% of the machine memory, divided by the number of daemons.
+  # If multiple containers are to be run simultaneously, this is scaled
+  # down in test-with-docker.py (and further configurable with --impalad-mem-limit-bytes)
+  # and passed in via $IMPALAD_MEM_LIMIT_BYTES to the container. There is a
+  # relationship between the number of parallel tests that can be run by py.test and this
+  # limit.
+  export TEST_START_CLUSTER_ARGS="--impalad_args=--mem_limit=$IMPALAD_MEM_LIMIT_BYTES"
+
+  export MAX_PYTEST_FAILURES=0
+
+  # Asserting that these should are all set (to either true or false as strings).
+  # This is how run-all.sh chooses between them.
+  [[ $FE_TEST && $BE_TEST && $EE_TEST && $JDBC_TEST && $CLUSTER_TEST ]]

   ret=0

+  if [[ ${EE_TEST} = true ]]; then
+    # test_insert_parquet.py depends on this binary
+    make -j$(nproc) --load-average=$(nproc) parquet-reader
+  fi
+
   # Run tests.
-  if ! time -p bin/run-all-tests.sh; then
+  (echo ">>> $1: Starting run-all-test") 2> /dev/null
+  if ! time -p bash -x bin/run-all-tests.sh; then
     ret=1
     echo "Tests $1 failed!"
   else
     echo "Tests $1 succeeded!"
   fi
+
+  # Save memory usage after tests have run but before shutting down the cluster.
+  memory_usage || true
+
   # Oddly, I've observed bash fail to exit (and wind down the container),
   # leading to test-with-docker.py hitting a timeout. Killing the minicluster
   # daemons fixes this.
@@ -312,6 +375,8 @@ function main() {
   shift

   echo ">>> ${CMD} $@ (begin)"
+  # Dump environment, for debugging
+  env | grep -vE "AWS_(SECRET_)?ACCESS_KEY"
   set -x
   if "${CMD}" "$@"; then
     ret=0

http://git-wip-us.apache.org/repos/asf/impala/blob/2e6a63e3/docker/monitor.py
----------------------------------------------------------------------
diff --git a/docker/monitor.py b/docker/monitor.py
index 64cde0c..58e2a83 100644
--- a/docker/monitor.py
+++ b/docker/monitor.py
@@ -232,28 +232,37 @@ class Timeline(object):
         if self.interesting_re.search(line)]
     return [(container.name,) + split_timestamp(line) for line in interesting_lines]

-  @staticmethod
-  def parse_metrics(f):
+  def parse_metrics(self, f):
     """Parses timestamped metric lines.

     Given metrics lines like:

     2017-10-25 10:08:30.961510 87d5562a5fe0ea075ebb2efb0300d10d23bfa474645bb464d222976ed872df2a cpu user 33 system 15

-    Returns an iterable of (ts, container, user_cpu, system_cpu)
+    Returns an iterable of (ts, container, user_cpu, system_cpu). It also updates
+    container.peak_total_rss and container.total_user_cpu and container.total_system_cpu.
     """
     prev_by_container = {}
+    peak_rss_by_container = {}
     for line in f:
       ts, rest = split_timestamp(line.rstrip())
+      total_rss = None
       try:
         container, metric_type, rest2 = rest.split(" ", 2)
-        if metric_type != "cpu":
-          continue
-        _, user_cpu_s, _, system_cpu_s = rest2.split(" ", 3)
+        if metric_type == "cpu":
+          _, user_cpu_s, _, system_cpu_s = rest2.split(" ", 3)
+        elif metric_type == "memory":
+          memory_metrics = rest2.split(" ")
+          total_rss = int(memory_metrics[memory_metrics.index("total_rss") + 1 ])
       except:
         logging.warning("Skipping metric line: %s", line)
         continue

+      if total_rss is not None:
+        peak_rss_by_container[container] = max(peak_rss_by_container.get(container, 0),
+            total_rss)
+        continue
+
       prev_ts, prev_user, prev_system = prev_by_container.get(
           container, (None, None, None))
       user_cpu = int(user_cpu_s)
@@ -267,6 +276,14 @@ class Timeline(object):
             (system_cpu - prev_system)/dt/USER_HZ
       prev_by_container[container] = ts, user_cpu, system_cpu

+    # Now update container totals
+    for c in self.containers:
+      if c.id in prev_by_container:
+        _, u, s = prev_by_container[c.id]
+        c.total_user_cpu, c.total_system_cpu = u / USER_HZ, s / USER_HZ
+      if c.id in peak_rss_by_container:
+        c.peak_total_rss = peak_rss_by_container[c.id]
+
   def create(self, output):
     # Read logfiles
     timelines = []

http://git-wip-us.apache.org/repos/asf/impala/blob/2e6a63e3/docker/test-with-docker.py
----------------------------------------------------------------------
diff --git a/docker/test-with-docker.py b/docker/test-with-docker.py
index 59640b4..c3f4427 100755
--- a/docker/test-with-docker.py
+++ b/docker/test-with-docker.py
@@ -95,14 +95,10 @@ as part of the build, in logs/docker/*/timeline.html.
 # Suggested speed improvement TODOs:
 # - Speed up testdata generation
 # - Skip generating test data for variants not being run
-# - Make container image smaller; perhaps make BE test binaries
-#   smaller
-# - Split up cluster tests into two groups
+# - Make container image smaller
 # - Analyze .xml junit files to find slow tests; eradicate
 #   or move to different suite.
-# - Avoid building BE tests, and build them during execution,
-#   saving on container space as well as baseline build
-#   time.
+# - Run BE tests earlier (during data load)

 # We do not use Impala's python environment here, nor do we depend on
 # non-standard python libraries to avoid needing extra build steps before
@@ -125,11 +121,11 @@ if __name__ == '__main__' and __package__ is None:

 base = os.path.dirname(os.path.abspath(__file__))

+LOG_FORMAT="%(asctime)s %(threadName)s: %(message)s"


-def main():
-  logging.basicConfig(level=logging.INFO,
-      format='%(asctime)s %(threadName)s: %(message)s')
+def main():
+  logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)

   default_parallel_test_concurrency, default_suite_concurrency, default_memlimit_gb = \
       _compute_defaults()
@@ -160,20 +156,25 @@ def main():
   parser.add_argument(
       '--build-image', metavar='IMAGE',
       help='Skip building, and run tests on pre-existing image.')
-  parser.add_argument(
+
+  suite_group = parser.add_mutually_exclusive_group()
+  suite_group.add_argument(
       '--suite', metavar='VARIANT', action='append',
-      help="Run specific test suites; can be specified multiple times. \
-          If not specified, all tests are run. Choices: " + ",".join(ALL_SUITES))
+      help="""
+          Run specific test suites; can be specified multiple times.
+          Test-with-docker may shard some suites to improve parallelism.
+          If not specified, default tests are run.
+          Default: %s, All Choices: %s
+          """ % (",".join([ s.name for s in DEFAULT_SUITES]),
+            ",".join([ s.name for s in ALL_SUITES ])))
+  suite_group.add_argument('--all-suites', action='store_true', default=False,
+      help="If set, run all available suites.")
   parser.add_argument(
       '--name', metavar='NAME',
       help="Use a specific name for the test run. The name is used " +
       "as a prefix for the container and image names, and " +
       "as part of the log directory naming. Defaults to include a timestamp.",
       default=datetime.datetime.now().strftime("i-%Y%m%d-%H%M%S"))
-  parser.add_argument('--timeout', metavar='MINUTES',
-      help="Timeout for test suites, in minutes.",
-      type=int,
-      default=60*2)
   parser.add_argument('--ccache-dir', metavar='DIR',
       help="CCache directory to use",
       default=os.path.expanduser("~/.ccache"))
@@ -181,22 +182,28 @@ def main():
   args = parser.parse_args()

   if not args.suite:
-    args.suite = ALL_SUITES
+    if args.all_suites:
+      # Ignore "NOOP" tasks, as they are just for testing.
+      args.suite = [ s.name for s in ALL_SUITES if not s.name.startswith("NOOP") ]
+    else:
+      args.suite = [ s.name for s in DEFAULT_SUITES ]
   t = TestWithDocker(
-      build_image=args.build_image, suites=args.suite,
-      name=args.name, timeout=args.timeout, cleanup_containers=args.cleanup_containers,
+      build_image=args.build_image, suite_names=args.suite,
+      name=args.name, cleanup_containers=args.cleanup_containers,
       cleanup_image=args.cleanup_image, ccache_dir=args.ccache_dir, test_mode=args.test,
       parallel_test_concurrency=args.parallel_test_concurrency,
       suite_concurrency=args.suite_concurrency,
       impalad_mem_limit_bytes=args.impalad_mem_limit_bytes)

-  logging.getLogger('').addHandler(
-      logging.FileHandler(os.path.join(_make_dir_if_not_exist(t.log_dir), "log.txt")))
+  fh = logging.FileHandler(os.path.join(_make_dir_if_not_exist(t.log_dir), "log.txt"))
+  fh.setFormatter(logging.Formatter(LOG_FORMAT))
+  logging.getLogger('').addHandler(fh)

   logging.info("Arguments: %s", args)

   ret = t.run()
   t.create_timeline()
+  t.log_summary()

   if not ret:
     sys.exit(1)
@@ -229,10 +236,11 @@ def _compute_defaults():
   logging.info("CPUs: %s Memory (GB): %s", cpus, total_memory_gb)

   parallel_test_concurrency = min(cpus, 8)
-  memlimit_gb = 7
+  memlimit_gb = 8

   if total_memory_gb >= 95:
     suite_concurrency = 4
+    memlimit_gb = 11
     parallel_test_concurrency = min(cpus, 12)
   elif total_memory_gb >= 65:
     suite_concurrency = 3
@@ -244,19 +252,100 @@ def _compute_defaults():

   return parallel_test_concurrency, suite_concurrency, memlimit_gb * 1024 * 1024 * 1024

+class Suite(object):
+  """Encapsulates a test suite.
+
+  A test suite is a named thing that the user can select to run,
+  and it runs in its own container, in parallel with other suites.
+  The actual running happens from entrypoint.sh and is controlled
+  mostly by environment variables. When complexity is easier
+  to handle in Python (with its richer data types), we prefer
+  it here.
+  """
+  def __init__(self, name, **envs):
+    """Create suite with given name and environment."""
+    self.name = name
+    self.envs = dict(
+        FE_TEST="false",
+        BE_TEST="false",
+        EE_TEST="false",
+        JDBC_TEST="false",
+        CLUSTER_TEST="false")
+    # If set, this suite is sharded past a certain suite concurrency threshold.
+    self.shard_at_concurrency = None
+    # Variable to which to append --shard_tests
+    self.sharding_variable = None
+    self.envs[name] = "true"
+    self.envs.update(envs)
+    self.timeout_minutes = 120
+
+  def copy(self, name, **envs):
+    """Duplicates current suite allowing for environment updates."""
+    v = dict()
+    v.update(self.envs)
+    v.update(envs)
+    ret = Suite(name, **v)
+    ret.shard_at_concurrency = self.shard_at_concurrency
+    ret.sharding_variable = self.sharding_variable
+    ret.timeout_minutes = self.timeout_minutes
+    return ret
+
+  def exhaustive(self):
+    """Returns an "exhaustive" copy of the suite."""
+    r = self.copy(self.name + "_EXHAUSTIVE", EXPLORATION_STRATEGY="exhaustive")
+    r.timeout_minutes = 240
+    return r
+
+  def sharded(self, shards):
+    """Returns a list of sharded copies of the list.
+
+    key is the name of the variable which needs to be appended with "--shard-tests=..."
+    """
+    # RUN_TESTS_ARGS
+    ret = []
+    for i in range(1, shards + 1):
+      s = self.copy("%s_%d_of_%d" % (self.name, i, shards))
+      s.envs[self.sharding_variable] = self.envs.get(self.sharding_variable, "") \
+          + " --shard_tests=%s/%s" % (i, shards)
+      ret.append(s)
+    return ret


-# The names of all the test tracks supported. NOOP isn't included here, but is
-# handy for testing. These are organized slowest-to-fastest, so that, when
-# parallelism of suites is limited, the total time is not impacted.
-ALL_SUITES = [
-    "EE_TEST_SERIAL",
-    "EE_TEST_PARALLEL",
-    "CLUSTER_TEST",
-    "BE_TEST",
-    "FE_TEST",
-    "JDBC_TEST",
+# Definitions of all known suites:
+ee_test_serial = Suite("EE_TEST_SERIAL", EE_TEST="true",
+    RUN_TESTS_ARGS="--skip-parallel --skip-stress")
+ee_test_serial.shard_at_concurrency = 4
+ee_test_serial.sharding_variable = "RUN_TESTS_ARGS"
+ee_test_serial_exhaustive = ee_test_serial.exhaustive()
+ee_test_parallel = Suite("EE_TEST_PARALLEL", EE_TEST="true",
+    RUN_TESTS_ARGS="--skip-serial")
+ee_test_parallel_exhaustive = ee_test_parallel.exhaustive()
+cluster_test = Suite("CLUSTER_TEST")
+cluster_test.shard_at_concurrency = 4
+cluster_test.sharding_variable = "RUN_CUSTOM_CLUSTER_TESTS_ARGS"
+cluster_test_exhaustive = cluster_test.exhaustive()
+
+# Default supported suites. These are organized slowest-to-fastest, so that,
+# when parallelism is limited, the total time is least impacted.
+DEFAULT_SUITES = [
+    ee_test_serial,
+    ee_test_parallel,
+    cluster_test,
+    Suite("BE_TEST"),
+    Suite("FE_TEST"),
+    Suite("JDBC_TEST")
 ]
+OTHER_SUITES = [
+    ee_test_parallel_exhaustive,
+    ee_test_serial_exhaustive,
+    cluster_test_exhaustive,
+    Suite("RAT_CHECK"),
+    # These are used for testing this script
+    Suite("NOOP"),
+    Suite("NOOP_FAIL"),
+    Suite("NOOP_SLEEP_FOREVER")
+]
+ALL_SUITES = DEFAULT_SUITES + OTHER_SUITES

 def _call(args, check=True):
   """Wrapper for calling a subprocess.
@@ -297,6 +386,12 @@ class Container(object):
     self.running = running
     self.start = None
     self.end = None
+    self.removed = False
+
+    # Updated by Timeline class
+    self.total_user_cpu = -1
+    self.total_system_cpu = -1
+    self.peak_total_rss = -1

   def runtime_seconds(self):
     if self.start and self.end:
@@ -311,15 +406,13 @@ class Container(object):

 class TestWithDocker(object):
   """Tests Impala using Docker containers for parallelism."""

-  def __init__(self, build_image, suites, name, timeout, cleanup_containers,
+  def __init__(self, build_image, suite_names, name, cleanup_containers,
       cleanup_image, ccache_dir, test_mode, suite_concurrency, parallel_test_concurrency,
       impalad_mem_limit_bytes):
     self.build_image = build_image
-    self.suites = [TestSuiteRunner(self, suite) for suite in suites]
     self.name = name
     self.containers = []
-    self.timeout_minutes = timeout
     self.git_root = _check_output(["git", "rev-parse", "--show-toplevel"]).strip()
     self.cleanup_containers = cleanup_containers
     self.cleanup_image = cleanup_image
@@ -336,6 +429,26 @@ class TestWithDocker(object):
     self.parallel_test_concurrency = parallel_test_concurrency
     self.impalad_mem_limit_bytes = impalad_mem_limit_bytes

+    # Map suites back into objects; we ignore case for this mapping.
+    suites = []
+    suites_by_name = {}
+    for suite in ALL_SUITES:
+      suites_by_name[suite.name.lower()] = suite
+    for suite_name in suite_names:
+      suites.append(suites_by_name[suite_name.lower()])
+
+    # If we have enough concurrency, shard some suites into two halves.
+    suites2 = []
+    for suite in suites:
+      if suite.shard_at_concurrency is not None and \
+          suite_concurrency >= suite.shard_at_concurrency:
+        suites2.extend(suite.sharded(2))
+      else:
+        suites2.append(suite)
+    suites = suites2
+
+    self.suite_runners = [TestSuiteRunner(self, suite) for suite in suites]
+
   def _create_container(self, image, name, logdir, logname, entrypoint, extras=None):
     """Returns a new container.
@@ -374,8 +487,10 @@ class TestWithDocker(object):
         + extras
         + [image]
         + entrypoint).strip()
-    return Container(name=name, id_=container_id,
+    ctr = Container(name=name, id_=container_id,
         logfile=os.path.join(self.log_dir, logdir, logname))
+    logging.info("Created container %s", ctr)
+    return ctr

   def _run_container(self, container):
     """Runs container, and returns True if the container had a successful exit value.
@@ -413,15 +528,17 @@ class TestWithDocker(object):
   @staticmethod
   def _stop_container(container):
     """Stops container. Ignores errors (e.g., if it's already exited)."""
-    _call(["docker", "stop", container.id], check=False)
     if container.running:
+      _call(["docker", "stop", container.id], check=False)
       container.end = time.time()
       container.running = False

   @staticmethod
   def _rm_container(container):
     """Removes container."""
-    _call(["docker", "rm", container.id], check=False)
+    if not container.removed:
+      _call(["docker", "rm", container.id], check=False)
+      container.removed = True

   def _create_build_image(self):
     """Creates the "build image", with Impala compiled and data loaded."""
@@ -451,36 +568,36 @@ class TestWithDocker(object):
       self._rm_container(container)

   def _run_tests(self):
-    start_time = time.time()
-    timeout_seconds = self.timeout_minutes * 60
-    deadline = start_time + timeout_seconds
     pool = multiprocessing.pool.ThreadPool(processes=self.suite_concurrency)
     outstanding_suites = []
-    for suite in self.suites:
+    for suite in self.suite_runners:
       suite.task = pool.apply_async(suite.run)
       outstanding_suites.append(suite)

     ret = True
-    while time.time() < deadline and len(outstanding_suites) > 0:
-      for suite in list(outstanding_suites):
-        task = suite.task
-        if task.ready():
-          this_task_ret = task.get()
-          outstanding_suites.remove(suite)
-          if this_task_ret:
-            logging.info("Suite %s succeeded.", suite.name)
-          else:
-            logging.info("Suite %s failed.", suite.name)
-            ret = False
-      time.sleep(10)
-    if len(outstanding_suites) > 0:
-      for container in self.containers:
-        self._stop_container(container)
-      for suite in outstanding_suites:
-        suite.task.get()
-      raise Exception("Tasks not finished within timeout (%s minutes): %s" %
-          (self.timeout_minutes, ",".join([
-            suite.name for suite in outstanding_suites])))
+    try:
+      while len(outstanding_suites) > 0:
+        for suite in list(outstanding_suites):
+          if suite.timed_out():
+            msg = "Task %s not finished within timeout %s" % (suite.name,
+                suite.suite.timeout_minutes,)
+            logging.error(msg)
+            raise Exception(msg)
+          task = suite.task
+          if task.ready():
+            this_task_ret = task.get()
+            outstanding_suites.remove(suite)
+            if this_task_ret:
+              logging.info("Suite %s succeeded.", suite.name)
+            else:
+              logging.info("Suite %s failed.", suite.name)
+              ret = False
+        time.sleep(5)
+    except KeyboardInterrupt:
+      logging.info("\n\nDetected KeyboardInterrupt; shutting down!\n\n")
+      raise
+    finally:
+      pool.terminate()
     return ret

   def run(self):
@@ -495,17 +612,13 @@ class TestWithDocker(object):
       else:
         self.image = self.build_image
       ret = self._run_tests()
-      logging.info("Containers:")
-      for c in self.containers:
-        def to_success_string(exitcode):
-          if exitcode == 0:
-            return "SUCCESS"
-          return "FAILURE"
-        logging.info("%s %s %s %s", to_success_string(c.exitcode), c.name, c.logfile,
-            c.runtime_seconds())
       return ret
     finally:
       self.monitor.stop()
+      if self.cleanup_containers:
+        for c in self.containers:
+          self._stop_container(c)
+          self._rm_container(c)
       if self.cleanup_image and self.image:
         _call(["docker", "rmi", self.image], check=False)
       logging.info("Memory usage: %s GB min, %s GB max",
@@ -526,6 +639,23 @@ class TestWithDocker(object):
         interesting_re=self._INTERESTING_RE)
     timeline.create(os.path.join(self.log_dir, "timeline.html"))

+  def log_summary(self):
+    logging.info("Containers:")
+    def to_success_string(exitcode):
+      if exitcode == 0:
+        return "SUCCESS"
+      return "FAILURE"
+
+    for c in self.containers:
+      logging.info("%s %s %s %0.1fm wall, %0.1fm user, %0.1fm system, " +
+          "%0.1fx parallelism, %0.1f GB peak RSS",
+          to_success_string(c.exitcode), c.name, c.logfile,
+          c.runtime_seconds() / 60.0,
+          c.total_user_cpu / 60.0,
+          c.total_system_cpu / 60.0,
+          (c.total_user_cpu + c.total_system_cpu) / max(c.runtime_seconds(), 0.0001),
+          c.peak_total_rss / 1024.0 / 1024.0 / 1024.0)
+

 class TestSuiteRunner(object):
   """Runs a single test suite."""
@@ -534,12 +664,24 @@ class TestSuiteRunner(object):
     self.test_with_docker = test_with_docker
     self.suite = suite
     self.task = None
-    self.name = self.suite.lower()
+    self.name = suite.name.lower()
+    # Set at the beginning of run and facilitates enforcing timeouts
+    # for individual suites.
+    self.deadline = None
+
+  def timed_out(self):
+    return self.deadline is not None and time.time() > self.deadline

   def run(self):
     """Runs given test. Returns true on success, based on exit code."""
+    self.deadline = time.time() + self.suite.timeout_minutes * 60
     test_with_docker = self.test_with_docker
     suite = self.suite
+    envs = ["-e", "NUM_CONCURRENT_TESTS=" + str(test_with_docker.parallel_test_concurrency)]
+    for k, v in sorted(suite.envs.iteritems()):
+      envs.append("-e")
+      envs.append("%s=%s" % (k, v))
+
     self.start = time.time()

     # io-file-mgr-test expects a real-ish file system at /tmp;
@@ -554,13 +696,11 @@ class TestSuiteRunner(object):
         name=container_name,
         extras=[
             "-v", tmpdir + ":/tmp",
-            "-u", str(os.getuid()),
-            "-e", "NUM_CONCURRENT_TESTS=" +
-            str(test_with_docker.parallel_test_concurrency),
-            ],
+            "-u", str(os.getuid())
+            ] + envs,
         logdir=self.name,
-        logname="log-test-" + self.suite + ".txt",
-        entrypoint=["/mnt/base/entrypoint.sh", "test_suite", suite])
+        logname="log-test-" + self.suite.name + ".txt",
+        entrypoint=["/mnt/base/entrypoint.sh", "test_suite", suite.name])

     test_with_docker.containers.append(container)
     test_with_docker.monitor.add(container)
@@ -569,7 +709,7 @@ class TestSuiteRunner(object):
     except:
       return False
     finally:
-      logging.info("Cleaning up containers for %s" % (suite,))
+      logging.info("Cleaning up containers for %s" % (suite.name,))
       test_with_docker._stop_container(container)
       if test_with_docker.cleanup_containers:
         test_with_docker._rm_container(container)

http://git-wip-us.apache.org/repos/asf/impala/blob/2e6a63e3/docker/timeline.html.template
----------------------------------------------------------------------
diff --git a/docker/timeline.html.template b/docker/timeline.html.template
index c8de821..d2960d7 100644
--- a/docker/timeline.html.template
+++ b/docker/timeline.html.template
@@ -96,9 +96,7 @@ function ts_to_date(secs) {
 }

 function drawChart() {
-  var container = document.getElementById('container');
-  var timelineContainer = document.createElement("div");
-  container.appendChild(timelineContainer);
+  var timelineContainer = document.getElementById('timelineContainer');
   var chart = new google.visualization.Timeline(timelineContainer);
   var dataTable = new google.visualization.DataTable();
   dataTable.addColumn({ type: 'string', id: 'Position' });
@@ -115,7 +113,7 @@ function drawChart() {

   for (const k of Object.keys(data.metrics)) {
     var lineChart = document.createElement("div");
-    container.appendChild(lineChart);
+    lineChartContainer.appendChild(lineChart);

     var dataTable = new google.visualization.DataTable();
     dataTable.addColumn({ type: 'timeofday', id: 'Time' });
@@ -139,4 +137,5 @@ function drawChart() {
   }
 }
 </script>
-<div id="container" style="height: 200px;"></div>
+<div id="timelineContainer" style="height: 400px;"></div>
+<div id="lineChartContainer" style="height: 200px;"></div>

http://git-wip-us.apache.org/repos/asf/impala/blob/2e6a63e3/testdata/bin/run-hbase.sh
----------------------------------------------------------------------
diff --git a/testdata/bin/run-hbase.sh b/testdata/bin/run-hbase.sh
index 878fe4d..264951a 100755
--- a/testdata/bin/run-hbase.sh
+++ b/testdata/bin/run-hbase.sh
@@ -39,6 +39,7 @@ export HBASE_PID_DIR=${HBASE_LOGDIR}
 if [[ $IMPALA_MINICLUSTER_PROFILE == 3 ]]; then
   export HBASE_CLASSPATH=${HADOOP_CLASSPATH}
 fi
+export HBASE_HEAPSIZE=1g
 EOF

 # Put zookeeper things in the logs/cluster/zoo directory.

http://git-wip-us.apache.org/repos/asf/impala/blob/2e6a63e3/testdata/cluster/node_templates/common/etc/init.d/hdfs-common
----------------------------------------------------------------------
diff --git a/testdata/cluster/node_templates/common/etc/init.d/hdfs-common b/testdata/cluster/node_templates/common/etc/init.d/hdfs-common
index 9a7ddd3..d53ecce 100644
--- a/testdata/cluster/node_templates/common/etc/init.d/hdfs-common
+++ b/testdata/cluster/node_templates/common/etc/init.d/hdfs-common
@@ -18,3 +18,10 @@
 export HADOOP_LOG_DIR="$LOG_DIR/hadoop-hdfs"
 export HADOOP_ROOT_LOGGER="${HADOOP_ROOT_LOGGER:-INFO,RFA}"
 export HADOOP_LOGFILE=$(basename $0).log
+
+# Force minicluster processes to have a maximum heap.
+# If unset, on large machines, the JVM default
+# is 1/4th of the RAM (or so), and processes like DataNode
+# end up never garbage collecting.
+export HADOOP_HEAPSIZE_MIN=512m
+export HADOOP_HEAPSIZE_MAX=2g

http://git-wip-us.apache.org/repos/asf/impala/blob/2e6a63e3/tests/conftest.py
----------------------------------------------------------------------
diff --git a/tests/conftest.py b/tests/conftest.py
index 144fc5c..1e6adda 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -120,6 +120,10 @@ def pytest_addoption(parser):
                    default=False, help="Run all tests with KRPC disabled. This assumes "
                    "that the test cluster has been started with --disable_krpc.")

+  parser.addoption("--shard_tests", default=None,
+                   help="If set to N/M (e.g., 3/5), will split the tests into "
+                   "M partitions and run the Nth partition. 
1-indexed.") + def pytest_assertrepr_compare(op, left, right): """ @@ -501,3 +505,34 @@ def validate_pytest_config(): if any(pytest.config.option.impalad.startswith(loc) for loc in local_prefixes): logging.error("--testing_remote_cluster can not be used with a local impalad") pytest.exit("Invalid pytest config option: --testing_remote_cluster") + + [email protected](trylast=True) +def pytest_collection_modifyitems(items, config, session): + """Hook to handle --shard_tests command line option. + + If set, this "deselects" a subset of tests, by hashing + their id into buckets. + """ + if not config.option.shard_tests: + return + + num_items = len(items) + this_shard, num_shards = map(int, config.option.shard_tests.split("/")) + assert 0 <= this_shard <= num_shards + if this_shard == num_shards: + this_shard = 0 + + items_selected, items_deselected = [], [] + for i in items: + if hash(i.nodeid) % num_shards == this_shard: + items_selected.append(i) + else: + items_deselected.append(i) + config.hook.pytest_deselected(items=items_deselected) + + # We must modify the items list in place for it to take effect. + items[:] = items_selected + + logging.info("pytest shard selection enabled %s. Of %d items, selected %d items by hash.", + config.option.shard_tests, num_items, len(items)) http://git-wip-us.apache.org/repos/asf/impala/blob/2e6a63e3/tests/run-tests.py ---------------------------------------------------------------------- diff --git a/tests/run-tests.py b/tests/run-tests.py index 95e0d11..b1a9fdd 100755 --- a/tests/run-tests.py +++ b/tests/run-tests.py @@ -180,6 +180,10 @@ def build_test_args(base_name, valid_dirs=VALID_TEST_DIRS): # explicit_tests = pytest.config.getoption(FILE_OR_DIR) config_options = [arg for arg in commandline_args if arg not in explicit_tests] + # We also want to strip out any --shard_tests option and its corresponding value. 
+ while "--shard_tests" in config_options: + i = config_options.index("--shard_tests") + del config_options[i:i+2] test_args = ignored_dirs + logging_args + config_options return test_args @@ -237,6 +241,11 @@ if __name__ == "__main__": test_executor.run_tests(sys.argv[1:]) sys.exit(0) + def run(args): + """Helper to print out arguments of test_executor before invoking.""" + print "Running TestExecutor with args: %s" % (args,) + test_executor.run_tests(args) + os.chdir(TEST_DIR) # Create the test result directory if it doesn't already exist. @@ -248,25 +257,25 @@ if __name__ == "__main__": # pytest warnings/messages and displays collected tests if '--collect-only' in sys.argv: - test_executor.run_tests(sys.argv[1:]) + run(sys.argv[1:]) else: print_metrics('connections') # First run query tests that need to be executed serially if not skip_serial: base_args = ['-m', 'execute_serially'] - test_executor.run_tests(base_args + build_test_args('serial')) + run(base_args + build_test_args('serial')) print_metrics('connections') # Run the stress tests tests if not skip_stress: base_args = ['-m', 'stress', '-n', NUM_STRESS_CLIENTS] - test_executor.run_tests(base_args + build_test_args('stress')) + run(base_args + build_test_args('stress')) print_metrics('connections') # Run the remaining query tests in parallel if not skip_parallel: base_args = ['-m', 'not execute_serially and not stress', '-n', NUM_CONCURRENT_TESTS] - test_executor.run_tests(base_args + build_test_args('parallel')) + run(base_args + build_test_args('parallel')) # The total number of tests executed at this point is expected to be >0 # If it is < 0 then the script needs to exit with a non-zero @@ -277,7 +286,7 @@ if __name__ == "__main__": # Finally, validate impalad/statestored metrics. args = build_test_args(base_name='verify-metrics', valid_dirs=['verifiers']) args.append('verifiers/test_verify_metrics.py') - test_executor.run_tests(args) + run(args) if test_executor.tests_failed: sys.exit(1)
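
For readers who want to try the --shard_tests bucketing from tests/conftest.py outside of py.test, here is a minimal standalone sketch. It is not part of this commit. The helper names parse_shard_spec and select_shard and the sample test ids are hypothetical, and the sketch is written for Python 3, whereas the committed code targets Python 2. It also swaps in zlib.crc32 for the hook's hash(): Python 3 salts str hashes per process unless PYTHONHASHSEED is fixed, so separate shard processes would otherwise disagree on bucket assignment. Unlike the hook, which accepts 0/M as an alias for M/M, the sketch rejects N == 0.

```python
import zlib


def parse_shard_spec(spec):
    """Parse "N/M" into a (zero-based bucket, shard count) pair.

    N is 1-indexed, so shard M/M maps to bucket 0, as in the hook.
    Assumption: N == 0 is rejected here; the hook itself allows it.
    """
    this_shard, num_shards = map(int, spec.split("/"))
    if not 1 <= this_shard <= num_shards:
        raise ValueError("shard spec out of range: %r" % (spec,))
    return this_shard % num_shards, num_shards


def select_shard(node_ids, spec):
    """Return the ids whose stable hash falls into the requested bucket."""
    bucket, num_shards = parse_shard_spec(spec)
    # Assumption: crc32 stands in for hash() so that every process
    # computes the same bucket for the same test id.
    return [n for n in node_ids
            if zlib.crc32(n.encode("utf-8")) % num_shards == bucket]


if __name__ == "__main__":
    # Hypothetical test ids, for illustration only.
    ids = ["tests/test_a.py::test_%d" % i for i in range(20)]
    shards = [select_shard(ids, "%d/4" % n) for n in range(1, 5)]
    # Every id lands in exactly one of the four shards.
    assert sorted(sum(shards, [])) == sorted(ids)
    print([len(s) for s in shards])
```

Because assignment depends only on the test id and the shard count, the shards are disjoint and together cover every collected test, but their sizes are only roughly equal.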
