This is an automated email from the ASF dual-hosted git repository.

jbapple pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 7aec1b6db042d88c04a501e24afaa25d4794cccb
Author: Tim Armstrong <tarmstr...@cloudera.com>
AuthorDate: Wed Jan 23 13:28:29 2019 -0800

    IMPALA-7941: part 2/2: use cgroups memory limit
    
    This uses the functionality from part 1 to detect the CGroups memory
    limit and use it to set a lower process memory limit if needed.
    min(system memory, cgroups memory limit) is used instead of
    system memory to determine the memory limit. Behaviour of
    processes without a memory limit set via CGroups is unchanged.
    
    The default behaviour of using 80% of the memory limit detected
    is still in effect. This seems like an OK default, but may
    lead to some amount of wasted memory.
    
    Modify containers to have a default JVM heap size of 2GB and
    --mem_limit_includes_jvm, so that the automatically configured
    memory limit makes more sense.
    
    start-impala-cluster.py is modified to exercise all of this.
    
    Testing:
    Tested a containerised cluster manually on my system, which has
    32GB of RAM. Here's the breakdown from the memz/ page showing
    the JVM heap and auto-configured memory limit.
    
      Process: Limit=7.31 GB Total=1.94 GB Peak=1.94 GB
        JVM: max heap size: Total=1.78 GB
        JVM: non-heap committed: Total=35.56 MB
        Buffer Pool: Free Buffers: Total=0
        Buffer Pool: Clean Pages: Total=0
        Buffer Pool: Unused Reservation: Total=0
        Control Service Queue: Limit=50.00 MB Total=0 Peak=0
        Data Stream Service Queue: Limit=374.27 MB Total=0 Peak=0
        Data Stream Manager Early RPCs: Total=0 Peak=0
        TCMalloc Overhead: Total=12.20 MB
        Untracked Memory: Total=121.31 MB
    
    Change-Id: Ie9fb4fb936a46fc194a204391d03c07c8c7fba21
    Reviewed-on: http://gerrit.cloudera.org:8080/12262
    Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 be/src/runtime/exec-env.cc    | 123 +++++++++++++++++++++++++-----------------
 be/src/runtime/exec-env.h     |   6 +++
 bin/start-impala-cluster.py   |  59 ++++++++++++--------
 docker/coord_exec/Dockerfile  |   2 +-
 docker/coordinator/Dockerfile |   3 +-
 docker/daemon_entrypoint.sh   |   5 +-
 docker/executor/Dockerfile    |   3 +-
 7 files changed, 126 insertions(+), 75 deletions(-)

diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index e60d752..0b0a889 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -52,6 +52,7 @@
 #include "service/frontend.h"
 #include "service/impala-server.h"
 #include "statestore/statestore-subscriber.h"
+#include "util/cgroup-util.h"
 #include "util/debug-util.h"
 #include "util/default-path-handlers.h"
 #include "util/hdfs-bulk-ops.h"
@@ -228,41 +229,8 @@ Status ExecEnv::Init() {
   }
   RETURN_IF_ERROR(async_rpc_pool_->Init());
 
