This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 839a25c89b5620e8bdc37383c8512d912fcb5d0f
Author: Yida Wu <wydbaggio...@gmail.com>
AuthorDate: Fri Dec 9 18:53:47 2022 -0800

    IMPALA-11786: Preserve memory for codegen cache
    
    IMPALA-11470 added support for a codegen cache, but the admission
    controller is not aware of the cache's memory usage, even though the
    cache actually consumes memory from the query memory quota. This can
    lead to query failures under heavy workloads once the admission
    controller has admitted queries up to the full memory limit.
    
    This patch subtracts the codegen cache capacity from the admission
    memory limit during initialization, reserving the codegen cache
    memory up front and treating it as separate memory, independent of
    the query memory reservation.
    
    It also reduces the maximum codegen cache memory from 20 percent to
    10 percent, and updates the test cases that failed due to the
    reduced admit memory limit.
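
    For illustration, a simplified sketch of the new sizing logic (a
    sketch only: AdjustAdmitLimit and kMaxCodegenCacheMemPercent are
    hypothetical names for this example; the real logic is inline in
    ExecEnv::Init(), as shown in the diff below):

        #include <algorithm>
        #include <cstdint>

        // Cap the requested cache size at 10% of the admission limit,
        // then carve that amount out of the limit so admitted queries
        // can never overlap with the codegen cache.
        int64_t AdjustAdmitLimit(int64_t admit_mem_limit, int64_t requested_capacity) {
          const double kMaxCodegenCacheMemPercent = 0.1;
          if (requested_capacity <= 0) return admit_mem_limit;  // cache disabled
          int64_t cache_limit = static_cast<int64_t>(
              admit_mem_limit * kMaxCodegenCacheMemPercent);
          return admit_mem_limit - std::min(requested_capacity, cache_limit);
        }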
    
    Tests:
    Passed exhaustive tests.
    
    Change-Id: Iebdc04ba1b91578d74684209a11c815225b8505a
    Reviewed-on: http://gerrit.cloudera.org:8080/19377
    Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 be/src/runtime/exec-env.cc                        | 53 +++++++++++++----------
 tests/custom_cluster/test_admission_controller.py |  4 +-
 tests/custom_cluster/test_executor_groups.py      |  5 ++-
 tests/custom_cluster/test_jvm_mem_tracking.py     |  3 +-
 4 files changed, 40 insertions(+), 25 deletions(-)

diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 5b0a95742..3c57abdd3 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -169,8 +169,8 @@ DEFINE_string(metrics_webserver_interface, "",
 const static string DEFAULT_FS = "fs.defaultFS";
 
 // The max percentage that the codegen cache can take from the total process memory.
-// The value is set to 20%.
-const double MAX_CODEGEN_CACHE_MEM_PERCENT = 0.2;
+// The value is set to 10%.
+const double MAX_CODEGEN_CACHE_MEM_PERCENT = 0.1;
 
 // The multiplier for how many queries a dedicated coordinator can run compared to an
 // executor. This is only effective when using non-default settings for executor groups
@@ -332,7 +332,35 @@ Status ExecEnv::Init() {
     }
   }
 
-  bool is_percent;
+  bool is_percent = false;
+  int64_t codegen_cache_capacity =
+      ParseUtil::ParseMemSpec(FLAGS_codegen_cache_capacity, &is_percent, 0);
+  if (codegen_cache_capacity > 0) {
+    // If codegen_cache_capacity is larger than 0, the number should not be a percentage.
+    DCHECK(!is_percent);
+    int64_t codegen_cache_limit = admit_mem_limit_ * MAX_CODEGEN_CACHE_MEM_PERCENT;
+    DCHECK(codegen_cache_limit > 0);
+    if (codegen_cache_capacity > codegen_cache_limit) {
+      LOG(INFO) << "CodeGen Cache capacity changed from "
+                << PrettyPrinter::Print(codegen_cache_capacity, TUnit::BYTES) << " to "
+                << PrettyPrinter::Print(codegen_cache_limit, TUnit::BYTES)
+                << " due to reaching the limit.";
+      codegen_cache_capacity = codegen_cache_limit;
+    }
+    codegen_cache_.reset(new CodeGenCache(metrics_.get()));
+    RETURN_IF_ERROR(codegen_cache_->Init(codegen_cache_capacity));
+    LOG(INFO) << "CodeGen Cache initialized with capacity "
+              << PrettyPrinter::Print(codegen_cache_capacity, TUnit::BYTES);
+    // Preserve the memory for codegen cache.
+    admit_mem_limit_ -= codegen_cache_capacity;
+    DCHECK_GT(admit_mem_limit_, 0);
+  } else {
+    LOG(INFO) << "CodeGen Cache is disabled.";
+  }
+
+  LOG(INFO) << "Admit memory limit: "
+            << PrettyPrinter::Print(admit_mem_limit_, TUnit::BYTES);
+
   int64_t buffer_pool_limit = ParseUtil::ParseMemSpec(FLAGS_buffer_pool_limit,
       &is_percent, admit_mem_limit_);
   if (buffer_pool_limit <= 0) {
@@ -450,25 +478,6 @@ Status ExecEnv::Init() {
 
   RETURN_IF_ERROR(admission_controller_->Init());
   RETURN_IF_ERROR(InitHadoopConfig());
-  int64_t codegen_cache_capacity =
-      ParseUtil::ParseMemSpec(FLAGS_codegen_cache_capacity, &is_percent, 0);
-  if (codegen_cache_capacity > 0) {
-    int64_t codegen_cache_limit = mem_tracker_->limit() * MAX_CODEGEN_CACHE_MEM_PERCENT;
-    DCHECK(codegen_cache_limit > 0);
-    if (codegen_cache_capacity > codegen_cache_limit) {
-      LOG(INFO) << "CodeGen Cache capacity changed to "
-                << PrettyPrinter::Print(codegen_cache_capacity, TUnit::BYTES) << " from "
-                << PrettyPrinter::Print(codegen_cache_limit, TUnit::BYTES)
-                << " due to reaching the limit.";
-      codegen_cache_capacity = codegen_cache_limit;
-    }
-    codegen_cache_.reset(new CodeGenCache(metrics_.get()));
-    RETURN_IF_ERROR(codegen_cache_->Init(codegen_cache_capacity));
-    LOG(INFO) << "CodeGen Cache initialized with capacity "
-              << PrettyPrinter::Print(codegen_cache_capacity, TUnit::BYTES);
-  } else {
-    LOG(INFO) << "CodeGen Cache is disabled.";
-  }
   return Status::OK();
 }
 
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index 445c4bda4..213772ad7 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -128,7 +128,8 @@ RESOURCES_DIR = os.path.join(os.environ['IMPALA_HOME'], "fe", "src", "test", "re
 
 def impalad_admission_ctrl_flags(max_requests, max_queued, pool_max_mem,
                                  proc_mem_limit=None, queue_wait_timeout_ms=None,
-                                 admission_control_slots=None, executor_groups=None):
+                                 admission_control_slots=None, executor_groups=None,
+                                 codegen_cache_capacity=0):
   extra_flags = ""
   if proc_mem_limit is not None:
     extra_flags += " -mem_limit={0}".format(proc_mem_limit)
@@ -138,6 +139,7 @@ def impalad_admission_ctrl_flags(max_requests, max_queued, pool_max_mem,
     extra_flags += " -admission_control_slots={0}".format(admission_control_slots)
   if executor_groups is not None:
     extra_flags += " -executor_groups={0}".format(executor_groups)
+  extra_flags += " -codegen_cache_capacity={0}".format(codegen_cache_capacity)
 
   return ("-vmodule admission-controller=3 -default_pool_max_requests {0} "
           "-default_pool_max_queued {1} -default_pool_mem_limit {2} 
{3}".format(
diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py
index e0f945e1d..da24e6f87 100644
--- a/tests/custom_cluster/test_executor_groups.py
+++ b/tests/custom_cluster/test_executor_groups.py
@@ -587,7 +587,10 @@ class TestExecutorGroups(CustomClusterTestSuite):
     # and mem_admitted is used for this. Since mem_limit is being used here, both will be
     # identical but this will at least test that code path as a sanity check.
     second_coord_client.clear_configuration()
-    second_coord_client.set_configuration({'mem_limit': '4g'})
+    # The maximum memory that can be used for the query needs to have the codegen
+    # cache capacity subtracted: 4GB - 10% * 4GB = 3.6GB.
+    query_mem_limit = 4 * (1 - 0.1)
+    second_coord_client.set_configuration({'mem_limit': str(query_mem_limit) + 'g'})
     handle_for_second = second_coord_client.execute_async(QUERY)
     # Verify that the first coordinator knows about the query running on the second
     self.coordinator.service.wait_for_metric_value(
diff --git a/tests/custom_cluster/test_jvm_mem_tracking.py b/tests/custom_cluster/test_jvm_mem_tracking.py
index c93af86e1..e4b9a07e0 100644
--- a/tests/custom_cluster/test_jvm_mem_tracking.py
+++ b/tests/custom_cluster/test_jvm_mem_tracking.py
@@ -37,7 +37,8 @@ class TestJvmMemTracker(CustomClusterTestSuite):
       pytest.skip('runs only in exhaustive')
     super(TestJvmMemTracker, cls).setup_class()
 
-  @CustomClusterTestSuite.with_args(impalad_args="--mem_limit_includes_jvm=true",
+  @CustomClusterTestSuite.with_args(impalad_args="--mem_limit_includes_jvm=true \
+                                    --codegen_cache_capacity=0",
                                     start_args="--jvm_args=-Xmx1g", cluster_size=1)
   def test_jvm_mem_tracking(self, vector):
     service = ImpalaCluster.get_e2e_test_cluster().impalads[0].service
