This is an automated email from the ASF dual-hosted git repository. jbapple pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 7aec1b6db042d88c04a501e24afaa25d4794cccb Author: Tim Armstrong <tarmstr...@cloudera.com> AuthorDate: Wed Jan 23 13:28:29 2019 -0800 IMPALA-7941: part 2/2: use cgroups memory limit This uses the functionality from part 1 to detect the CGroups memory limit and use it to set a lower process memory limit if needed. min(system memory, cgroups memory limit) is used instead of system memory to determine the memory limit. Behaviour of processes without a memory limit set via CGroups is unchanged. The default behaviour of using 80% of the memory limit detected is still in effect. This seems like an OK default, but may lead to some amount of wasted memory. Modify containers to have a default JVM heap size of 2GB and --mem_limit_includes_jvm, so that the automatically configured memory limit makes more sense. start-impala-cluster.py is modified to exercise all of this. Testing: Tested a containerised cluster manually on my system, which has 32GB of RAM. Here's the breakdown from the memz/ page showing the JVM heap and auto-configured memory limit. Process: Limit=7.31 GB Total=1.94 GB Peak=1.94 GB JVM: max heap size: Total=1.78 GB JVM: non-heap committed: Total=35.56 MB Buffer Pool: Free Buffers: Total=0 Buffer Pool: Clean Pages: Total=0 Buffer Pool: Unused Reservation: Total=0 Control Service Queue: Limit=50.00 MB Total=0 Peak=0 Data Stream Service Queue: Limit=374.27 MB Total=0 Peak=0 Data Stream Manager Early RPCs: Total=0 Peak=0 TCMalloc Overhead: Total=12.20 MB Untracked Memory: Total=121.31 MB Change-Id: Ie9fb4fb936a46fc194a204391d03c07c8c7fba21 Reviewed-on: http://gerrit.cloudera.org:8080/12262 Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> --- be/src/runtime/exec-env.cc | 123 +++++++++++++++++++++++++----------------- be/src/runtime/exec-env.h | 6 +++ bin/start-impala-cluster.py | 59 ++++++++++++-------- docker/coord_exec/Dockerfile | 2 +- docker/coordinator/Dockerfile | 3 +- docker/daemon_entrypoint.sh | 5 +- docker/executor/Dockerfile | 3 +- 7 files changed, 126 insertions(+), 75 deletions(-) diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc index e60d752..0b0a889 100644 --- a/be/src/runtime/exec-env.cc +++ b/be/src/runtime/exec-env.cc @@ -52,6 +52,7 @@ #include "service/frontend.h" #include "service/impala-server.h" #include "statestore/statestore-subscriber.h" +#include "util/cgroup-util.h" #include "util/debug-util.h" #include "util/default-path-handlers.h" #include "util/hdfs-bulk-ops.h" @@ -228,41 +229,8 @@ Status ExecEnv::Init() { } RETURN_IF_ERROR(async_rpc_pool_->Init()); - // Initialize global memory limit. - // Depending on the system configuration, we will have to calculate the process - // memory limit either based on the available physical memory, or if overcommitting - // is turned off, we use the memory commit limit from /proc/meminfo (see - // IMPALA-1690). - int64_t bytes_limit = 0; - bool is_percent; - int64_t system_mem; - if (MemInfo::vm_overcommit() == 2 && - MemInfo::commit_limit() < MemInfo::physical_mem()) { - system_mem = MemInfo::commit_limit(); - bytes_limit = ParseUtil::ParseMemSpec(FLAGS_mem_limit, &is_percent, system_mem); - // There might be the case of misconfiguration, when on a system swap is disabled - // and overcommitting is turned off the actual usable memory is less than the - // available physical memory. - LOG(WARNING) << "This system shows a discrepancy between the available " - << "memory and the memory commit limit allowed by the " - << "operating system. ( Mem: " << MemInfo::physical_mem() - << "<=> CommitLimit: " - << MemInfo::commit_limit() << "). " - << "Impala will adhere to the smaller value by setting the " - << "process memory limit to " << bytes_limit << " " - << "Please verify the system configuration. Specifically, " - << "/proc/sys/vm/overcommit_memory and " - << "/proc/sys/vm/overcommit_ratio."; - } else { - system_mem = MemInfo::physical_mem(); - bytes_limit = ParseUtil::ParseMemSpec(FLAGS_mem_limit, &is_percent, system_mem); - } - // ParseMemSpec() returns -1 for invalid input and 0 to mean unlimited. From Impala - // 2.11 onwards we do not support unlimited process memory limits. - if (bytes_limit <= 0) { - return Status(Substitute("The process memory limit (--mem_limit) must be a positive " - "bytes value or percentage: $0", FLAGS_mem_limit)); - } + int64_t bytes_limit; + RETURN_IF_ERROR(ChooseProcessMemLimit(&bytes_limit)); // Need to register JVM metrics first so that we can use them to compute the buffer pool // limit. @@ -281,6 +249,7 @@ Status ExecEnv::Init() { post_jvm_bytes_limit -= JvmMemoryMetric::HEAP_MAX_USAGE->GetValue(); } + bool is_percent; int64_t buffer_pool_limit = ParseUtil::ParseMemSpec(FLAGS_buffer_pool_limit, &is_percent, post_jvm_bytes_limit); if (buffer_pool_limit <= 0) { @@ -288,12 +257,15 @@ Status ExecEnv::Init() { "positive bytes value or percentage: $0", FLAGS_buffer_pool_limit)); } buffer_pool_limit = BitUtil::RoundDown(buffer_pool_limit, FLAGS_min_buffer_size); + LOG(INFO) << "Buffer pool limit: " + << PrettyPrinter::Print(buffer_pool_limit, TUnit::BYTES); int64_t clean_pages_limit = ParseUtil::ParseMemSpec(FLAGS_buffer_pool_clean_pages_limit, &is_percent, buffer_pool_limit); if (clean_pages_limit <= 0) { - return Status(Substitute("Invalid --buffer_pool_clean_pages_limit value, must be a percentage or " - "positive bytes value or percentage: $0", FLAGS_buffer_pool_clean_pages_limit)); + return Status(Substitute("Invalid --buffer_pool_clean_pages_limit value, must be a " + "percentage or positive bytes value or percentage: $0", + FLAGS_buffer_pool_clean_pages_limit)); } InitBufferPool(FLAGS_min_buffer_size, buffer_pool_limit, clean_pages_limit); @@ -350,17 +322,6 @@ Status ExecEnv::Init() { #endif mem_tracker_->RegisterMetrics(metrics_.get(), "mem-tracker.process"); - if (bytes_limit > MemInfo::physical_mem()) { - LOG(WARNING) << "Memory limit " - << PrettyPrinter::Print(bytes_limit, TUnit::BYTES) - << " exceeds physical memory of " - << PrettyPrinter::Print(MemInfo::physical_mem(), TUnit::BYTES); - } - LOG(INFO) << "Using global memory limit: " - << PrettyPrinter::Print(bytes_limit, TUnit::BYTES); - LOG(INFO) << "Buffer pool limit: " - << PrettyPrinter::Print(buffer_pool_limit, TUnit::BYTES); - RETURN_IF_ERROR(disk_io_mgr_->Init()); // Start services in order to ensure that dependencies between them are met @@ -391,6 +352,72 @@ Status ExecEnv::Init() { return Status::OK(); } +Status ExecEnv::ChooseProcessMemLimit(int64_t* bytes_limit) { + // Depending on the system configuration, we detect the total amount of memory + // available to the system - either the available physical memory, or if overcommitting + // is turned off, we use the memory commit limit from /proc/meminfo (see IMPALA-1690). + // The 'memory' CGroup can also impose a lower limit on memory consumption, + // so we take the minimum of the system memory and the CGroup memory limit. + int64_t avail_mem = MemInfo::physical_mem(); + bool use_commit_limit = + MemInfo::vm_overcommit() == 2 && MemInfo::commit_limit() < MemInfo::physical_mem(); + if (use_commit_limit) { + avail_mem = MemInfo::commit_limit(); + // There might be the case of misconfiguration, when on a system swap is disabled + // and overcommitting is turned off the actual usable memory is less than the + // available physical memory. + LOG(WARNING) << "This system shows a discrepancy between the available " + << "memory and the memory commit limit allowed by the " + << "operating system. ( Mem: " << MemInfo::physical_mem() + << "<=> CommitLimit: " << MemInfo::commit_limit() << "). " + << "Impala will adhere to the smaller value when setting the process " + << "memory limit. Please verify the system configuration. Specifically, " + << "/proc/sys/vm/overcommit_memory and " + << "/proc/sys/vm/overcommit_ratio."; + } + LOG(INFO) << "System memory available: " << PrettyPrinter::PrintBytes(avail_mem) + << " (from " << (use_commit_limit ? "commit limit" : "physical mem") << ")"; + int64_t cgroup_mem_limit; + Status cgroup_mem_status = CGroupUtil::FindCGroupMemLimit(&cgroup_mem_limit); + if (cgroup_mem_status.ok()) { + if (cgroup_mem_limit < avail_mem) { + avail_mem = cgroup_mem_limit; + LOG(INFO) << "CGroup memory limit for this process reduces physical memory " + << "available to: " << PrettyPrinter::PrintBytes(avail_mem); + } + } else { + LOG(WARNING) << "Could not detect CGroup memory limit, assuming unlimited: $0" + << cgroup_mem_status.GetDetail(); + } + bool is_percent; + *bytes_limit = ParseUtil::ParseMemSpec(FLAGS_mem_limit, &is_percent, avail_mem); + // ParseMemSpec() returns -1 for invalid input and 0 to mean unlimited. From Impala + // 2.11 onwards we do not support unlimited process memory limits. + if (*bytes_limit <= 0) { + return Status(Substitute("The process memory limit (--mem_limit) must be a positive " + "bytes value or percentage: $0", FLAGS_mem_limit)); + } + if (is_percent) { + LOG(INFO) << "Using process memory limit: " << PrettyPrinter::PrintBytes(*bytes_limit) + << " (--mem_limit=" << FLAGS_mem_limit << " of " + << PrettyPrinter::PrintBytes(avail_mem) << ")"; + } else { + LOG(INFO) << "Using process memory limit: " << PrettyPrinter::PrintBytes(*bytes_limit) + << " (--mem_limit=" << FLAGS_mem_limit << ")"; + } + if (*bytes_limit > MemInfo::physical_mem()) { + LOG(WARNING) << "Process memory limit " << PrettyPrinter::PrintBytes(*bytes_limit) + << " exceeds physical memory of " + << PrettyPrinter::PrintBytes(MemInfo::physical_mem()); + } + if (cgroup_mem_status.ok() && *bytes_limit > cgroup_mem_limit) { + LOG(WARNING) << "Process Memory limit " << PrettyPrinter::PrintBytes(*bytes_limit) + << " exceeds CGroup memory limit of " + << PrettyPrinter::PrintBytes(cgroup_mem_limit); + } + return Status::OK(); +} + Status ExecEnv::StartStatestoreSubscriberService() { LOG(INFO) << "Starting statestore subscriber service"; diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h index 465fe7a..8a5a1f4 100644 --- a/be/src/runtime/exec-env.h +++ b/be/src/runtime/exec-env.h @@ -248,6 +248,12 @@ class ExecEnv { /// address lists be identical in order to share a KuduClient. KuduClientMap kudu_client_map_; + /// Choose a memory limit (returned in *bytes_limit) based on the --mem_limit flag and + /// the memory available to the daemon process. Returns an error if the memory limit is + /// invalid or another error is encountered that should prevent starting up the daemon. + /// Logs the memory limit chosen and any relevant diagnostics related to that choice. + Status ChooseProcessMemLimit(int64_t* bytes_limit); + /// Initialise 'buffer_pool_' and 'buffer_reservation_' with given capacity. void InitBufferPool(int64_t min_page_len, int64_t capacity, int64_t clean_pages_limit); diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py index ec67694..b86e6c5 100755 --- a/bin/start-impala-cluster.py +++ b/bin/start-impala-cluster.py @@ -288,21 +288,14 @@ def build_impalad_arg_lists(cluster_size, num_coordinators, use_exclusive_coordi # blob that has to be tokenized later. However, that may require more substantial # refactoring. - # Set mem_limit of each impalad to the smaller of 12GB or - # 1/cluster_size (typically 1/3) of 70% of system memory. - # - # The default memory limit for an impalad is 80% of the total system memory. On a - # mini-cluster with 3 impalads that means 240%. Since having an impalad be OOM killed - # is very annoying, the mem limit will be reduced. This can be overridden using the - # --impalad_args flag. virtual_memory().total returns the total physical memory. - # The exact ratio to use is somewhat arbitrary. Peak memory usage during - # tests depends on the concurrency of parallel tests as well as their ordering. - # On the other hand, to avoid using too much memory, we limit the - # memory choice here to max out at 12GB. This should be sufficient for tests. - # - # Beware that ASAN builds use more memory than regular builds. - mem_limit = int(0.7 * psutil.virtual_memory().total / cluster_size) - mem_limit = min(12 * 1024 * 1024 * 1024, mem_limit) + mem_limit_arg = "" + if options.docker_network is None: + mem_limit_arg = "-mem_limit={0}".format(compute_impalad_mem_limit(cluster_size)) + else: + # For containerised impalads, set a memory limit via docker instead of directly, + # to emulate what would happen in a production container. JVM heap is included, + # so we should be able to use 100% of the detected mem_limit. + mem_limit_arg = "-mem_limit=100%" delay_list = [] if options.catalog_init_delays != "": @@ -321,15 +314,13 @@ def build_impalad_arg_lists(cluster_size, num_coordinators, use_exclusive_coordi if remap_ports: impala_port_args = build_impalad_port_args(i) # impalad args from the --impalad_args flag. Also replacing '#ID' with the instance. - # TODO: IMPALA-7940: mem_limit should be set on docker container and detected by - # the impala daemon process instead of set manually here. param_args = (" ".join(options.impalad_args)).replace("#ID", str(i)) - args = ("--mem_limit={mem_limit} " + args = ("{mem_limit_arg} " "{impala_logging_args} " "{jvm_args} " "{impala_port_args} " "{param_args}").format( - mem_limit=mem_limit, # Goes first so --impalad_args will override it. + mem_limit_arg=mem_limit_arg, # Goes first so --impalad_args will override it. impala_logging_args=" ".join(build_logging_args(service_name)), jvm_args=" ".join(build_jvm_args(i)), impala_port_args=impala_port_args, @@ -364,6 +355,23 @@ def build_impalad_arg_lists(cluster_size, num_coordinators, use_exclusive_coordi return impalad_args +def compute_impalad_mem_limit(cluster_size): + # Set mem_limit of each impalad to the smaller of 12GB or + # 1/cluster_size (typically 1/3) of 70% of system memory. + # + # The default memory limit for an impalad is 80% of the total system memory. On a + # mini-cluster with 3 impalads that means 240%. Since having an impalad be OOM killed + # is very annoying, the mem limit will be reduced. This can be overridden using the + # --impalad_args flag. virtual_memory().total returns the total physical memory. + # The exact ratio to use is somewhat arbitrary. Peak memory usage during + # tests depends on the concurrency of parallel tests as well as their ordering. + # On the other hand, to avoid using too much memory, we limit the + # memory choice here to max out at 12GB. This should be sufficient for tests. + # + # Beware that ASAN builds use more memory than regular builds. + mem_limit = int(0.7 * psutil.virtual_memory().total / cluster_size) + return min(12 * 1024 * 1024 * 1024, mem_limit) + class MiniClusterOperations(object): """Implementations of operations for the non-containerized minicluster implementation. @@ -481,13 +489,14 @@ class DockerMiniClusterOperations(object): impalad_arg_lists = build_impalad_arg_lists( cluster_size, num_coordinators, use_exclusive_coordinators, remap_ports=False) assert cluster_size == len(impalad_arg_lists) + mem_limit = compute_impalad_mem_limit(cluster_size) for i in xrange(cluster_size): chosen_ports = choose_impalad_ports(i) port_map = {DEFAULT_BEESWAX_PORT: chosen_ports['beeswax_port'], DEFAULT_HS2_PORT: chosen_ports['hs2_port'], DEFAULT_IMPALAD_WEBSERVER_PORT: chosen_ports['webserver_port']} self.__run_container__("impalad_coord_exec", - shlex.split(impalad_arg_lists[i]), port_map, i) + shlex.split(impalad_arg_lists[i]), port_map, i, mem_limit=mem_limit) def __gen_container_name__(self, daemon, instance=None): """Generate the name for the container, which should be unique among containers @@ -502,14 +511,15 @@ class DockerMiniClusterOperations(object): return daemon return "{0}-{1}".format(daemon, instance) - def __run_container__(self, daemon, args, port_map, instance=None): + def __run_container__(self, daemon, args, port_map, instance=None, mem_limit=None): """Launch a container with the daemon - impalad, catalogd, or statestored. If there are multiple impalads in the cluster, a unique instance number must be specified. 'args' are command-line arguments to be appended to the end of the daemon command line. 'port_map' determines a mapping from container ports to ports on localhost. If --docker_auto_ports was set on the command line, 'port_map' is ignored and Docker will automatically choose the mapping. If there is an existing running or stopped - container with the same name, it will be destroyed.""" + container with the same name, it will be destroyed. If provided, mem_limit is + passed to "docker run" as a string to set the memory limit for the container.""" self.__destroy_container__(daemon, instance) if options.docker_auto_ports: port_args = ["-P"] @@ -527,10 +537,13 @@ class DockerMiniClusterOperations(object): # for config changes to take effect. conf_dir = os.path.join(IMPALA_HOME, "fe/target/test-classes") mount_args = ["--mount", "type=bind,src={0},dst=/opt/impala/conf".format(conf_dir)] + mem_limit_args = [] + if mem_limit is not None: + mem_limit_args = ["--memory", str(mem_limit)] LOG.info("Running container {0}".format(container_name)) run_cmd = (["docker", "run", "-d"] + env_args + port_args + ["--network", self.network_name, "--name", container_name, "--network-alias", host_name] + - mount_args + [image_tag] + args) + mount_args + mem_limit_args + [image_tag] + args) LOG.info("Running command {0}".format(run_cmd)) check_call(run_cmd) port_mapping = check_output(["docker", "port", container_name]) diff --git a/docker/coord_exec/Dockerfile b/docker/coord_exec/Dockerfile index 8000b32..6e6fa09 100644 --- a/docker/coord_exec/Dockerfile +++ b/docker/coord_exec/Dockerfile @@ -27,4 +27,4 @@ EXPOSE 25000 ENTRYPOINT ["/opt/impala/bin/daemon_entrypoint.sh", "/opt/impala/bin/impalad",\ "-abort_on_config_error=false", "-state_store_host=statestored",\ - "-catalog_service_host=catalogd"] + "-catalog_service_host=catalogd", "-mem_limit_includes_jvm=true"] diff --git a/docker/coordinator/Dockerfile b/docker/coordinator/Dockerfile index 24a4cb4..def6da3 100644 --- a/docker/coordinator/Dockerfile +++ b/docker/coordinator/Dockerfile @@ -27,4 +27,5 @@ EXPOSE 25000 ENTRYPOINT ["/opt/impala/bin/daemon_entrypoint.sh", "/opt/impala/bin/impalad",\ "-abort_on_config_error=false", "-state_store_host=statestored",\ - "-catalog_service_host=catalogd" "-is_executor=false"] + "-catalog_service_host=catalogd", "-is_executor=false", \ + "-mem_limit_includes_jvm=true"] diff --git a/docker/daemon_entrypoint.sh b/docker/daemon_entrypoint.sh index 6839686..ac1efe0 100755 --- a/docker/daemon_entrypoint.sh +++ b/docker/daemon_entrypoint.sh @@ -39,7 +39,10 @@ do CLASSPATH+=:$jar done echo "CLASSPATH: $CLASSPATH" -echo "LD_LIBRARY_PATH: $LD_LIBRARY_PAT" +echo "LD_LIBRARY_PATH: $LD_LIBRARY_PATH" + +# Default to 2GB heap. Allow overriding by externally-set JAVA_TOOL_OPTIONS. +export JAVA_TOOL_OPTIONS="-Xmx2g $JAVA_TOOL_OPTIONS" "$@" EXIT_CODE=$? diff --git a/docker/executor/Dockerfile b/docker/executor/Dockerfile index dc47fe1..6741203 100644 --- a/docker/executor/Dockerfile +++ b/docker/executor/Dockerfile @@ -23,4 +23,5 @@ EXPOSE 25000 ENTRYPOINT ["/opt/impala/bin/daemon_entrypoint.sh", "/opt/impala/bin/impalad",\ "-abort_on_config_error=false", "-state_store_host=statestored",\ - "-catalog_service_host=catalogd" "-is_coordinator=false"] + "-catalog_service_host=catalogd", "-is_coordinator=false",\ + "-mem_limit_includes_jvm=true"]