-  // Initialize global memory limit.
-  // Depending on the system configuration, we will have to calculate the 
process
-  // memory limit either based on the available physical memory, or if 
overcommitting
-  // is turned off, we use the memory commit limit from /proc/meminfo (see
-  // IMPALA-1690).
-  int64_t bytes_limit = 0;
-  bool is_percent;
-  int64_t system_mem;
-  if (MemInfo::vm_overcommit() == 2 &&
-      MemInfo::commit_limit() < MemInfo::physical_mem()) {
-    system_mem = MemInfo::commit_limit();
-    bytes_limit = ParseUtil::ParseMemSpec(FLAGS_mem_limit, &is_percent, 
system_mem);
-    // There might be the case of misconfiguration, when on a system swap is 
disabled
-    // and overcommitting is turned off the actual usable memory is less than 
the
-    // available physical memory.
-    LOG(WARNING) << "This system shows a discrepancy between the available "
-                 << "memory and the memory commit limit allowed by the "
-                 << "operating system. ( Mem: " << MemInfo::physical_mem()
-                 << "<=> CommitLimit: "
-                 << MemInfo::commit_limit() << "). "
-                 << "Impala will adhere to the smaller value by setting the "
-                 << "process memory limit to " << bytes_limit << " "
-                 << "Please verify the system configuration. Specifically, "
-                 << "/proc/sys/vm/overcommit_memory and "
-                 << "/proc/sys/vm/overcommit_ratio.";
-  } else {
-    system_mem = MemInfo::physical_mem();
-    bytes_limit = ParseUtil::ParseMemSpec(FLAGS_mem_limit, &is_percent, 
system_mem);
-  }
-  // ParseMemSpec() returns -1 for invalid input and 0 to mean unlimited. From 
Impala
-  // 2.11 onwards we do not support unlimited process memory limits.
-  if (bytes_limit <= 0) {
-    return Status(Substitute("The process memory limit (--mem_limit) must be a 
positive "
-          "bytes value or percentage: $0", FLAGS_mem_limit));
-  }
+  int64_t bytes_limit;
+  RETURN_IF_ERROR(ChooseProcessMemLimit(&bytes_limit));
 
   // Need to register JVM metrics first so that we can use them to compute the 
buffer pool
   // limit.
@@ -281,6 +249,7 @@ Status ExecEnv::Init() {
     post_jvm_bytes_limit -= JvmMemoryMetric::HEAP_MAX_USAGE->GetValue();
   }
 
+  bool is_percent;
   int64_t buffer_pool_limit = ParseUtil::ParseMemSpec(FLAGS_buffer_pool_limit,
       &is_percent, post_jvm_bytes_limit);
   if (buffer_pool_limit <= 0) {
@@ -288,12 +257,15 @@ Status ExecEnv::Init() {
           "positive bytes value or percentage: $0", FLAGS_buffer_pool_limit));
   }
   buffer_pool_limit = BitUtil::RoundDown(buffer_pool_limit, 
FLAGS_min_buffer_size);
+  LOG(INFO) << "Buffer pool limit: "
+            << PrettyPrinter::Print(buffer_pool_limit, TUnit::BYTES);
 
   int64_t clean_pages_limit = 
ParseUtil::ParseMemSpec(FLAGS_buffer_pool_clean_pages_limit,
       &is_percent, buffer_pool_limit);
   if (clean_pages_limit <= 0) {
-    return Status(Substitute("Invalid --buffer_pool_clean_pages_limit value, 
must be a percentage or "
-          "positive bytes value or percentage: $0", 
FLAGS_buffer_pool_clean_pages_limit));
+    return Status(Substitute("Invalid --buffer_pool_clean_pages_limit value, 
must be a "
+        "percentage or positive bytes value or percentage: $0",
+        FLAGS_buffer_pool_clean_pages_limit));
   }
   InitBufferPool(FLAGS_min_buffer_size, buffer_pool_limit, clean_pages_limit);
 
@@ -350,17 +322,6 @@ Status ExecEnv::Init() {
 #endif
   mem_tracker_->RegisterMetrics(metrics_.get(), "mem-tracker.process");
 
-  if (bytes_limit > MemInfo::physical_mem()) {
-    LOG(WARNING) << "Memory limit "
-                 << PrettyPrinter::Print(bytes_limit, TUnit::BYTES)
-                 << " exceeds physical memory of "
-                 << PrettyPrinter::Print(MemInfo::physical_mem(), 
TUnit::BYTES);
-  }
-  LOG(INFO) << "Using global memory limit: "
-            << PrettyPrinter::Print(bytes_limit, TUnit::BYTES);
-  LOG(INFO) << "Buffer pool limit: "
-            << PrettyPrinter::Print(buffer_pool_limit, TUnit::BYTES);
-
   RETURN_IF_ERROR(disk_io_mgr_->Init());
 
   // Start services in order to ensure that dependencies between them are met
@@ -391,6 +352,72 @@ Status ExecEnv::Init() {
   return Status::OK();
 }
 
+Status ExecEnv::ChooseProcessMemLimit(int64_t* bytes_limit) {
+  // Depending on the system configuration, we detect the total amount of memory
+  // available to the system - either the available physical memory, or if overcommitting
+  // is turned off, we use the memory commit limit from /proc/meminfo (see IMPALA-1690).
+  // The 'memory' CGroup can also impose a lower limit on memory consumption,
+  // so we take the minimum of the system memory and the CGroup memory limit.
+  int64_t avail_mem = MemInfo::physical_mem();
+  bool use_commit_limit =
+      MemInfo::vm_overcommit() == 2 && MemInfo::commit_limit() < MemInfo::physical_mem();
+  if (use_commit_limit) {
+    avail_mem = MemInfo::commit_limit();
+    // This may indicate a misconfiguration: when swap is disabled and overcommitting is
+    // turned off, the actual usable memory is less than the available physical
+    // memory.
+    LOG(WARNING) << "This system shows a discrepancy between the available "
+                 << "memory and the memory commit limit allowed by the "
+                 << "operating system. ( Mem: " << MemInfo::physical_mem()
+                 << "<=> CommitLimit: " << MemInfo::commit_limit() << "). "
+                 << "Impala will adhere to the smaller value when setting the process "
+                 << "memory limit. Please verify the system configuration. Specifically, "
+                 << "/proc/sys/vm/overcommit_memory and "
+                 << "/proc/sys/vm/overcommit_ratio.";
+  }
+  LOG(INFO) << "System memory available: " << PrettyPrinter::PrintBytes(avail_mem)
+            << " (from " << (use_commit_limit ? "commit limit" : "physical mem") << ")";
+  int64_t cgroup_mem_limit;
+  Status cgroup_mem_status = CGroupUtil::FindCGroupMemLimit(&cgroup_mem_limit);
+  if (cgroup_mem_status.ok()) {
+    if (cgroup_mem_limit < avail_mem) {
+      avail_mem = cgroup_mem_limit;
+      LOG(INFO) << "CGroup memory limit for this process reduces physical memory "
+                << "available to: " << PrettyPrinter::PrintBytes(avail_mem);
+    }
+  } else {
+    LOG(WARNING) << "Could not detect CGroup memory limit, assuming unlimited: "
+                 << cgroup_mem_status.GetDetail();
+  }
+  bool is_percent;
+  *bytes_limit = ParseUtil::ParseMemSpec(FLAGS_mem_limit, &is_percent, avail_mem);
+  // ParseMemSpec() returns -1 for invalid input and 0 to mean unlimited. From Impala
+  // 2.11 onwards we do not support unlimited process memory limits.
+  if (*bytes_limit <= 0) {
+    return Status(Substitute("The process memory limit (--mem_limit) must be a positive "
+                             "bytes value or percentage: $0", FLAGS_mem_limit));
+  }
+  if (is_percent) {
+    LOG(INFO) << "Using process memory limit: " << PrettyPrinter::PrintBytes(*bytes_limit)
+              << " (--mem_limit=" << FLAGS_mem_limit << " of "
+              << PrettyPrinter::PrintBytes(avail_mem) << ")";
+  } else {
+    LOG(INFO) << "Using process memory limit: " << PrettyPrinter::PrintBytes(*bytes_limit)
+              << " (--mem_limit=" << FLAGS_mem_limit << ")";
+  }
+  if (*bytes_limit > MemInfo::physical_mem()) {
+    LOG(WARNING) << "Process memory limit " << PrettyPrinter::PrintBytes(*bytes_limit)
+                 << " exceeds physical memory of "
+                 << PrettyPrinter::PrintBytes(MemInfo::physical_mem());
+  }
+  if (cgroup_mem_status.ok() && *bytes_limit > cgroup_mem_limit) {
+    LOG(WARNING) << "Process Memory limit " << PrettyPrinter::PrintBytes(*bytes_limit)
+                 << " exceeds CGroup memory limit of "
+                 << PrettyPrinter::PrintBytes(cgroup_mem_limit);
+  }
+  return Status::OK();
+}
+
 Status ExecEnv::StartStatestoreSubscriberService() {
   LOG(INFO) << "Starting statestore subscriber service";
 
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index 465fe7a..8a5a1f4 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -248,6 +248,12 @@ class ExecEnv {
   /// address lists be identical in order to share a KuduClient.
   KuduClientMap kudu_client_map_;
 
+  /// Choose a memory limit (returned in *bytes_limit) based on the 
--mem_limit flag and
+  /// the memory available to the daemon process. Returns an error if the 
memory limit is
+  /// invalid or another error is encountered that should prevent starting up 
the daemon.
+  /// Logs the memory limit chosen and any relevant diagnostics related to 
that choice.
+  Status ChooseProcessMemLimit(int64_t* bytes_limit);
+
   /// Initialise 'buffer_pool_' and 'buffer_reservation_' with given capacity.
   void InitBufferPool(int64_t min_page_len, int64_t capacity, int64_t 
clean_pages_limit);
 
diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py
index ec67694..b86e6c5 100755
--- a/bin/start-impala-cluster.py
+++ b/bin/start-impala-cluster.py
@@ -288,21 +288,14 @@ def build_impalad_arg_lists(cluster_size, 
num_coordinators, use_exclusive_coordi
   # blob that has to be tokenized later. However, that may require more 
substantial
   # refactoring.
 
-  # Set mem_limit of each impalad to the smaller of 12GB or
-  # 1/cluster_size (typically 1/3) of 70% of system memory.
-  #
-  # The default memory limit for an impalad is 80% of the total system memory. 
On a
-  # mini-cluster with 3 impalads that means 240%. Since having an impalad be 
OOM killed
-  # is very annoying, the mem limit will be reduced. This can be overridden 
using the
-  # --impalad_args flag. virtual_memory().total returns the total physical 
memory.
-  # The exact ratio to use is somewhat arbitrary. Peak memory usage during
-  # tests depends on the concurrency of parallel tests as well as their 
ordering.
-  # On the other hand, to avoid using too much memory, we limit the
-  # memory choice here to max out at 12GB. This should be sufficient for tests.
-  #
-  # Beware that ASAN builds use more memory than regular builds.
-  mem_limit = int(0.7 * psutil.virtual_memory().total / cluster_size)
-  mem_limit = min(12 * 1024 * 1024 * 1024, mem_limit)
+  mem_limit_arg = ""
+  if options.docker_network is None:
+    mem_limit_arg = 
"-mem_limit={0}".format(compute_impalad_mem_limit(cluster_size))
+  else:
+    # For containerised impalads, set a memory limit via docker instead of 
directly,
+    # to emulate what would happen in a production container. JVM heap is 
included,
+    # so we should be able to use 100% of the detected mem_limit.
+    mem_limit_arg = "-mem_limit=100%"
 
   delay_list = []
   if options.catalog_init_delays != "":
@@ -321,15 +314,13 @@ def build_impalad_arg_lists(cluster_size, 
num_coordinators, use_exclusive_coordi
     if remap_ports:
       impala_port_args = build_impalad_port_args(i)
     # impalad args from the --impalad_args flag. Also replacing '#ID' with the 
instance.
-    # TODO: IMPALA-7940: mem_limit should be set on docker container and 
detected by
-    # the impala daemon process instead of set manually here.
     param_args = (" ".join(options.impalad_args)).replace("#ID", str(i))
-    args = ("--mem_limit={mem_limit} "
+    args = ("{mem_limit_arg} "
         "{impala_logging_args} "
         "{jvm_args} "
         "{impala_port_args} "
         "{param_args}").format(
-            mem_limit=mem_limit,  # Goes first so --impalad_args will override 
it.
+            mem_limit_arg=mem_limit_arg,  # Goes first so --impalad_args will 
override it.
             impala_logging_args=" ".join(build_logging_args(service_name)),
             jvm_args=" ".join(build_jvm_args(i)),
             impala_port_args=impala_port_args,
@@ -364,6 +355,23 @@ def build_impalad_arg_lists(cluster_size, 
num_coordinators, use_exclusive_coordi
   return impalad_args
 
 
+def compute_impalad_mem_limit(cluster_size):
+  # Returns the mem_limit, in bytes, to use for each impalad: the smaller of 12GB or
+  # 1/cluster_size (typically 1/3) of 70% of system memory.
+  #
+  # The default memory limit for an impalad is 80% of the total system memory. On a
+  # mini-cluster with 3 impalads that means 240%. Since having an impalad be OOM killed
+  # is very annoying, the mem limit will be reduced. This can be overridden using the
+  # --impalad_args flag. virtual_memory().total returns the total physical memory.
+  # The exact ratio to use is somewhat arbitrary. Peak memory usage during
+  # tests depends on the concurrency of parallel tests as well as their ordering.
+  # On the other hand, to avoid using too much memory, we limit the
+  # memory choice here to max out at 12GB. This should be sufficient for tests.
+  #
+  # Beware that ASAN builds use more memory than regular builds.
+  mem_limit = int(0.7 * psutil.virtual_memory().total / cluster_size)
+  return min(12 * 1024 * 1024 * 1024, mem_limit)
+
 class MiniClusterOperations(object):
   """Implementations of operations for the non-containerized minicluster
   implementation.
@@ -481,13 +489,14 @@ class DockerMiniClusterOperations(object):
     impalad_arg_lists = build_impalad_arg_lists(
         cluster_size, num_coordinators, use_exclusive_coordinators, 
remap_ports=False)
     assert cluster_size == len(impalad_arg_lists)
+    mem_limit = compute_impalad_mem_limit(cluster_size)
     for i in xrange(cluster_size):
       chosen_ports = choose_impalad_ports(i)
       port_map = {DEFAULT_BEESWAX_PORT: chosen_ports['beeswax_port'],
                   DEFAULT_HS2_PORT: chosen_ports['hs2_port'],
                   DEFAULT_IMPALAD_WEBSERVER_PORT: 
chosen_ports['webserver_port']}
       self.__run_container__("impalad_coord_exec",
-          shlex.split(impalad_arg_lists[i]), port_map, i)
+          shlex.split(impalad_arg_lists[i]), port_map, i, mem_limit=mem_limit)
 
   def __gen_container_name__(self, daemon, instance=None):
     """Generate the name for the container, which should be unique among 
containers
@@ -502,14 +511,15 @@ class DockerMiniClusterOperations(object):
       return daemon
     return "{0}-{1}".format(daemon, instance)
 
-  def __run_container__(self, daemon, args, port_map, instance=None):
+  def __run_container__(self, daemon, args, port_map, instance=None, 
mem_limit=None):
     """Launch a container with the daemon - impalad, catalogd, or statestored. 
If there
     are multiple impalads in the cluster, a unique instance number must be 
specified.
     'args' are command-line arguments to be appended to the end of the daemon 
command
     line. 'port_map' determines a mapping from container ports to ports on 
localhost. If
     --docker_auto_ports was set on the command line, 'port_map' is ignored and 
Docker
     will automatically choose the mapping. If there is an existing running or 
stopped
-    container with the same name, it will be destroyed."""
+    container with the same name, it will be destroyed. If provided, mem_limit 
is
+    passed to "docker run" as a string to set the memory limit for the 
container."""
     self.__destroy_container__(daemon, instance)
     if options.docker_auto_ports:
       port_args = ["-P"]
@@ -527,10 +537,13 @@ class DockerMiniClusterOperations(object):
     # for config changes to take effect.
     conf_dir = os.path.join(IMPALA_HOME, "fe/target/test-classes")
     mount_args = ["--mount", 
"type=bind,src={0},dst=/opt/impala/conf".format(conf_dir)]
+    mem_limit_args = []
+    if mem_limit is not None:
+      mem_limit_args = ["--memory", str(mem_limit)]
     LOG.info("Running container {0}".format(container_name))
     run_cmd = (["docker", "run", "-d"] + env_args + port_args + ["--network",
       self.network_name, "--name", container_name, "--network-alias", 
host_name] +
-      mount_args + [image_tag] + args)
+      mount_args + mem_limit_args + [image_tag] + args)
     LOG.info("Running command {0}".format(run_cmd))
     check_call(run_cmd)
     port_mapping = check_output(["docker", "port", container_name])
diff --git a/docker/coord_exec/Dockerfile b/docker/coord_exec/Dockerfile
index 8000b32..6e6fa09 100644
--- a/docker/coord_exec/Dockerfile
+++ b/docker/coord_exec/Dockerfile
@@ -27,4 +27,4 @@ EXPOSE 25000
 
 ENTRYPOINT ["/opt/impala/bin/daemon_entrypoint.sh", "/opt/impala/bin/impalad",\
      "-abort_on_config_error=false", "-state_store_host=statestored",\
-     "-catalog_service_host=catalogd"]
+     "-catalog_service_host=catalogd", "-mem_limit_includes_jvm=true"]
diff --git a/docker/coordinator/Dockerfile b/docker/coordinator/Dockerfile
index 24a4cb4..def6da3 100644
--- a/docker/coordinator/Dockerfile
+++ b/docker/coordinator/Dockerfile
@@ -27,4 +27,5 @@ EXPOSE 25000
 
 ENTRYPOINT ["/opt/impala/bin/daemon_entrypoint.sh", "/opt/impala/bin/impalad",\
      "-abort_on_config_error=false", "-state_store_host=statestored",\
-     "-catalog_service_host=catalogd" "-is_executor=false"]
+     "-catalog_service_host=catalogd", "-is_executor=false", \
+     "-mem_limit_includes_jvm=true"]
diff --git a/docker/daemon_entrypoint.sh b/docker/daemon_entrypoint.sh
index 6839686..ac1efe0 100755
--- a/docker/daemon_entrypoint.sh
+++ b/docker/daemon_entrypoint.sh
@@ -39,7 +39,10 @@ do
   CLASSPATH+=:$jar
 done
 echo "CLASSPATH: $CLASSPATH"
-echo "LD_LIBRARY_PATH: $LD_LIBRARY_PAT"
+echo "LD_LIBRARY_PATH: $LD_LIBRARY_PATH"
+
+# Default to 2GB heap. Allow overriding by externally-set JAVA_TOOL_OPTIONS.
+export JAVA_TOOL_OPTIONS="-Xmx2g $JAVA_TOOL_OPTIONS"
 
 "$@"
 EXIT_CODE=$?
diff --git a/docker/executor/Dockerfile b/docker/executor/Dockerfile
index dc47fe1..6741203 100644
--- a/docker/executor/Dockerfile
+++ b/docker/executor/Dockerfile
@@ -23,4 +23,5 @@ EXPOSE 25000
 
 ENTRYPOINT ["/opt/impala/bin/daemon_entrypoint.sh", "/opt/impala/bin/impalad",\
      "-abort_on_config_error=false", "-state_store_host=statestored",\
-     "-catalog_service_host=catalogd" "-is_coordinator=false"]
+     "-catalog_service_host=catalogd", "-is_coordinator=false",\
+     "-mem_limit_includes_jvm=true"]

Reply via email to