This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 4bdb99938 IMPALA-11470: Add Cache For Codegen Functions
4bdb99938 is described below

commit 4bdb99938a785966405fc1cd61dee1597c43f19a
Author: Yida Wu <[email protected]>
AuthorDate: Wed Oct 26 16:38:00 2022 -0700

    IMPALA-11470: Add Cache For Codegen Functions
    
    This patch adds a cache for codegen functions to improve the
    performance of sub-second queries.
    
    The main idea is to store codegen functions in a cache and reuse
    them when appropriate, avoiding repeated LLVM optimization, which
    can take hundreds of milliseconds.
    
    In this patch, we implement a cache to store codegen functions.
    The cache is a singleton instance per daemon and contains multiple
    cache entries. Each cache entry is at the fragment level: it stores
    all the codegen functions of a fragment. If an identical fragment
    arrives again, it should find all the codegen functions it needs
    in that cache entry, saving the optimization and compilation time.
    
    The module bitcode is used as the key to the cache. It is
    generated before module optimization and final compilation. If
    codegen_cache_mode is NORMAL, the default, we store the full
    bitcode string as the key. If codegen_cache_mode is set to OPTIMAL,
    we store a key containing only the hash code and the total length
    of the full key to reduce memory consumption.
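
    As an illustration of the two key modes, here is a minimal C++
    sketch. KeyMode and MakeCacheKey are hypothetical names, and
    std::hash stands in for whichever hash function the patch actually
    uses; this is not the code in llvm-codegen-cache.h.

```cpp
#include <cstddef>
#include <functional>
#include <string>

// Hypothetical names for illustration only; not the types added by the patch.
enum class KeyMode { kNormal, kOptimal };

// Build the string stored as the cache key from a module's bitcode.
std::string MakeCacheKey(const std::string& bitcode, KeyMode mode) {
  // NORMAL: keep the full bitcode. Exact, but potentially large.
  if (mode == KeyMode::kNormal) return bitcode;
  // OPTIMAL: keep only a hash code and the total length of the full key.
  // Assumption: std::hash is a placeholder for the real hash function.
  std::size_t hash = std::hash<std::string>{}(bitcode);
  std::size_t len = bitcode.size();
  std::string key;
  key.append(reinterpret_cast<const char*>(&hash), sizeof(hash));
  key.append(reinterpret_cast<const char*>(&len), sizeof(len));
  return key;
}
```

    In this sketch the compact key has a fixed size regardless of the
    bitcode size, which is where the memory saving comes from; the
    trade-off is that two different modules could map to the same key.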
    
    Also, KrpcDataStreamSenderConfig::CodegenHashRow() is changed to
    pass the hash seed as an argument, because the fragment can't hit
    the cache if a dynamic hash seed is embedded in the codegen
    function.
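
    A rough sketch of why an embedded seed defeats the cache, using
    pseudo-IR strings as a stand-in for module bitcode. Both function
    names and the IR text are hypothetical and are not valid LLVM IR
    or the code emitted by the patch.

```cpp
#include <cstdint>
#include <string>

// Variant A (hypothetical): the seed is baked into the function body as a
// constant, so the emitted text, and therefore the cache key, changes
// whenever the seed changes.
std::string EmitHashRowWithConstantSeed(uint64_t seed) {
  return "define i64 @HashRow(i8* %row) { hash(%row, " + std::to_string(seed) +
      ") }";
}

// Variant B (hypothetical): the seed is a parameter, so the emitted text is
// identical for every seed and the same cache entry can be reused.
std::string EmitHashRowWithSeedArg() {
  return "define i64 @HashRow(i8* %row, i64 %seed) { hash(%row, %seed) }";
}
```

    With variant A, two otherwise identical fragments with different
    seeds produce different keys; with variant B they share one key.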
    
    The codegen cache is disabled automatically for a fragment that
    uses a native UDF, because caching can lead to a crash in this
    case. The UDF is loaded into the LLVM execution engine's global
    mapping rather than into the LLVM module, but the key to the cache
    entry is the LLVM module bitcode, which can't reflect a change of
    the UDF address when the UDF is reloaded at runtime (for example,
    after database recreation). A cached entry could then use a stale
    UDF address and crash. The cache stays disabled for this case until
    there is a better solution; IMPALA-11771 was filed to follow up.
    
    The patch also introduces the following new startup flag and
    query options to configure and operate the feature.
    Startup flag for configuration:
      - codegen_cache_capacity: The capacity of the cache. If set to 0,
        the codegen cache is disabled.
    
    Query options for operation:
      - disable_codegen_cache: The codegen cache is disabled when this
        is set to true.
    
      - codegen_cache_mode: Defined by a new enum type
        TCodeGenCacheMode. There are four modes: NORMAL and OPTIMAL,
        plus NORMAL_DEBUG and OPTIMAL_DEBUG, which are the debug
        variants of the first two.
        With NORMAL, the full key is stored in the cache. Each entry
        costs more memory because the key is the bitcode of the LLVM
        module, which can be large.
        With OPTIMAL, the cache stores only the hash code and length
        of the key. This greatly reduces memory consumption, but hash
        collisions are possible.
        The debug modes behave the same as the non-debug modes but
        allow more logs and statistics, so they could be slower.
        Only valid when disable_codegen_cache is set to false.
    
    New impalad metrics:
      - impala.codegen-cache.misses
      - impala.codegen-cache.entries-in-use
      - impala.codegen-cache.entries-in-use-bytes
      - impala.codegen-cache.entries-evicted
      - impala.codegen-cache.hits
      - impala.codegen-cache.entry-sizes
    
    New profile metrics:
      - CodegenCacheLookupTime
      - CodegenCacheSaveTime
      - ModuleBitcodeGenTime
      - NumCachedFunctions
    
    TPCH-1 performance evaluation (8 iterations) on AWS m5a.4xlarge.
    The results exclude the first iteration to show the benefit of the
    cache:
    Query     Cached(s) NoCache(s) Delta(Avg) NoCodegen(s)  Delta(Avg)
    TPCH-Q1    0.39      1.02       -61.76%     5.59         -93.02%
    TPCH-Q2    0.56      1.21       -53.72%     0.47         19.15%
    TPCH-Q3    0.37      0.77       -51.95%     0.43         -13.95%
    TPCH-Q4    0.36      0.51       -29.41%     0.33         9.09%
    TPCH-Q5    0.39      1.1        -64.55%     0.39         0%
    TPCH-Q6    0.24      0.27       -11.11%     0.77         -68.83%
    TPCH-Q7    0.39      1.2        -67.5%      0.39         0%
    TPCH-Q8    0.58      1.46       -60.27%     0.45         28.89%
    TPCH-Q9    0.8       1.38       -42.03%     1            -20%
    TPCH-Q10   0.6       1.03       -41.75%     0.85         -29.41%
    TPCH-Q11   0.3       0.93       -67.74%     0.2          50%
    TPCH-Q12   0.28      0.48       -41.67%     0.38         -26.32%
    TPCH-Q13   1.11      1.22       -9.02%      1.16         -4.31%
    TPCH-Q14   0.55      0.78       -29.49%     0.45         22.22%
    TPCH-Q15   0.33      0.73       -54.79%     0.44         -25%
    TPCH-Q16   0.32      0.78       -58.97%     0.41         -21.95%
    TPCH-Q17   0.56      0.84       -33.33%     0.89         -37.08%
    TPCH-Q18   0.54      0.92       -41.3%      0.89         -39.33%
    TPCH-Q19   0.35      2.34       -85.04%     0.35         0%
    TPCH-Q20   0.34      0.98       -65.31%     0.31         9.68%
    TPCH-Q21   0.83      1.14       -27.19%     0.86         -3.49%
    TPCH-Q22   0.26      0.52       -50%        0.25         4%
    
    The results show a clear improvement over codegen without the
    cache (the default setting). Compared to codegen disabled,
    however, the codegen cache is, as expected, not always faster for
    short queries. The likely reason is that the cache still needs
    time to prepare the codegen functions and generate the module
    bitcode used as the key. If that preparation time exceeds the
    benefit from the codegen functions, especially for extremely
    short queries, the result can be slower than not using codegen.
    There could be room for improvement in the future.
    
    We also measured the total cache entry size for the TPCH queries.
    The data below shows the total codegen cache space used by each
    query. The optimal mode greatly reduces the size of the cache
    because of its much smaller key, which is the only difference
    between the two modes.
    
    Query     Normal(KB)  Optimal(KB)
    TPCH-Q1     604.1       50.9
    TPCH-Q2     973.4       135.5
    TPCH-Q3     561.1       36.5
    TPCH-Q4     423.3       41.1
    TPCH-Q5     866.9       93.3
    TPCH-Q6     295.9       4.9
    TPCH-Q7     1105.4      124.5
    TPCH-Q8     1382.6      211
    TPCH-Q9     1041.4      119.5
    TPCH-Q10    738.4       65.4
    TPCH-Q11    1201.6      136.3
    TPCH-Q12    452.8       46.7
    TPCH-Q13    541.3       48.1
    TPCH-Q14    696.8       102.8
    TPCH-Q15    1148.1      95.2
    TPCH-Q16    740.6       77.4
    TPCH-Q17    990.1       133.4
    TPCH-Q18    376         70.8
    TPCH-Q19    1280.1      179.5
    TPCH-Q20    1260.9      180.7
    TPCH-Q21    722.5       66.8
    TPCH-Q22    713.1       49.8
    
    Tests:
    Ran exhaustive tests.
    Added E2e testcase TestCodegenCache.
    Added unit testcase LlvmCodeGenCacheTest.
    
    Change-Id: If42c78a7f51fd582e5fe331fead494dadf544eb1
    Reviewed-on: http://gerrit.cloudera.org:8080/19181
    Reviewed-by: Michael Smith <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/codegen/CMakeLists.txt                      |   2 +
 be/src/codegen/llvm-codegen-cache-test.cc          | 586 +++++++++++++++++++++
 be/src/codegen/llvm-codegen-cache.cc               | 235 +++++++++
 be/src/codegen/llvm-codegen-cache.h                | 298 +++++++++++
 be/src/codegen/llvm-codegen.cc                     | 241 +++++++--
 be/src/codegen/llvm-codegen.h                      |  48 +-
 be/src/exprs/scalar-expr.cc                        |  22 +-
 be/src/runtime/exec-env.cc                         |  26 +
 be/src/runtime/exec-env.h                          |   6 +
 be/src/runtime/fragment-state.h                    |   4 +
 be/src/runtime/krpc-data-stream-sender-ir.cc       |   2 +-
 be/src/runtime/krpc-data-stream-sender.cc          |  18 +-
 be/src/runtime/krpc-data-stream-sender.h           |   2 +-
 be/src/runtime/query-state.cc                      |   6 +
 be/src/runtime/query-state.h                       |   5 +
 be/src/runtime/test-env.h                          |   9 +-
 be/src/service/query-options.cc                    |  11 +
 be/src/service/query-options.h                     |   9 +-
 common/thrift/ImpalaService.thrift                 |  12 +
 common/thrift/Query.thrift                         |  13 +
 common/thrift/metrics.json                         |  60 +++
 .../queries/QueryTest/codegen-cache-udf.test       |  24 +
 tests/common/test_result_verifier.py               |   8 +
 tests/custom_cluster/test_codegen_cache.py         | 254 +++++++++
 24 files changed, 1845 insertions(+), 56 deletions(-)

diff --git a/be/src/codegen/CMakeLists.txt b/be/src/codegen/CMakeLists.txt
index ac00a1c2b..6a01f39de 100644
--- a/be/src/codegen/CMakeLists.txt
+++ b/be/src/codegen/CMakeLists.txt
@@ -32,6 +32,7 @@ add_library(CodeGen
   codegen-symbol-emitter.cc
   codegen-util.cc
   llvm-codegen.cc
+  llvm-codegen-cache.cc
   instruction-counter.cc
   ${IR_C_FILE}
   ${LEGACY_AVX_IR_C_FILE}
@@ -130,5 +131,6 @@ add_custom_target(test-loop.bc
 # Exception to unified be tests: custom main initializes LLVM
 ADD_BE_LSAN_TEST(llvm-codegen-test)
 add_dependencies(llvm-codegen-test test-loop.bc)
+ADD_BE_LSAN_TEST(llvm-codegen-cache-test LlvmCodegenCacheTest.*)
 
 ADD_UNIFIED_BE_LSAN_TEST(instruction-counter-test InstructionCounterTest.*)
diff --git a/be/src/codegen/llvm-codegen-cache-test.cc b/be/src/codegen/llvm-codegen-cache-test.cc
new file mode 100644
index 000000000..1146c3564
--- /dev/null
+++ b/be/src/codegen/llvm-codegen-cache-test.cc
@@ -0,0 +1,586 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "codegen/llvm-codegen-cache.h"
+#include <boost/thread/thread.hpp>
+#include <llvm/ExecutionEngine/ExecutionEngine.h>
+#include "codegen/mcjit-mem-mgr.h"
+#include "common/object-pool.h"
+#include "runtime/fragment-state.h"
+#include "runtime/test-env.h"
+#include "service/fe-support.h"
+
+using namespace std;
+using boost::scoped_ptr;
+using boost::thread;
+using boost::thread_group;
+
+DECLARE_bool(cache_force_single_shard);
+DECLARE_string(codegen_cache_capacity);
+
+namespace impala {
+class LlvmCodeGenCacheTest : public testing::Test {
+ public:
+  virtual void SetUp() {
+    FLAGS_codegen_cache_capacity = "0";
+    metrics_.reset(new MetricGroup("codegen-cache-test"));
+    profile_ = RuntimeProfile::Create(&obj_pool_, "codegen-cache-test");
+    test_env_.reset(new TestEnv);
+    ASSERT_OK(test_env_->Init());
+    RuntimeState* runtime_state_;
+    ASSERT_OK(test_env_->CreateQueryState(0, &query_options_, &runtime_state_));
+    QueryState* qs = runtime_state_->query_state();
+    TPlanFragment* fragment = qs->obj_pool()->Add(new TPlanFragment());
+    PlanFragmentCtxPB* fragment_ctx = qs->obj_pool()->Add(new PlanFragmentCtxPB());
+    fragment_state_ =
+        qs->obj_pool()->Add(new FragmentState(qs, *fragment, *fragment_ctx));
+  }
+
+  virtual void TearDown() {
+    fragment_state_->ReleaseResources();
+    fragment_state_ = nullptr;
+    codegen_cache_.reset();
+    test_env_.reset();
+    metrics_.reset();
+    obj_pool_.Clear();
+  }
+
+  void Reset() {
+    TearDown();
+    SetUp();
+  }
+
+  static void AddFunctionToJit(
+      LlvmCodeGen* codegen, llvm::Function* fn, CodegenFnPtrBase* fn_ptr) {
+    return codegen->AddFunctionToJitInternal(fn, fn_ptr);
+  }
+
+  static Status FinalizeModule(LlvmCodeGen* codegen) { return codegen->FinalizeModule(); }
+
+  static void CheckResult(CodeGenCacheEntry& entry, bool is_double = false) {
+    ASSERT_TRUE(!entry.Empty());
+    ASSERT_TRUE(entry.engine_pointer != nullptr);
+    CheckResult(entry.engine_pointer, is_double);
+  }
+
+  static void CheckResult(LlvmCodeGen* codegen, bool is_double = false) {
+    ASSERT_TRUE(codegen->execution_engine_cached_ != nullptr);
+    CheckResult(codegen->execution_engine_cached_.get(), is_double);
+  }
+
+  static void CheckResult(llvm::ExecutionEngine* engine, bool is_double = false) {
+    void* test_fn;
+    if (!is_double) {
+      test_fn = reinterpret_cast<void*>(engine->getFunctionAddress("Echo"));
+    } else {
+      test_fn = reinterpret_cast<void*>(engine->getFunctionAddress("Double"));
+    }
+    ASSERT_TRUE(test_fn != nullptr);
+    int input = 1;
+    if (!is_double) {
+      EXPECT_EQ(((TestEcho)test_fn)(input), input);
+    } else {
+      EXPECT_EQ(((TestDouble)test_fn)(input), input * 2);
+    }
+  }
+
+  void AddLlvmCodegenEcho(LlvmCodeGen* codegen);
+  void AddLlvmCodegenDouble(LlvmCodeGen* codegen);
+  void GetLlvmEmptyFunction(LlvmCodeGen* codegen, llvm::Function**);
+  typedef int (*TestEcho)(int);
+  typedef int (*TestDouble)(int);
+  typedef void (*TestEmpty)();
+  void TestBasicFunction(TCodeGenCacheMode::type mode);
+  void TestAtCapacity(TCodeGenCacheMode::type mode);
+  void TestSkipCache();
+  void CheckMetrics(CodeGenCache*, int, int, int);
+  void CheckInUseMetrics(CodeGenCache*, int, int64_t);
+  void CheckEvictMetrics(CodeGenCache*, int);
+  int64_t GetMemCharge(LlvmCodeGen* codegen, string key_str, bool is_normal_mode);
+  void CheckEngineCount(LlvmCodeGen*, int expect_count);
+  void CheckToInsertMap();
+  void TestSwitchModeHelper(TCodeGenCacheMode::type mode, string key,
+      int expect_entry_num, int expect_engine_num, llvm::ExecutionEngine** engine);
+  bool CheckKeyExist(TCodeGenCacheMode::type mode, string key);
+  bool CheckEngineExist(llvm::ExecutionEngine* engine);
+  void ExpectNumEngineSameAsEntry();
+  void StoreHelper(TCodeGenCacheMode::type mode, string key);
+  void TestConcurrentStore(int num_threads);
+
+  vector<TCodeGenCacheMode::type> all_modes = {TCodeGenCacheMode::OPTIMAL,
+      TCodeGenCacheMode::NORMAL, TCodeGenCacheMode::OPTIMAL_DEBUG,
+      TCodeGenCacheMode::NORMAL_DEBUG};
+
+  FragmentState* fragment_state_;
+  ObjectPool obj_pool_;
+  scoped_ptr<MetricGroup> metrics_;
+  RuntimeProfile* profile_;
+  scoped_ptr<TestEnv> test_env_;
+  scoped_ptr<CodeGenCache> codegen_cache_;
+  shared_ptr<llvm::ExecutionEngine> exec_engine_;
+  TQueryOptions query_options_;
+};
+
+void LlvmCodeGenCacheTest::AddLlvmCodegenEcho(LlvmCodeGen* codegen) {
+  ASSERT_TRUE(codegen != NULL);
+  LlvmCodeGen::FnPrototype prototype(codegen, "Echo", codegen->i32_type());
+  prototype.AddArgument(LlvmCodeGen::NamedVariable("n", codegen->i32_type()));
+  LlvmBuilder builder(codegen->context());
+  llvm::Value* args[1];
+  llvm::Function* fn = prototype.GeneratePrototype(&builder, args);
+  builder.CreateRet(args[0]);
+  fn = codegen->FinalizeFunction(fn);
+  ASSERT_TRUE(fn != NULL);
+  CodegenFnPtr<TestEcho> jitted_fn;
+  AddFunctionToJit(codegen, fn, &jitted_fn);
+  ASSERT_OK(FinalizeModule(codegen));
+  ASSERT_TRUE(jitted_fn.load() != nullptr);
+  TestEcho test_fn = jitted_fn.load();
+  ASSERT_EQ(test_fn(1), 1);
+}
+
+void LlvmCodeGenCacheTest::GetLlvmEmptyFunction(
+    LlvmCodeGen* codegen, llvm::Function** func) {
+  ASSERT_TRUE(codegen != NULL);
+  LlvmCodeGen::FnPrototype prototype(codegen, "TestEmpty", codegen->void_type());
+  LlvmBuilder builder(codegen->context());
+  llvm::Function* fn = prototype.GeneratePrototype(&builder, nullptr);
+  builder.CreateRetVoid();
+  fn = codegen->FinalizeFunction(fn);
+  ASSERT_TRUE(fn != NULL);
+  *func = fn;
+}
+
+void LlvmCodeGenCacheTest::AddLlvmCodegenDouble(LlvmCodeGen* codegen) {
+  ASSERT_TRUE(codegen != NULL);
+  LlvmCodeGen::FnPrototype prototype(codegen, "Double", codegen->i32_type());
+  prototype.AddArgument(LlvmCodeGen::NamedVariable("n", codegen->i32_type()));
+  LlvmBuilder builder(codegen->context());
+  llvm::Value* args[1];
+  llvm::Function* fn = prototype.GeneratePrototype(&builder, args);
+  llvm::Value* mul = codegen->GetI32Constant(2);
+  args[0] = builder.CreateMul(args[0], mul);
+  builder.CreateRet(args[0]);
+  fn = codegen->FinalizeFunction(fn);
+  ASSERT_TRUE(fn != NULL);
+  CodegenFnPtr<TestDouble> jitted_fn;
+  AddFunctionToJit(codegen, fn, &jitted_fn);
+  ASSERT_OK(FinalizeModule(codegen));
+  ASSERT_TRUE(jitted_fn.load() != nullptr);
+  TestDouble test_fn = jitted_fn.load();
+  ASSERT_EQ(test_fn(1), 2);
+}
+
+/// Test the basic function of a codegen cache.
+void LlvmCodeGenCacheTest::TestBasicFunction(TCodeGenCacheMode::type mode) {
+  int64_t codegen_cache_capacity = 256 * 1024; // 256KB
+  codegen_cache_.reset(new CodeGenCache(metrics_.get()));
+  bool is_normal_mode = !CodeGenCacheModeAnalyzer::is_optimal(mode);
+  EXPECT_OK(codegen_cache_->Init(codegen_cache_capacity));
+
+  // Create a LlvmCodeGen containing a codegen function Echo.
+  scoped_ptr<LlvmCodeGen> codegen;
+  ASSERT_OK(LlvmCodeGen::CreateImpalaCodegen(fragment_state_, NULL, "test", &codegen));
+  AddLlvmCodegenEcho(codegen.get());
+  // Create a LlvmCodeGen containing a codegen function Double.
+  scoped_ptr<LlvmCodeGen> codegen_double;
+  ASSERT_OK(LlvmCodeGen::CreateImpalaCodegen(
+      fragment_state_, NULL, "test_double", &codegen_double));
+  AddLlvmCodegenDouble(codegen_double.get());
+
+  CodeGenCacheKey cache_key;
+  CodeGenCacheEntry entry;
+  string key = "key";
+  CodeGenCacheKeyConstructor::construct(key, &cache_key);
+  int64_t mem_charge = GetMemCharge(codegen.get(), cache_key.data(), is_normal_mode);
+  int64_t mem_charge_double =
+      GetMemCharge(codegen_double.get(), cache_key.data(), is_normal_mode);
+
+  // Store and lookup the entry by the key.
+  EXPECT_OK(codegen_cache_->Store(cache_key, codegen.get(), mode));
+  CheckInUseMetrics(
+      codegen_cache_.get(), 1 /*num_entry_in_use*/, mem_charge /*bytes_in_use*/);
+  EXPECT_OK(codegen_cache_->Lookup(cache_key, mode, &entry, &exec_engine_));
+  CheckResult(entry);
+  codegen->Close();
+  // Close the LlvmCodeGen, but should not affect the stored cache.
+  EXPECT_OK(codegen_cache_->Lookup(cache_key, mode, &entry, &exec_engine_));
+  CheckResult(entry);
+  // Override the entry with a different function, should be able to find the new
+  // function from the new entry.
+  EXPECT_OK(codegen_cache_->Store(cache_key, codegen_double.get(), mode));
+  CheckInUseMetrics(
+      codegen_cache_.get(), 1 /*num_entry_in_use*/, mem_charge_double /*bytes_in_use*/);
+  EXPECT_OK(codegen_cache_->Lookup(cache_key, mode, &entry, &exec_engine_));
+  CheckResult(entry, true /*is_double*/);
+  EXPECT_EQ(codegen_cache_->codegen_cache_entries_evicted_->GetValue(), 1);
+  codegen_double->Close();
+}
+
+void LlvmCodeGenCacheTest::CheckMetrics(
+    CodeGenCache* codegen_cache, int hit, int miss, int evict) {
+  EXPECT_EQ(codegen_cache->codegen_cache_hits_->GetValue(), hit);
+  EXPECT_EQ(codegen_cache->codegen_cache_misses_->GetValue(), miss);
+  EXPECT_EQ(codegen_cache->codegen_cache_entries_evicted_->GetValue(), evict);
+}
+
+void LlvmCodeGenCacheTest::CheckInUseMetrics(
+    CodeGenCache* codegen_cache, int num_entry, int64_t bytes = -1) {
+  EXPECT_EQ(codegen_cache->codegen_cache_entries_in_use_->GetValue(), num_entry);
+  if (bytes != -1) {
+    EXPECT_EQ(codegen_cache->codegen_cache_entries_in_use_bytes_->GetValue(), bytes);
+  }
+}
+
+void LlvmCodeGenCacheTest::CheckEvictMetrics(CodeGenCache* codegen_cache, int evict) {
+  EXPECT_EQ(codegen_cache->codegen_cache_entries_evicted_->GetValue(), evict);
+}
+
+int64_t LlvmCodeGenCacheTest::GetMemCharge(
+    LlvmCodeGen* codegen, string key_str, bool is_normal_mode) {
+  if (is_normal_mode) {
+    return codegen->memory_manager_->bytes_allocated() + key_str.size()
+        + sizeof(CodeGenCacheEntry);
+  }
+  // Optimal mode would use hash code and length as the key.
+  return codegen->memory_manager_->bytes_allocated() + CodeGenCacheKey::OptimalKeySize
+      + sizeof(CodeGenCacheEntry);
+}
+
+/// Test the situation that the codegen cache hits the limit of capacity, in this case,
+/// eviction is needed when new insertion comes.
+void LlvmCodeGenCacheTest::TestAtCapacity(TCodeGenCacheMode::type mode) {
+  int64_t codegen_cache_capacity = 196; // 196B
+  bool is_normal_mode = !CodeGenCacheModeAnalyzer::is_optimal(mode);
+  // 128B for optimal mode
+  if (!is_normal_mode) codegen_cache_capacity = 128;
+  // Using single shard makes the logic of scenarios simple for capacity and
+  // eviction-related behavior.
+  FLAGS_cache_force_single_shard = true;
+
+  // Create two LlvmCodeGen objects containing a different codegen function separately.
+  scoped_ptr<LlvmCodeGen> codegen;
+  ASSERT_OK(LlvmCodeGen::CreateImpalaCodegen(fragment_state_, NULL, "test", &codegen));
+  AddLlvmCodegenEcho(codegen.get());
+  codegen->GenerateFunctionNamesHashCode();
+  scoped_ptr<LlvmCodeGen> codegen_double;
+  ASSERT_OK(LlvmCodeGen::CreateImpalaCodegen(
+      fragment_state_, NULL, "test_double", &codegen_double));
+  AddLlvmCodegenDouble(codegen_double.get());
+  codegen_double->GenerateFunctionNamesHashCode();
+
+  CodeGenCacheKey cache_key_1;
+  CodeGenCacheKey cache_key_2;
+  string key_1 = "key1";
+  string key_2 = "key2";
+  CodeGenCacheKeyConstructor::construct(key_1, &cache_key_1);
+  CodeGenCacheKeyConstructor::construct(key_2, &cache_key_2);
+  int64_t mem_charge_1 = GetMemCharge(codegen.get(), cache_key_1.data(), is_normal_mode);
+  int64_t mem_charge_2 = GetMemCharge(codegen.get(), cache_key_2.data(), is_normal_mode);
+  // Make sure the memory charge of two keys is larger than capacity for testing the
+  // eviction.
+  ASSERT_LE(mem_charge_1, codegen_cache_capacity);
+  ASSERT_LE(mem_charge_2, codegen_cache_capacity);
+  ASSERT_GE(mem_charge_1 + mem_charge_2, codegen_cache_capacity);
+
+  test_env_->ResetCodegenCache(metrics_.get());
+  CodeGenCache* cache = test_env_->codegen_cache();
+  EXPECT_OK(cache->Init(codegen_cache_capacity));
+
+  // Store key_1 and lookup.
+  EXPECT_OK(codegen->StoreCache(cache_key_1));
+  EXPECT_TRUE(codegen->LookupCache(cache_key_1));
+  CheckResult(codegen.get());
+  CheckMetrics(cache, 1 /*hit*/, 0 /*miss*/, 0 /*evict*/);
+
+  // Store key_2, key_1 should be evicted due to hitting the capacity limit.
+  EXPECT_OK(codegen_double->StoreCache(cache_key_2));
+  CheckMetrics(cache, 1 /*hit*/, 0 /*miss*/, 1 /*evict*/);
+
+  // Lookup key_1, should be gone. Lookup key_2, should be successful.
+  EXPECT_FALSE(codegen->LookupCache(cache_key_1));
+  CheckMetrics(cache, 1 /*hit*/, 1 /*miss*/, 1 /*evict*/);
+  EXPECT_TRUE(codegen_double->LookupCache(cache_key_2));
+  CheckResult(codegen_double.get(), /*is_double*/ true);
+  CheckMetrics(cache, 2 /*hit*/, 1 /*miss*/, 1 /*evict*/);
+
+  // Store key_1 again, should evict the key_2, check again to see if everything
+  // is alright.
+  EXPECT_OK(codegen->StoreCache(cache_key_1));
+  CheckMetrics(cache, 2 /*hit*/, 1 /*miss*/, 2 /*evict*/);
+  EXPECT_FALSE(codegen_double->LookupCache(cache_key_2));
+  CheckMetrics(cache, 2 /*hit*/, 2 /*miss*/, 2 /*evict*/);
+  EXPECT_TRUE(codegen->LookupCache(cache_key_1));
+  CheckResult(codegen.get());
+  CheckMetrics(cache, 3 /*hit*/, 2 /*miss*/, 2 /*evict*/);
+
+  codegen->Close();
+  codegen_double->Close();
+}
+
+/// Test the case where the cache has an entry for the key but the entry doesn't
+/// contain the function we want; we should switch to the cache-miss path.
+void LlvmCodeGenCacheTest::TestSkipCache() {
+  // Initialize a LlvmCodeGen object with a normal function.
+  scoped_ptr<LlvmCodeGen> codegen;
+  ASSERT_OK(LlvmCodeGen::CreateImpalaCodegen(fragment_state_, NULL, "test", &codegen));
+  AddLlvmCodegenEcho(codegen.get());
+  // Create an empty function from other LlvmCodeGen to create the failure later.
+  scoped_ptr<LlvmCodeGen> codegen_empty;
+  ASSERT_OK(LlvmCodeGen::CreateImpalaCodegen(
+      fragment_state_, NULL, "test_empty", &codegen_empty));
+  llvm::Function* empty_func;
+  GetLlvmEmptyFunction(codegen_empty.get(), &empty_func);
+
+  test_env_->ResetCodegenCache(metrics_.get());
+  EXPECT_OK(test_env_->codegen_cache()->Init(256 * 1024 /*capacity*/));
+
+  CheckMetrics(test_env_->codegen_cache(), 0 /*hit*/, 0 /*miss*/, 0 /*evict*/);
+  CodeGenCacheKey cache_key;
+  string key = "key";
+  CodeGenCacheKeyConstructor::construct(key, &cache_key);
+  codegen->GenerateFunctionNamesHashCode();
+  // Store and lookup the entry by the key, should be successful.
+  EXPECT_OK(codegen->StoreCache(cache_key));
+  EXPECT_TRUE(codegen->LookupCache(cache_key));
+  CheckMetrics(test_env_->codegen_cache(), 1 /*hit*/, 0 /*miss*/, 0 /*evict*/);
+  CodegenFnPtr<TestEmpty> fn_ptr;
+  // Insert a new function to the codegen, and regenerate the function names hash
+  // code, expect a failure because the hash code is inconsistent with the code in
+  // the cache.
+  codegen->fns_to_jit_compile_.emplace_back(empty_func, &fn_ptr);
+  codegen->GenerateFunctionNamesHashCode();
+  // Expect a look up failure.
+  EXPECT_FALSE(codegen->LookupCache(cache_key));
+  CheckMetrics(test_env_->codegen_cache(), 1 /*hit*/, 1 /*miss*/, 0 /*evict*/);
+  codegen->Close();
+  codegen_empty->Close();
+}
+
+// Test the basic function of using the codegen cache.
+TEST_F(LlvmCodeGenCacheTest, BasicFunction) {
+  for (auto mode : all_modes) {
+    Reset();
+    TestBasicFunction(mode);
+  }
+}
+
+// Test when the codegen cache is at capacity.
+TEST_F(LlvmCodeGenCacheTest, EvictionAtCapacity) {
+  for (auto mode : all_modes) {
+    query_options_.codegen_cache_mode = mode;
+    Reset();
+    TestAtCapacity(query_options_.codegen_cache_mode);
+  }
+}
+
+// Test when the cache hits, but has different function names, in that case,
+// we will skip the cache and fall back to normal path.
+TEST_F(LlvmCodeGenCacheTest, SkipCache) {
+  for (auto mode : all_modes) {
+    query_options_.codegen_cache_mode = mode;
+    Reset();
+    TestSkipCache();
+  }
+}
+
+// Test whether the codegen cache mode analyzer produces the correct result for all
+// the modes.
+TEST_F(LlvmCodeGenCacheTest, ModeAnalyzer) {
+  EXPECT_FALSE(CodeGenCacheModeAnalyzer::is_debug(TCodeGenCacheMode::OPTIMAL));
+  EXPECT_FALSE(CodeGenCacheModeAnalyzer::is_debug(TCodeGenCacheMode::NORMAL));
+  EXPECT_TRUE(CodeGenCacheModeAnalyzer::is_debug(TCodeGenCacheMode::OPTIMAL_DEBUG));
+  EXPECT_TRUE(CodeGenCacheModeAnalyzer::is_debug(TCodeGenCacheMode::NORMAL_DEBUG));
+  EXPECT_TRUE(CodeGenCacheModeAnalyzer::is_optimal(TCodeGenCacheMode::OPTIMAL));
+  EXPECT_FALSE(CodeGenCacheModeAnalyzer::is_optimal(TCodeGenCacheMode::NORMAL));
+  EXPECT_TRUE(CodeGenCacheModeAnalyzer::is_optimal(TCodeGenCacheMode::OPTIMAL_DEBUG));
+  EXPECT_FALSE(CodeGenCacheModeAnalyzer::is_optimal(TCodeGenCacheMode::NORMAL_DEBUG));
+}
+
+// Check the number of execution engines stored in the cache. The shared pointer of
+// an execution engine must stay in the codegen cache for as long as an entry using
+// that engine is stored in the cache.
+void LlvmCodeGenCacheTest::CheckEngineCount(LlvmCodeGen* codegen, int expect_count) {
+  lock_guard<mutex> lock(codegen_cache_->cached_engines_lock_);
+  auto engine_it = codegen_cache_->cached_engines_.find(codegen->execution_engine_.get());
+  EXPECT_TRUE(engine_it != codegen_cache_->cached_engines_.end());
+  EXPECT_EQ(codegen_cache_->cached_engines_.size(), expect_count);
+}
+
+void LlvmCodeGenCacheTest::CheckToInsertMap() {
+  lock_guard<mutex> lock(codegen_cache_->to_insert_set_lock_);
+  EXPECT_EQ(codegen_cache_->keys_to_insert_.size(), 0);
+}
+
+// Return true if the provided key exists.
+bool LlvmCodeGenCacheTest::CheckKeyExist(TCodeGenCacheMode::type mode, string key) {
+  CodeGenCacheKey cache_key;
+  CodeGenCacheEntry entry;
+  CodeGenCacheKeyConstructor::construct(key, &cache_key);
+  EXPECT_OK(codegen_cache_->Lookup(cache_key, mode, &entry, &exec_engine_));
+  return !entry.Empty();
+}
+
+// Return true if the provided engine exists.
+bool LlvmCodeGenCacheTest::CheckEngineExist(llvm::ExecutionEngine* engine) {
+  auto engine_it = codegen_cache_->cached_engines_.find(engine);
+  return engine_it != codegen_cache_->cached_engines_.end();
+}
+
+// Expect the number of execution engines to equal the number of entries in the global
+// codegen cache.
+void LlvmCodeGenCacheTest::ExpectNumEngineSameAsEntry() {
+  EXPECT_EQ(codegen_cache_->cached_engines_.size(),
+      codegen_cache_->codegen_cache_entries_in_use_->GetValue());
+}
+
+/// Helper function to test switching modes. Helps to insert an entry with provided key
+/// and mode.
+void LlvmCodeGenCacheTest::TestSwitchModeHelper(TCodeGenCacheMode::type mode, string key,
+    int expect_entry_num = -1, int expect_engine_num = -1,
+    llvm::ExecutionEngine** engine = nullptr) {
+  // Create a LlvmCodeGen containing a codegen function Echo.
+  scoped_ptr<LlvmCodeGen> codegen;
+  ASSERT_OK(LlvmCodeGen::CreateImpalaCodegen(fragment_state_, NULL, "test", &codegen));
+  AddLlvmCodegenEcho(codegen.get());
+
+  CodeGenCacheKey cache_key;
+  CodeGenCacheEntry entry;
+  CodeGenCacheKeyConstructor::construct(key, &cache_key);
+
+  // Store and lookup the entry by the key.
+  EXPECT_OK(codegen_cache_->Store(cache_key, codegen.get(), mode));
+  if (expect_entry_num != -1) {
+    CheckInUseMetrics(codegen_cache_.get(), expect_entry_num /*num_entry_in_use*/);
+  }
+  if (expect_engine_num != -1) {
+    CheckEngineCount(codegen.get(), expect_engine_num);
+  }
+  EXPECT_OK(codegen_cache_->Lookup(cache_key, mode, &entry, &exec_engine_));
+  CheckResult(entry);
+  if (engine) *engine = codegen->execution_engine_.get();
+  codegen->Close();
+}
+
+// Test to switch among different modes.
+TEST_F(LlvmCodeGenCacheTest, SwitchMode) {
+  int64_t codegen_cache_capacity = 512; // 512B
+  codegen_cache_.reset(new CodeGenCache(metrics_.get()));
+  EXPECT_OK(codegen_cache_->Init(codegen_cache_capacity));
+  string key = "key";
+  // Insert one entry to the cache with the key provided in each TestSwitchModeHelper().
+  // The key of debug and non-debug are the same for the same mode, therefore,
+  // we expect the entry number and engine number would not be changed between the
+  // switch of debug and non-debug modes. But the key would be different between
+  // NORMAL and OPTIMAL.
+  TestSwitchModeHelper(TCodeGenCacheMode::OPTIMAL, key, 1, 1);
+  TestSwitchModeHelper(TCodeGenCacheMode::OPTIMAL_DEBUG, key, 1, 1);
+  TestSwitchModeHelper(TCodeGenCacheMode::NORMAL, key, 2, 2);
+  TestSwitchModeHelper(TCodeGenCacheMode::NORMAL_DEBUG, key, 2, 2);
+  // Try again. The new insertions should replace the old ones, so the entry number and
+  // engine number won't change.
+  llvm::ExecutionEngine *engine_opt, *engine_opt_dbg, *engine_normal, *engine_normal_dbg;
+  TestSwitchModeHelper(TCodeGenCacheMode::OPTIMAL, key, 2, 2, &engine_opt);
+  TestSwitchModeHelper(TCodeGenCacheMode::OPTIMAL_DEBUG, key, 2, 2, &engine_opt_dbg);
+  // Expect the engines stored under the same key to be different, because a new engine
+  // is created every time.
+  EXPECT_NE(engine_opt, engine_opt_dbg);
+  // Look up both engines: the later one should exist, the earlier one should not.
+  EXPECT_FALSE(CheckEngineExist(engine_opt));
+  EXPECT_TRUE(CheckEngineExist(engine_opt_dbg));
+
+  // Same as above, but use the NORMAL type.
+  TestSwitchModeHelper(TCodeGenCacheMode::NORMAL, key, 2, 2, &engine_normal);
+  TestSwitchModeHelper(TCodeGenCacheMode::NORMAL_DEBUG, key, 2, 2, &engine_normal_dbg);
+  EXPECT_NE(engine_normal, engine_normal_dbg);
+  EXPECT_FALSE(CheckEngineExist(engine_normal));
+  EXPECT_TRUE(CheckEngineExist(engine_normal_dbg));
+
+  // Expect the two existing engines to be different.
+  EXPECT_NE(engine_opt_dbg, engine_normal_dbg);
+  // Expect the number of existing engines to be the same as the number of entries.
+  ExpectNumEngineSameAsEntry();
+  // Insert many different keys so that the original key is evicted once the cache
+  // reaches capacity.
+  for (int i = 0; i < 10; i++) {
+    TestSwitchModeHelper(TCodeGenCacheMode::NORMAL, key + std::to_string(i));
+  }
+  // As the entries with the original key are evicted, expect that we can't find them in
+  // the codegen cache anymore.
+  EXPECT_FALSE(CheckKeyExist(TCodeGenCacheMode::OPTIMAL, key));
+  EXPECT_FALSE(CheckKeyExist(TCodeGenCacheMode::OPTIMAL_DEBUG, key));
+  EXPECT_FALSE(CheckKeyExist(TCodeGenCacheMode::NORMAL, key));
+  EXPECT_FALSE(CheckKeyExist(TCodeGenCacheMode::NORMAL_DEBUG, key));
+  ExpectNumEngineSameAsEntry();
+}
+
+/// Helper function to store a specific key to the global codegen cache.
+void LlvmCodeGenCacheTest::StoreHelper(TCodeGenCacheMode::type mode, string key) {
+  // Create a LlvmCodeGen containing a codegen function Echo.
+  scoped_ptr<LlvmCodeGen> codegen;
+  ASSERT_OK(LlvmCodeGen::CreateImpalaCodegen(fragment_state_, NULL, "test", &codegen));
+  AddLlvmCodegenEcho(codegen.get());
+  CodeGenCacheKey cache_key;
+  CodeGenCacheEntry entry;
+  CodeGenCacheKeyConstructor::construct(key, &cache_key);
+  EXPECT_OK(codegen_cache_->Store(cache_key, codegen.get(), mode));
+  codegen->Close();
+}
+
+/// Concurrently store random entries to the global codegen cache, and check that
+/// all the resources are all right.
+void LlvmCodeGenCacheTest::TestConcurrentStore(int num_threads) {
+  thread_group workers;
+  for (int i = 0; i < num_threads; ++i) {
+    workers.add_thread(new thread([this, num_threads]() {
+      int test_times = 100;
+      while (test_times-- > 0) {
+        string key = std::to_string(rand() % num_threads);
+        int mode_idx = rand() % all_modes.size();
+        StoreHelper(all_modes[mode_idx], key);
+      }
+    }));
+  }
+  workers.join_all();
+
+  // Check the metrics and the number of elements in the global cache to make sure
+  // nothing is leaked.
+  EXPECT_LE(codegen_cache_->codegen_cache_entries_in_use_->GetValue(), num_threads);
+  EXPECT_GT(codegen_cache_->codegen_cache_entries_evicted_->GetValue(), 0);
+  {
+    lock_guard<mutex> lock(codegen_cache_->cached_engines_lock_);
+    EXPECT_LE(codegen_cache_->cached_engines_.size(), num_threads);
+  }
+  CheckToInsertMap();
+}
+
+TEST_F(LlvmCodeGenCacheTest, ConcurrentStore) {
+  int64_t codegen_cache_capacity = 512; // 512B
+  codegen_cache_.reset(new CodeGenCache(metrics_.get()));
+  EXPECT_OK(codegen_cache_->Init(codegen_cache_capacity));
+  TestConcurrentStore(8);
+}
+
+} // namespace impala
+
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
+  impala::InitFeSupport(false);
+  ABORT_IF_ERROR(impala::LlvmCodeGen::InitializeLlvm());
+  return RUN_ALL_TESTS();
+}
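
Reviewer note: the ConcurrentStore test above exercises the way Store() backs off when another thread is already inserting the same key. The standalone sketch below illustrates that in-flight guard pattern under simplifying assumptions. The class name InFlightGuardedStore, the 64-bit key hash, and the callback parameter are hypothetical and not part of the patch, which tracks CodeGenCacheKey::HashCode values and calls StoreInternal().

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <unordered_set>

// Hypothetical stand-in for the cache: only the in-flight bookkeeping is modelled.
class InFlightGuardedStore {
 public:
  // Returns true if this call performed the store, false if another caller was
  // already storing the same key hash and this call backed off.
  template <typename StoreFn>
  bool Store(uint64_t key_hash, StoreFn do_store) {
    {
      std::lock_guard<std::mutex> lock(lock_);
      // insert() reports whether the hash was newly added to the set.
      if (!in_flight_.insert(key_hash).second) return false;
    }
    do_store();  // The actual insertion runs outside the lock.
    std::lock_guard<std::mutex> lock(lock_);
    size_t erased = in_flight_.erase(key_hash);
    assert(erased == 1);
    (void)erased;
    return true;
  }

  // Number of key hashes currently being stored.
  size_t InFlightCount() {
    std::lock_guard<std::mutex> lock(lock_);
    return in_flight_.size();
  }

 private:
  std::mutex lock_;
  std::unordered_set<uint64_t> in_flight_;
};
```

A caller that gets false back has stored nothing, which mirrors how the patch returns an OK status and leaves the insertion to the other thread.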
diff --git a/be/src/codegen/llvm-codegen-cache.cc b/be/src/codegen/llvm-codegen-cache.cc
new file mode 100644
index 000000000..bf1690699
--- /dev/null
+++ b/be/src/codegen/llvm-codegen-cache.cc
@@ -0,0 +1,235 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "codegen/llvm-codegen-cache.h"
+#include "codegen/instruction-counter.h"
+#include "codegen/mcjit-mem-mgr.h"
+#include "util/hash-util.h"
+
+using namespace std;
+using strings::Substitute;
+
+namespace impala {
+
+// Maximum codegen cache entry size for the purposes of histogram sizing in stats
+// collection.
+// An entry is expected to be less than 128MB.
+static constexpr int64_t STATS_MAX_CODEGEN_CACHE_ENTRY_SIZE = 128L << 20;
+
+void CodeGenCache::EvictionCallback::EvictedEntry(Slice key, Slice value) {
+  DCHECK(key.data() != nullptr);
+  DCHECK(value.data() != nullptr);
+  // Remove the execution engine of this entry from the global cache.
+  const CodeGenCacheEntry* cache_entry =
+      reinterpret_cast<const CodeGenCacheEntry*>(value.data());
+  DCHECK(cache_entry != nullptr);
+  cache_->RemoveEngine(cache_entry->engine_pointer);
+  LOG(INFO) << "Evict CodeGen Cache, key hash_code=" << CodeGenCacheKey::hash_code(key);
+  // For statistics.
+  if (codegen_cache_entries_evicted_ != nullptr) {
+    codegen_cache_entries_evicted_->Increment(1);
+  }
+  if (codegen_cache_entries_in_use_ != nullptr) {
+    codegen_cache_entries_in_use_->Increment(-1);
+  }
+  if (codegen_cache_entries_in_use_bytes_ != nullptr) {
+    codegen_cache_entries_in_use_bytes_->Increment(-cache_entry->total_bytes_charge);
+  }
+}
+
+void CodeGenCacheKeyConstructor::construct(
+    string& key_content, CodeGenCacheKey* cache_key) {
+  DCHECK(cache_key != nullptr);
+  int length = 0;
+  int len_hashcode = sizeof(CodeGenCacheKey::HashCode);
+  // We use the same type for all the length of elements within the key.
+  int len_general_size = sizeof(CodeGenCacheKey::ElementLengthType);
+  int len_key_content = key_content.size();
+  CodeGenCacheKey::HashCode hashcode;
+  length += len_hashcode; // For hash value
+  length += len_general_size << 1;
+  length += len_key_content;
+  string result;
+  result.reserve(length);
+  result.append(reinterpret_cast<char*>(&hashcode), len_hashcode);
+  result.append(reinterpret_cast<char*>(&length), len_general_size);
+  result.append(reinterpret_cast<char*>(&len_key_content), len_general_size);
+  result.append(key_content.c_str(), len_key_content);
+  DCHECK_GT(length, len_hashcode);
+  MurmurHash3_x64_128(result.c_str() + len_hashcode, length - len_hashcode,
+      CODEGEN_CACHE_HASH_SEED_CONST, hashcode.hash_code);
+  result.replace(0, len_hashcode, reinterpret_cast<char*>(&hashcode), len_hashcode);
+  cache_key->set_key(move(result));
+}
+
+CodeGenCache::CodeGenCache(MetricGroup* metrics)
+  : is_closed_(false),
+    codegen_cache_hits_(metrics->AddCounter("impala.codegen-cache.hits", 0)),
+    codegen_cache_misses_(metrics->AddCounter("impala.codegen-cache.misses", 0)),
+    codegen_cache_entries_evicted_(
+        metrics->AddCounter("impala.codegen-cache.entries-evicted", 0)),
+    codegen_cache_entries_in_use_(
+        metrics->AddGauge("impala.codegen-cache.entries-in-use", 0)),
+    codegen_cache_entries_in_use_bytes_(
+        metrics->AddGauge("impala.codegen-cache.entries-in-use-bytes", 0)),
+    codegen_cache_entry_size_stats_(metrics->RegisterMetric(
+        new HistogramMetric(MetricDefs::Get("impala.codegen-cache.entry-sizes"),
+            STATS_MAX_CODEGEN_CACHE_ENTRY_SIZE, 3))),
+    evict_callback_(
+        new CodeGenCache::EvictionCallback(this, codegen_cache_entries_evicted_,
+            codegen_cache_entries_in_use_, codegen_cache_entries_in_use_bytes_)) {}
+
+Status CodeGenCache::Init(int64_t capacity) {
+  DCHECK(cache_ == nullptr);
+  cache_.reset(NewCache(Cache::EvictionPolicy::LRU, capacity, "CodeGen_Cache"));
+  return cache_->Init();
+}
+
+void CodeGenCache::ReleaseResources() {
+  if (cache_ != nullptr) cache_.reset();
+}
+
+Status CodeGenCache::Lookup(const CodeGenCacheKey& cache_key,
+    const TCodeGenCacheMode::type& mode, CodeGenCacheEntry* entry,
+    shared_ptr<llvm::ExecutionEngine>* execution_engine) {
+  DCHECK(!is_closed_);
+  DCHECK(cache_ != nullptr);
+  DCHECK(entry != nullptr);
+  DCHECK(execution_engine != nullptr);
+  // In optimal mode, use only the hash code and the total length as the key. The whole
+  // key can be very large, so this improves performance and reduces memory consumption.
+  // The trade-off is that two keys could collide, although the chance is very small.
+  // If a collision happens, we may switch to normal mode for the query or disable
+  // the codegen cache.
+  Slice key = cache_key.optimal_key_slice();
+  if (!CodeGenCacheModeAnalyzer::is_optimal(mode)) {
+    key = cache_key.data_slice();
+  }
+  Cache::UniqueHandle handle(cache_->Lookup(key));
+  if (handle.get() != nullptr) {
+    const CodeGenCacheEntry* cached_entry =
+        reinterpret_cast<const CodeGenCacheEntry*>(cache_->Value(handle).data());
+    // Fetch the engine's shared pointer from the cache before returning, because
+    // the shared pointer could be deleted by a concurrent eviction.
+    // If it can't be found, treat this as a cache miss, because the engine is needed
+    // to look up the jitted functions.
+    if (LookupEngine(cached_entry->engine_pointer, execution_engine)) {
+      entry->Reset(cached_entry->engine_pointer, cached_entry->num_functions,
+          cached_entry->num_instructions, cached_entry->function_names_hashcode,
+          cached_entry->total_bytes_charge);
+      return Status::OK();
+    }
+  }
+  entry->Reset();
+  return Status::OK();
+}
+
+Status CodeGenCache::StoreInternal(const CodeGenCacheKey& cache_key,
+    const LlvmCodeGen* codegen, const TCodeGenCacheMode::type& mode) {
+  // In normal mode, we will store the whole key content to the cache.
+  // Otherwise, in optimal mode, we will only store the hash code and length of the key.
+  Slice key = cache_key.optimal_key_slice();
+  if (!CodeGenCacheModeAnalyzer::is_optimal(mode)) {
+    key = cache_key.data_slice();
+  }
+  // Memory charge includes both key and entry size.
+  int64_t mem_charge = codegen->memory_manager_->bytes_allocated() + key.size()
+      + sizeof(CodeGenCacheEntry);
+  Cache::UniquePendingHandle pending_handle(
+      cache_->Allocate(key, sizeof(CodeGenCacheEntry), mem_charge));
+  if (pending_handle == nullptr) {
+    return Status(Substitute("Couldn't allocate handle for codegen cache entry,"
+                             " size: '$0'",
+        mem_charge));
+  }
+  CodeGenCacheEntry* cache_entry =
+      reinterpret_cast<CodeGenCacheEntry*>(cache_->MutableValue(&pending_handle));
+  cache_entry->Reset(codegen->execution_engine_.get(), codegen->num_functions_->value(),
+      codegen->num_instructions_->value(), codegen->function_names_hashcode_, mem_charge);
+  StoreEngine(codegen);
+  /// Insert() is thread-safe, but could overwrite an existing entry with the same key.
+  Cache::UniqueHandle cache_handle =
+      cache_->Insert(move(pending_handle), evict_callback_.get());
+  if (cache_handle == nullptr) {
+    RemoveEngine(codegen->execution_engine_.get());
+    return Status(Substitute("Couldn't insert codegen cache entry,"
+                             " hash code:'$0', size: '$1'",
+        cache_key.hash_code().str(), mem_charge));
+  }
+  codegen_cache_entries_in_use_->Increment(1);
+  codegen_cache_entries_in_use_bytes_->Increment(mem_charge);
+  codegen_cache_entry_size_stats_->Update(mem_charge);
+  return Status::OK();
+}
+
+Status CodeGenCache::Store(const CodeGenCacheKey& cache_key, const LlvmCodeGen* codegen,
+    const TCodeGenCacheMode::type& mode) {
+  DCHECK(!is_closed_);
+  DCHECK(cache_ != nullptr);
+  DCHECK(codegen != nullptr);
+  Status status = Status::OK();
+  pair<unordered_set<CodeGenCacheKey::HashCode>::iterator, bool> key_to_insert_it;
+  {
+    // Cache::Insert() advises callers to avoid holding multiple handles for the same
+    // key, because it can be inefficient to keep the entry from being evicted.
+    // So use the keys_to_insert_ set to reduce the chance of inserting the same cache
+    // entry multiple times.
+    // Before insertion, the thread tries to add the hash code of the key to the set. If
+    // that succeeds, the thread is allowed to do the insertion and is responsible for
+    // erasing the hash code from the set when the insertion finishes.
+    // Otherwise, another thread is already inserting the same key. In that case, the
+    // current thread gives up and returns an OK status, because we assume the other
+    // thread will get the cache entry in. Even if that thread fails, missing a single
+    // cache entry does no harm to the system, and the entry can be inserted again
+    // next time.
+    lock_guard<mutex> lock(to_insert_set_lock_);
+    key_to_insert_it = keys_to_insert_.insert(cache_key.hash_code());
+    // If hash code exists, return an okay immediately.
+    if (!key_to_insert_it.second) return status;
+  }
+  // Store the cache entry in the cache.
+  status = StoreInternal(cache_key, codegen, mode);
+  // Remove the hash code of the key from the keys_to_insert_ set.
+  lock_guard<mutex> lock(to_insert_set_lock_);
+  keys_to_insert_.erase(key_to_insert_it.first);
+  return status;
+}
+
+void CodeGenCache::StoreEngine(const LlvmCodeGen* codegen) {
+  DCHECK(codegen != nullptr);
+  lock_guard<mutex> lock(cached_engines_lock_);
+  cached_engines_.emplace(codegen->execution_engine_.get(), codegen->execution_engine_);
+}
+
+bool CodeGenCache::LookupEngine(const llvm::ExecutionEngine* engine,
+    shared_ptr<llvm::ExecutionEngine>* execution_engine) {
+  DCHECK(engine != nullptr);
+  DCHECK(execution_engine != nullptr);
+  lock_guard<mutex> lock(cached_engines_lock_);
+  auto engine_it = cached_engines_.find(engine);
+  if (engine_it == cached_engines_.end()) return false;
+  *execution_engine = engine_it->second;
+  return true;
+}
+
+void CodeGenCache::RemoveEngine(const llvm::ExecutionEngine* engine) {
+  DCHECK(engine != nullptr);
+  lock_guard<mutex> lock(cached_engines_lock_);
+  cached_engines_.erase(engine);
+}
+
+} // namespace impala
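
Reviewer note: the key layout built by CodeGenCacheKeyConstructor::construct() above (hash code, total length, content length, content) and the shorter OPTIMAL key (hash code plus total length) may be easier to follow in a standalone sketch. This is a minimal illustration under stated assumptions, not the patch's implementation: it substitutes a 64-bit FNV-1a hash for the 128-bit MurmurHash3, and the names Fnv1a, BuildKey, OptimalKey and the k* constants are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>

// Stand-in 64-bit hash (FNV-1a). The patch uses MurmurHash3_x64_128 instead.
uint64_t Fnv1a(const char* data, size_t len) {
  uint64_t h = 14695981039346656037ULL;
  for (size_t i = 0; i < len; ++i) {
    h ^= static_cast<unsigned char>(data[i]);
    h *= 1099511628211ULL;
  }
  return h;
}

constexpr size_t kHashSize = sizeof(uint64_t);
constexpr size_t kLenSize = sizeof(int32_t);
// Size of the key used in OPTIMAL mode: hash code plus total length.
constexpr size_t kOptimalKeySize = kHashSize + kLenSize;

// Builds a full key laid out as: hash | total length | content length | content.
std::string BuildKey(const std::string& content) {
  int32_t content_len = static_cast<int32_t>(content.size());
  int32_t total_len = static_cast<int32_t>(kHashSize + 2 * kLenSize + content.size());
  std::string key(kHashSize, '\0');  // Placeholder, overwritten with the hash below.
  key.append(reinterpret_cast<const char*>(&total_len), kLenSize);
  key.append(reinterpret_cast<const char*>(&content_len), kLenSize);
  key.append(content);
  assert(key.size() == static_cast<size_t>(total_len));
  // The hash covers everything after the hash field itself.
  uint64_t hash = Fnv1a(key.data() + kHashSize, key.size() - kHashSize);
  std::memcpy(&key[0], &hash, kHashSize);
  return key;
}

// Returns the prefix that would be stored as the key in OPTIMAL mode.
std::string OptimalKey(const std::string& full_key) {
  return full_key.substr(0, kOptimalKeySize);
}
```

Because the OPTIMAL key drops the content, two different contents with the same hash and length would map to the same entry, which is the collision case the Lookup() comment describes.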
diff --git a/be/src/codegen/llvm-codegen-cache.h b/be/src/codegen/llvm-codegen-cache.h
new file mode 100644
index 000000000..9242118ba
--- /dev/null
+++ b/be/src/codegen/llvm-codegen-cache.h
@@ -0,0 +1,298 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "codegen/llvm-codegen.h"
+
+#include <unistd.h>
+#include <mutex>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+
+#include "common/status.h"
+#include "kudu/util/slice.h"
+#include "thirdparty/datasketches/MurmurHash3.h"
+#include "util/cache/cache.h"
+#include "util/histogram-metric.h"
+#include "util/metrics.h"
+
+namespace impala {
+
+/// The key to a CodeGen Cache entry.
+/// It contains a hash code in the first sizeof(struct HashCode) bytes. If the
+/// cache is in NORMAL mode, the content of the key follows the hash code and
+/// could be a combination of several keys. Otherwise, in OPTIMAL mode, the
+/// CodeGenCacheKey stored in the cache contains only the hash code and
+/// the total length, to reduce memory consumption.
+/// For a key in the NORMAL mode, it would be like:
+/// Hash Code| Total Length| Length of Key 1| Data of Key 1| Length of Key 2
+/// | Data of Key 2| ... | Length of Key N| Data of Key N.
+/// The key is constructed by CodeGenCacheKeyConstructor.
+/// Functions are NOT thread-safe.
+class CodeGenCacheKey {
+ public:
+  /// The type of a hashcode for the codegen cache key.
+  struct HashCode {
+    HashCode() { memset(&hash_code, 0, sizeof(HashState)); }
+    // This function is used by unordered_set to compare elements of HashCode.
+    bool operator==(const HashCode& h) const {
+      return (this->hash_code.h1 == h.hash_code.h1)
+          && (this->hash_code.h2 == h.hash_code.h2);
+    }
+    friend std::ostream& operator<<(std::ostream& o, const HashCode& h) {
+      return o << std::hex << std::setfill('0') << std::setw(16) << h.hash_code.h1 << ":"
+               << std::setw(16) << h.hash_code.h2;
+    }
+    string str() {
+      std::stringstream out;
+      out << *this;
+      return out.str();
+    }
+    HashState hash_code;
+  };
+
+  /// Used as the hash function in unordered_set for struct HashCode.
+  /// It is required by the unordered_set to generate a hash code for a
+  /// customized structure.
+  struct HashCodeHash {
+    uint64_t operator()(const HashCode& h) const {
+      // Return the first 64bits for the hash.
+      return h.hash_code.h1;
+    }
+  };
+
+  /// The type of the length of a general element in the codegen cache key.
+  typedef int ElementLengthType;
+
+  /// Construct an empty key.
+  CodeGenCacheKey() {}
+
+  /// Construct the key from a string. Uses move to speed up the process.
+  CodeGenCacheKey(string key) : key_(std::move(key)) {}
+
+  /// Set the data of the key. Uses swap to speed up the process.
+  void set_key(string key) { key_.swap(key); }
+
+  /// Return the hash code of the key.
+  HashCode hash_code() const {
+    DCHECK(!key_.empty());
+    DCHECK(key_.length() >= sizeof(HashCode));
+    return *(const HashCode*)key_.c_str();
+  }
+
+  /// Return the slice of the hash code of the key.
+  Slice hash_code_slice() const {
+    DCHECK_LE(sizeof(HashCode), key_.size());
+    return Slice(key_.c_str(), sizeof(HashCode));
+  }
+
+  /// The size of a key in optimal mode.
+  static constexpr size_t OptimalKeySize = sizeof(HashCode) + sizeof(ElementLengthType);
+
+  /// Return the slice of the key in optimal mode, which is the hash code plus the total
+  /// length of a full key.
+  Slice optimal_key_slice() const {
+    DCHECK_LE(OptimalKeySize, key_.size());
+    return Slice(key_.c_str(), OptimalKeySize);
+  }
+
+  /// Return the hash code from a key of type Slice.
+  static HashCode hash_code(Slice key) {
+    DCHECK(!key.empty());
+    DCHECK_LE(sizeof(HashCode), key.size());
+    return *(const HashCode*)key.data();
+  }
+
+  /// Return the data of the key.
+  string& data() { return key_; }
+  Slice data_slice() const { return Slice(key_.c_str(), key_.size()); }
+
+  /// Return if the key is empty.
+  bool empty() const { return key_.empty(); }
+
+ private:
+  /// The data of the key.
+  std::string key_;
+};
+
+/// The class helps to construct a CodeGenCacheKey.
+class CodeGenCacheKeyConstructor {
+ public:
+  /// Construct a CodeGenCacheKey from a string.
+  /// In the future, it could construct the key from a list of string keys.
+  static void construct(std::string&, CodeGenCacheKey*);
+  /// An arbitrary constant used to seed the hash.
+  static constexpr uint64_t CODEGEN_CACHE_HASH_SEED_CONST = 0x6b8b4567327b23c6;
+};
+
+/// The class helps to analyze the codegen cache mode.
+class CodeGenCacheModeAnalyzer {
+ public:
+  // The mask should follow the definition of TCodeGenCacheMode.
+  // The lowest byte of the TCodeGenCacheMode is used to define the type NORMAL
+  // or OPTIMAL.
+  // The ninth bit (bit 8) is used to define whether it is a debug mode. If it is 1,
+  // then it is a debug mode.
+  static const uint64_t CODEGEN_CACHE_MODE_MASK = 0xFF;
+  static const uint64_t CODEGEN_CACHE_MODE_BITS = 8;
+  static bool is_optimal(const TCodeGenCacheMode::type& mode) {
+    return (mode & CODEGEN_CACHE_MODE_MASK) == TCodeGenCacheMode::OPTIMAL;
+  }
+  static bool is_debug(const TCodeGenCacheMode::type& mode) {
+    return (mode >> CODEGEN_CACHE_MODE_BITS) != 0;
+  }
+};
+
+struct CodeGenCacheEntry {
+  CodeGenCacheEntry() { Init(); }
+  bool Empty() { return engine_pointer == nullptr; }
+  void Init() { memset((uint8_t*)this, 0, sizeof(CodeGenCacheEntry)); }
+  void Reset() { Reset(nullptr, 0, 0, 0, 0); }
+  void Reset(llvm::ExecutionEngine* engine_ptr, int64_t num_funcs, int64_t num_instrucs,
+      uint64_t names_hashcode, int64_t charge) {
+    engine_pointer = engine_ptr;
+    num_functions = num_funcs;
+    num_instructions = num_instrucs;
+    function_names_hashcode = names_hashcode;
+    total_bytes_charge = charge;
+  }
+  llvm::ExecutionEngine* engine_pointer;
+  int64_t num_functions;
+  int64_t num_instructions;
+  /// The hashcode of function names in the entry.
+  uint64_t function_names_hashcode;
+  /// Bytes charge including the key and the entry.
+  int64_t total_bytes_charge;
+};
+
+/// Each CodeGenCache is supposed to be a singleton in the daemon. It manages the codegen
+/// cache entries, including storing and looking up entries, and evicts entries when
+/// the capacity limit is reached.
+class CodeGenCache {
+ public:
+  CodeGenCache(MetricGroup*);
+  ~CodeGenCache() {
+    is_closed_ = true;
+    ReleaseResources();
+  }
+
+  /// Initialization for the codegen cache, including cache and metrics allocation.
+  Status Init(int64_t capacity);
+
+  /// Release the resources occupied by the codegen cache before destruction.
+  void ReleaseResources();
+
+  /// Look up the cache entry for the specific cache key. If the key doesn't exist, the
+  /// entry will be reset to empty.
+  /// Return Status::OK() unless there is an internal error.
+  Status Lookup(const CodeGenCacheKey& key, const TCodeGenCacheMode::type& mode,
+      CodeGenCacheEntry* entry, std::shared_ptr<llvm::ExecutionEngine>* execution_engine);
+
+  /// Store the cache entry with the specific cache key.
+  Status Store(const CodeGenCacheKey& key, const LlvmCodeGen* codegen,
+      const TCodeGenCacheMode::type& mode);
+
+  /// Store the shared pointer of the llvm execution engine in the cache to keep all the
+  /// jitted functions in that engine alive.
+  void StoreEngine(const LlvmCodeGen* codegen);
+
+  /// Look up the shared pointer of the llvm execution engine by its raw pointer address.
+  /// If found, return true. Otherwise, return false.
+  bool LookupEngine(const llvm::ExecutionEngine* engine,
+      std::shared_ptr<llvm::ExecutionEngine>* execution_engine);
+
+  /// Remove the shared pointer of the llvm execution engine from the cache by its raw
+  /// pointer address.
+  void RemoveEngine(const llvm::ExecutionEngine* engine);
+
+  /// Increment a hit count or miss count.
+  void IncHitOrMissCount(bool hit) {
+    DCHECK(codegen_cache_hits_ && codegen_cache_misses_);
+    if (hit) {
+      codegen_cache_hits_->Increment(1);
+    } else {
+      codegen_cache_misses_->Increment(1);
+    }
+  }
+
+  /// EvictionCallback for the codegen cache.
+  class EvictionCallback : public Cache::EvictionCallback {
+   public:
+    EvictionCallback(CodeGenCache* cache, IntCounter* entries_evicted,
+        IntGauge* entries_in_use, IntGauge* entries_in_use_bytes)
+      : cache_(cache),
+        codegen_cache_entries_evicted_(entries_evicted),
+        codegen_cache_entries_in_use_(entries_in_use),
+        codegen_cache_entries_in_use_bytes_(entries_in_use_bytes) {}
+    virtual void EvictedEntry(Slice key, Slice value) override;
+
+   private:
+    CodeGenCache* cache_;
+    /// Metrics for the codegen cache in the process level.
+    IntCounter* codegen_cache_entries_evicted_;
+    IntGauge* codegen_cache_entries_in_use_;
+    IntGauge* codegen_cache_entries_in_use_bytes_;
+  };
+
+ private:
+  friend class LlvmCodeGenCacheTest;
+
+  /// Helper function to store the entry to the cache.
+  Status StoreInternal(const CodeGenCacheKey& key, const LlvmCodeGen* codegen,
+      const TCodeGenCacheMode::type& mode);
+
+  /// Indicate if the cache is closed. If it is closed, no function other than
+  /// ReleaseResources() may be called.
+  bool is_closed_;
+
+  /// The instance of the cache.
+  std::unique_ptr<Cache> cache_;
+
+  /// Metrics for the codegen cache in the process level.
+  IntCounter* codegen_cache_hits_;
+  IntCounter* codegen_cache_misses_;
+  IntCounter* codegen_cache_entries_evicted_;
+  IntGauge* codegen_cache_entries_in_use_;
+  IntGauge* codegen_cache_entries_in_use_bytes_;
+
+  /// Statistics for the sizes of the codegen cache entries.
+  HistogramMetric* codegen_cache_entry_size_stats_;
+
+  /// Eviction Callback function.
+  std::unique_ptr<CodeGenCache::EvictionCallback> evict_callback_;
+
+  /// Protects the keys_to_insert_ hash set below.
+  std::mutex to_insert_set_lock_;
+
+  /// The keys of codegen entries that are about to be inserted into the system cache.
+  /// Its purpose is to prevent the same key from being inserted simultaneously.
+  std::unordered_set<CodeGenCacheKey::HashCode, CodeGenCacheKey::HashCodeHash>
+      keys_to_insert_;
+
+  /// Protects the map of cached engines.
+  std::mutex cached_engines_lock_;
+
+  /// Stores the llvm execution engine shared pointer to keep the jitted functions alive.
+  /// The shared pointer entry could be removed when the cache entry is evicted or the
+  /// whole cache is destructed.
+  std::unordered_map<const llvm::ExecutionEngine*, std::shared_ptr<llvm::ExecutionEngine>>
+      cached_engines_;
+};
+} // namespace impala
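
Reviewer note: CodeGenCacheModeAnalyzer above decodes the mode from the low byte and the debug flag from bit 8. The sketch below shows that decoding in isolation. The enum values are an assumption chosen to match the comment in the header (the real values come from the TCodeGenCacheMode thrift definition, which is not shown in this part of the patch), and the names CacheMode, IsOptimal and IsDebug are hypothetical.

```cpp
#include <cstdint>

// Assumed values: low byte selects NORMAL/OPTIMAL, bit 8 marks the debug variant.
enum class CacheMode : uint64_t {
  NORMAL = 0,
  OPTIMAL = 1,
  NORMAL_DEBUG = 256,
  OPTIMAL_DEBUG = 257,
};

constexpr uint64_t kModeMask = 0xFF;
constexpr uint64_t kModeBits = 8;

// True if the low byte of the mode says OPTIMAL, regardless of the debug bit.
constexpr bool IsOptimal(CacheMode mode) {
  return (static_cast<uint64_t>(mode) & kModeMask)
      == static_cast<uint64_t>(CacheMode::OPTIMAL);
}

// True if any bit above the low byte is set.
constexpr bool IsDebug(CacheMode mode) {
  return (static_cast<uint64_t>(mode) >> kModeBits) != 0;
}

static_assert(IsOptimal(CacheMode::OPTIMAL_DEBUG), "debug bit must not affect the mode");
static_assert(!IsDebug(CacheMode::OPTIMAL), "non-debug modes have no high bits set");
```

With this encoding, a debug mode and its non-debug counterpart select the same key form, which is why the SwitchMode test expects the entry count to stay unchanged when only the debug bit differs.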
diff --git a/be/src/codegen/llvm-codegen.cc b/be/src/codegen/llvm-codegen.cc
index 94b2e57af..27205580f 100644
--- a/be/src/codegen/llvm-codegen.cc
+++ b/be/src/codegen/llvm-codegen.cc
@@ -16,12 +16,12 @@
 // under the License.
 
 #include "codegen/llvm-codegen.h"
+#include "codegen/llvm-codegen-cache.h"
 
 #include <fstream>
+#include <mutex>
 #include <sstream>
 #include <unordered_set>
-
-#include <mutex>
 #include <boost/algorithm/string.hpp>
 #include <boost/assert/source_location.hpp>
 #include <gutil/strings/substitute.h>
@@ -31,6 +31,7 @@
 #include <llvm/Analysis/Passes.h>
 #include <llvm/Analysis/TargetTransformInfo.h>
 #include <llvm/Bitcode/BitcodeReader.h>
+#include <llvm/Bitcode/BitcodeWriter.h>
 #include <llvm/ExecutionEngine/ExecutionEngine.h>
 #include <llvm/ExecutionEngine/MCJIT.h>
 #include <llvm/IR/Constants.h>
@@ -65,18 +66,19 @@
 #include "codegen/mcjit-mem-mgr.h"
 #include "common/logging.h"
 #include "exprs/anyval-util.h"
+#include "gutil/sysinfo.h"
 #include "impala-ir/impala-ir-names.h"
 #include "runtime/collection-value.h"
 #include "runtime/descriptors.h"
+#include "runtime/exec-env.h"
+#include "runtime/fragment-state.h"
 #include "runtime/hdfs-fs-cache.h"
 #include "runtime/lib-cache.h"
 #include "runtime/mem-pool.h"
 #include "runtime/mem-tracker.h"
-#include "runtime/fragment-state.h"
 #include "runtime/runtime-state.h"
 #include "runtime/string-value.h"
 #include "runtime/timestamp-value.h"
-#include "gutil/sysinfo.h"
 #include "util/cpu-info.h"
 #include "util/debug-util.h"
 #include "util/hdfs-util.h"
@@ -85,6 +87,7 @@
 #include "util/symbols-util.h"
 #include "util/test-info.h"
 #include "util/thread.h"
+#include "util/thrift-debug-util.h"
 
 #include "common/names.h"
 
@@ -226,6 +229,9 @@ LlvmCodeGen::LlvmCodeGen(FragmentState* state, ObjectPool* pool,
   context_->setDiagnosticHandler(&DiagnosticHandler::DiagnosticHandlerFn, this);
   load_module_timer_ = ADD_TIMER(profile_, "LoadTime");
   prepare_module_timer_ = ADD_TIMER(profile_, "PrepareTime");
+  codegen_cache_lookup_timer_ = ADD_TIMER(profile_, "CodegenCacheLookupTime");
+  codegen_cache_save_timer_ = ADD_TIMER(profile_, "CodegenCacheSaveTime");
+  module_bitcode_gen_timer_ = ADD_TIMER(profile_, "ModuleBitcodeGenTime");
   module_bitcode_size_ = ADD_COUNTER(profile_, "ModuleBitcodeSize", TUnit::BYTES);
   ir_generation_timer_ = ADD_TIMER(profile_, "IrGenerationTime");
   optimization_timer_ = ADD_TIMER(profile_, "OptimizationTime");
@@ -235,6 +241,7 @@ LlvmCodeGen::LlvmCodeGen(FragmentState* state, ObjectPool* pool,
       ASYNC_CODEGEN_THREAD_COUNTERS_PREFIX);
   num_functions_ = ADD_COUNTER(profile_, "NumFunctions", TUnit::UNIT);
   num_instructions_ = ADD_COUNTER(profile_, "NumInstructions", TUnit::UNIT);
+  num_cached_functions_ = ADD_COUNTER(profile_, "NumCachedFunctions", TUnit::UNIT);
   llvm_thread_counters_ = ADD_THREAD_COUNTERS(profile_, "Codegen");
 }
 
@@ -513,6 +520,7 @@ void LlvmCodeGen::Close() {
 
   // Execution engine executes callback on event listener, so tear down engine first.
   execution_engine_.reset();
+  execution_engine_cached_.reset();
   symbol_emitter_.reset();
   module_ = nullptr;
 }
@@ -914,6 +922,18 @@ Status LlvmCodeGen::LoadFunction(const TFunction& fn, const string& symbol,
     // Associate the dynamically loaded function pointer with the Function* we defined.
     // This tells LLVM where the compiled function definition is located in memory.
     execution_engine_->addGlobalMapping(*llvm_fn, fn_ptr);
+    // Disable the codegen cache because it uses the llvm module bitcode as the key,
+    // and the bitcode doesn't contain the global function mapping of the execution
+    // engine. If the mapping changes at runtime, for example when a udf is recreated,
+    // the function mapping in the cache could point to an old address and lead to a
+    // crash when calling the udf, so block the codegen cache for native udfs.
+    // Builtin functions should not have this issue, because they should not change
+    // at runtime.
+    if (fn.binary_type == TFunctionBinaryType::NATIVE) {
+      // Should be before compilation.
+      DCHECK(!is_compiled_);
+      codegen_cache_enabled_ = false;
+    }
   } else if (fn.binary_type == TFunctionBinaryType::BUILTIN) {
     // In this path, we're running a builtin with the UDF interface. The IR is
     // in the llvm module. Builtin functions may use Expr::GetConstant(). Clone the
@@ -1112,6 +1132,123 @@ Status LlvmCodeGen::FinalizeLazyMaterialization() {
   return MaterializeModule();
 }
 
+bool LlvmCodeGen::LookupCache(CodeGenCacheKey& cache_key) {
+  DCHECK(!cache_key.empty());
+  CodeGenCacheEntry entry;
+  CodeGenCache* cache = ExecEnv::GetInstance()->codegen_cache();
+  DCHECK(cache != nullptr);
+  Status lookup_status = cache->Lookup(cache_key,
+      state_->query_options().codegen_cache_mode, &entry, &execution_engine_cached_);
+  bool entry_exist = lookup_status.ok() && !entry.Empty();
+  LOG(INFO) << DebugCacheEntryString(cache_key, true /*is_lookup*/,
+      CodeGenCacheModeAnalyzer::is_debug(state_->query_options().codegen_cache_mode),
+      entry_exist);
+  if (entry_exist) {
+    // Fall back to the normal procedure if the function names hashcode is unexpected.
+    // The names hashcode should be the same unless there is a collision on the
+    // key, which we expect to be very rare.
+    if (function_names_hashcode_ != entry.function_names_hashcode) {
+      LOG(WARNING)
+          << "The codegen cache entry contains a different function names hashcode: "
+          << " function names hashcode expected: " << function_names_hashcode_
+          << " actual: " << entry.function_names_hashcode
+          << " key hash_code=" << cache_key.hash_code();
+      cache->IncHitOrMissCount(/*hit*/ false);
+      return false;
+    }
+
+    // execution_engine_cached_ keeps the jitted functions alive in case the
+    // engine is evicted from the global cache.
+    DCHECK(execution_engine_cached_ != nullptr);
+    vector<void*> jitted_funcs;
+    // Get pointers to all codegen'd functions.
+    for (int i = 0; i < fns_to_jit_compile_.size(); ++i) {
+      llvm::Function* function = fns_to_jit_compile_[i].first;
+      DCHECK(function != nullptr);
+      // Calling getFunctionAddress() with a non-existent function name hit an
+      // assertion during testing. This could be a bug in LLVM 5 and should be
+      // reviewed after upgrading LLVM. Because the names hashcode was already checked
+      // for key collisions, we expect all the functions to be present in the
+      // cached execution engine.
+      void* jitted_function = reinterpret_cast<void*>(
+          execution_engine_cached_->getFunctionAddress(function->getName()));
+      if (jitted_function == nullptr) {
+        LOG(WARNING) << "Failed to get a jitted function from cache: "
+                     << function->getName().data()
+                     << " key hash_code=" << cache_key.hash_code();
+        cache->IncHitOrMissCount(/*hit*/ false);
+        return false;
+      }
+      jitted_funcs.emplace_back(jitted_function);
+    }
+    DCHECK_EQ(jitted_funcs.size(), fns_to_jit_compile_.size());
+    for (int i = 0; i < jitted_funcs.size(); i++) {
+      fns_to_jit_compile_[i].second->store(jitted_funcs[i]);
+    }
+
+    // Because we cache the entire execution engine, the number of cached functions
+    // should be the same as the total number of functions.
+    COUNTER_SET(num_cached_functions_, entry.num_functions);
+    COUNTER_SET(num_functions_, entry.num_functions);
+    COUNTER_SET(num_instructions_, entry.num_instructions);
+  }
+  cache->IncHitOrMissCount(/*hit*/ entry_exist);
+  return entry_exist;
+}
+
+string LlvmCodeGen::GetAllFunctionNames() {
+  stringstream result;
+  // Each name is followed by the separator, e.g. "function1,function2,".
+  const char separator = ',';
+  for (int i = 0; i < fns_to_jit_compile_.size(); ++i) {
+    llvm::Function* function = fns_to_jit_compile_[i].first;
+    DCHECK(function != nullptr);
+    result << function->getName().data() << separator;
+  }
+  return result.str();
+}
+
+void LlvmCodeGen::GenerateFunctionNamesHashCode() {
+  string function_names = GetAllFunctionNames();
+  // Use the same hash seed as the codegen cache key.
+  function_names_hashcode_ = HashUtil::MurmurHash2_64(function_names.c_str(),
+      function_names.length(), CodeGenCacheKeyConstructor::CODEGEN_CACHE_HASH_SEED_CONST);
+}
+
+Status LlvmCodeGen::StoreCache(CodeGenCacheKey& cache_key) {
+  DCHECK(!cache_key.empty());
+  Status store_status = ExecEnv::GetInstance()->codegen_cache()->Store(
+      cache_key, this, state_->query_options().codegen_cache_mode);
+  LOG(INFO) << DebugCacheEntryString(cache_key, false /*is_lookup*/,
+      CodeGenCacheModeAnalyzer::is_debug(state_->query_options().codegen_cache_mode));
+  return store_status;
+}
+
+void LlvmCodeGen::PruneModule() {
+  SCOPED_TIMER(optimization_timer_);
+  llvm::TargetIRAnalysis target_analysis =
+      execution_engine_->getTargetMachine()->getTargetIRAnalysis();
+
+  // Before running any other optimization passes, run the internalize pass, giving it
+  // the names of all functions registered by AddFunctionToJit(), followed by the
+  // global dead code elimination pass. This causes all functions not registered to be
+  // JIT'd to be marked as internal, and any internal functions that are not used are
+  // deleted by DCE pass. This greatly decreases compile time by removing unused code.
+  unordered_set<string> exported_fn_names;
+  for (auto& entry : fns_to_jit_compile_) {
+    exported_fn_names.insert(entry.first->getName().str());
+  }
+  unique_ptr<llvm::legacy::PassManager> module_pass_manager(
+      new llvm::legacy::PassManager());
+  module_pass_manager->add(llvm::createTargetTransformInfoWrapperPass(target_analysis));
+  module_pass_manager->add(
+      llvm::createInternalizePass([&exported_fn_names](const llvm::GlobalValue& gv) {
+        return exported_fn_names.find(gv.getName().str()) != exported_fn_names.end();
+      }));
+  module_pass_manager->add(llvm::createGlobalDCEPass());
+  module_pass_manager->run(*module_);
+}
+
 Status LlvmCodeGen::FinalizeModule() {
   DCHECK(!is_compiled_);
   is_compiled_ = true;
@@ -1157,6 +1294,27 @@ Status LlvmCodeGen::FinalizeModule() {
   }
 
   RETURN_IF_ERROR(FinalizeLazyMaterialization());
+  PruneModule();
+
+  bool codegen_cache_enabled = state_->codegen_cache_enabled() && codegen_cache_enabled_;
+  CodeGenCacheKey cache_key;
+  if (codegen_cache_enabled) {
+    string bitcode;
+    SCOPED_TIMER(codegen_cache_lookup_timer_);
+    {
+      SCOPED_TIMER(module_bitcode_gen_timer_);
+      llvm::raw_string_ostream bitcode_stream(bitcode);
+      llvm::WriteBitcodeToFile(module_, bitcode_stream);
+      bitcode_stream.flush();
+    }
+    CodeGenCacheKeyConstructor::construct(bitcode, &cache_key);
+    // Generate the function names hashcode regardless of the lookup result; it is
+    // used when storing to the cache if the lookup fails.
+    GenerateFunctionNamesHashCode();
+    DCHECK(!cache_key.empty());
+    if (LookupCache(cache_key)) return Status::OK();
+  }
+
   if (optimizations_enabled_ && !FLAGS_disable_optimization_passes) {
     RETURN_IF_ERROR(OptimizeModule());
   }
@@ -1179,16 +1337,26 @@ Status LlvmCodeGen::FinalizeModule() {
   }
 
   SetFunctionPointers();
-  DestroyModule();
-
-  // Track the memory consumed by the compiled code.
-  int64_t bytes_allocated = memory_manager_->bytes_allocated();
-  if (!mem_tracker_->TryConsume(bytes_allocated)) {
-    const string& msg = Substitute(
-        "Failed to allocate '$0' bytes for compiled code module", bytes_allocated);
-    return mem_tracker_->MemLimitExceeded(NULL, msg, bytes_allocated);
+  Status store_cache_status;
+  if (codegen_cache_enabled) {
+    SCOPED_TIMER(codegen_cache_save_timer_);
+    store_cache_status = StoreCache(cache_key);
+    LOG(INFO) << DebugCacheEntryString(cache_key, false /*is_lookup*/,
+        CodeGenCacheModeAnalyzer::is_debug(state_->query_options().codegen_cache_mode));
   }
-  memory_manager_->set_bytes_tracked(bytes_allocated);
+  // If the codegen is stored to the cache successfully, the cache is responsible for
+  // tracking the memory consumption instead.
+  if (!codegen_cache_enabled || !store_cache_status.ok()) {
+    // Track the memory consumed by the compiled code.
+    int64_t bytes_allocated = memory_manager_->bytes_allocated();
+    if (!mem_tracker_->TryConsume(bytes_allocated)) {
+      const string& msg = Substitute(
+          "Failed to allocate '$0' bytes for compiled code module", bytes_allocated);
+      return mem_tracker_->MemLimitExceeded(NULL, msg, bytes_allocated);
+    }
+    memory_manager_->set_bytes_tracked(bytes_allocated);
+  }
+  DestroyModule();
   return Status::OK();
 }
 
@@ -1242,25 +1410,8 @@ Status LlvmCodeGen::OptimizeModule() {
   // machine to optimisation passes, e.g. the cost model.
   llvm::TargetIRAnalysis target_analysis =
       execution_engine_->getTargetMachine()->getTargetIRAnalysis();
-
-  // Before running any other optimization passes, run the internalize pass, giving it
-  // the names of all functions registered by AddFunctionToJit(), followed by the
-  // global dead code elimination pass. This causes all functions not registered to be
-  // JIT'd to be marked as internal, and any internal functions that are not used are
-  // deleted by DCE pass. This greatly decreases compile time by removing unused code.
-  unordered_set<string> exported_fn_names;
-  for (auto& entry : fns_to_jit_compile_) {
-    exported_fn_names.insert(entry.first->getName().str());
-  }
   unique_ptr<llvm::legacy::PassManager> module_pass_manager(
       new llvm::legacy::PassManager());
-  module_pass_manager->add(llvm::createTargetTransformInfoWrapperPass(target_analysis));
-  module_pass_manager->add(
-      llvm::createInternalizePass([&exported_fn_names](const llvm::GlobalValue& gv) {
-        return exported_fn_names.find(gv.getName().str()) != exported_fn_names.end();
-      }));
-  module_pass_manager->add(llvm::createGlobalDCEPass());
-  module_pass_manager->run(*module_);
 
   // Update counters before final optimization, but after removing unused functions. This
   // gives us a rough measure of how much work the optimization and compilation must do.
@@ -1834,6 +1985,36 @@ string LlvmCodeGen::DiagnosticHandler::GetErrorString() {
   }
   return "";
 }
+
+string LlvmCodeGen::DebugCacheEntryString(
+    CodeGenCacheKey& key, bool is_lookup, bool debug_mode, bool success) const {
+  stringstream out;
+  if (is_lookup) {
+    out << "Look up codegen cache ";
+  } else {
+    out << "Store to codegen cache ";
+  }
+  if (success) {
+    out << "succeeded. ";
+  } else {
+    if (is_lookup) {
+      out << "missed. ";
+    } else {
+      out << "failed. ";
+    }
+  }
+  out << "CodeGen Cache Key hash_code=" << key.hash_code();
+  out << " query_id=" << PrintId(state_->query_id()) << "\n";
+  if (UNLIKELY(debug_mode)) {
+    out << "Fragment Plan: " << apache::thrift::ThriftDebugString(state_->fragment())
+        << "\n";
+    out << "CodeGen Functions: \n";
+    for (int i = 0; i < fns_to_jit_compile_.size(); ++i) {
+      out << "  " << fns_to_jit_compile_[i].first->getName().data() << "\n";
+    }
+  }
+  return out.str();
+}
 }
 
 namespace boost {
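
The lookup-then-store flow in FinalizeModule() above can be sketched in isolation.
The following is a minimal illustration only, not the patch's implementation:
Engine, Entry, ToyCache, HashNames and Finalize are hypothetical names, std::hash
stands in for HashUtil::MurmurHash2_64, and a plain map replaces the real cache.
It shows the two checks a hit must pass (key match, then function names hash
match) and that compilation only happens on a miss.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical stand-in for a compiled module (the patch caches an
// llvm::ExecutionEngine instead).
struct Engine {
  std::unordered_map<std::string, int> fns;  // function name -> fake "address"
};

struct Entry {
  std::shared_ptr<Engine> engine;
  uint64_t names_hash;
};

// Same concatenation scheme as GetAllFunctionNames(): each name is followed by ','.
// std::hash is an assumption standing in for MurmurHash2_64.
uint64_t HashNames(const std::vector<std::string>& names) {
  std::string joined;
  for (const std::string& n : names) joined += n + ',';
  return std::hash<std::string>{}(joined);
}

class ToyCache {
 public:
  // A hit requires both a matching key and a matching function names hash.
  std::shared_ptr<Engine> Lookup(const std::string& key, uint64_t names_hash) {
    auto it = map_.find(key);
    if (it == map_.end() || it->second.names_hash != names_hash) {
      ++misses_;
      return nullptr;
    }
    ++hits_;
    return it->second.engine;
  }
  void Store(const std::string& key, std::shared_ptr<Engine> e, uint64_t names_hash) {
    map_[key] = Entry{std::move(e), names_hash};
  }
  int hits() const { return hits_; }
  int misses() const { return misses_; }

 private:
  std::unordered_map<std::string, Entry> map_;
  int hits_ = 0;
  int misses_ = 0;
};

// Look up first; "compile" and store only on a miss.
std::shared_ptr<Engine> Finalize(ToyCache* cache, const std::string& bitcode,
    const std::vector<std::string>& names, int* compile_count) {
  uint64_t names_hash = HashNames(names);
  if (auto cached = cache->Lookup(bitcode, names_hash)) return cached;
  auto engine = std::make_shared<Engine>();
  int addr = 0;
  for (const std::string& n : names) engine->fns[n] = ++addr;
  ++*compile_count;
  cache->Store(bitcode, engine, names_hash);
  return engine;
}
```

Calling Finalize() twice with the same bitcode string and names returns the same
Engine and compiles once; the same key with different names is treated as a miss.
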
diff --git a/be/src/codegen/llvm-codegen.h b/be/src/codegen/llvm-codegen.h
index 3e10e0e37..edc2665f4 100644
--- a/be/src/codegen/llvm-codegen.h
+++ b/be/src/codegen/llvm-codegen.h
@@ -91,6 +91,7 @@ class ImpalaMCJITMemoryManager;
 class SubExprElimination;
 class Thread;
 class TupleDescriptor;
+class CodeGenCacheKey;
 
 /// Define builder subclass in case we want to change the template arguments later
 class LlvmBuilder : public llvm::IRBuilder<> {
@@ -189,6 +190,9 @@ class LlvmCodeGen {
   /// Turns on/off optimization passes
   void EnableOptimizations(bool enable);
 
+  std::string DebugCacheEntryString(
+      CodeGenCacheKey& key, bool is_lookup, bool debug_mode, bool success = true) const;
+
   /// For debugging. Returns the IR that was generated.  If full_module, the
   /// entire module is dumped, including what was loaded from precompiled IR.
   /// If false, only output IR for functions which were handcrafted.
@@ -428,6 +432,9 @@ class LlvmCodeGen {
   /// be deleted in FinalizeModule(), otherwise, it returns the function object.
   llvm::Function* FinalizeFunction(llvm::Function* function);
 
+  /// Prunes any unused functions from the module.
+  void PruneModule();
+
   /// Adds the function to be automatically jit compiled when the codegen object is
   /// finalized. FinalizeModule() will set *fn_ptr to point to the jitted function.
   ///
@@ -629,6 +636,8 @@ class LlvmCodeGen {
   friend class LlvmCodeGenTest_CpuAttrWhitelist_Test;
   friend class LlvmCodeGenTest_HashTest_Test;
   friend class SubExprElimination;
+  friend class CodeGenCache;
+  friend class LlvmCodeGenCacheTest;
 
   /// Top level codegen object. 'module_id' is used for debugging when outputting the IR.
   LlvmCodeGen(FragmentState* state, ObjectPool* pool, MemTracker* parent_mem_tracker,
@@ -703,6 +712,12 @@ class LlvmCodeGen {
   // Used for testing.
   void ResetVerification() { is_corrupt_ = false; }
 
+  // Look up the codegen functions in the cache to reduce optimization time.
+  bool LookupCache(CodeGenCacheKey& key);
+
+  // Store the codegen functions to the cache if codegen cache is enabled.
+  Status StoreCache(CodeGenCacheKey& key);
+
   /// Optimizes the module. This includes pruning the module of any unused functions.
   Status OptimizeModule();
 
@@ -747,6 +762,16 @@ class LlvmCodeGen {
   /// generated is retained by the execution engine.
   void DestroyModule();
 
+  /// Generate a string containing all jitted function names from fns_to_jit_compile_.
+  /// The string is simply the concatenation of all the names, in the order they
+  /// appear in fns_to_jit_compile_. Used during codegen cache lookup to confirm
+  /// whether a cache entry matches the needs of this LlvmCodeGen.
+  std::string GetAllFunctionNames();
+
+  /// Generate and store the hash code of all the function names. Used by the
+  /// codegen cache only.
+  void GenerateFunctionNamesHashCode();
+
   /// Disable CPU attributes in 'cpu_attrs' that are not present in
   /// the '--llvm_cpu_attr_whitelist' flag. The same attributes in the input are
   /// always present in the output, except "+" is flipped to "-" for the disabled
@@ -800,6 +825,13 @@ class LlvmCodeGen {
   /// Time spent creating the initial module with the cross-compiled Impala IR.
   RuntimeProfile::Counter* prepare_module_timer_;
 
+  /// Time spent generating module bitcode.
+  RuntimeProfile::Counter* module_bitcode_gen_timer_;
+
+  /// Time spent for codegen cache look up and save.
+  RuntimeProfile::Counter* codegen_cache_lookup_timer_;
+  RuntimeProfile::Counter* codegen_cache_save_timer_;
+
   /// Time spent by ExecNodes while adding IR to the module. Update by
   /// FragmentInstanceState during its 'CODEGEN_START' state.
   RuntimeProfile::Counter* ir_generation_timer_;
@@ -824,6 +856,9 @@ class LlvmCodeGen {
   RuntimeProfile::Counter* num_functions_;
   RuntimeProfile::Counter* num_instructions_;
 
+  /// Number of functions that are used and cached.
+  RuntimeProfile::Counter* num_cached_functions_;
+
   /// Aggregated llvm thread counters. Also includes the phase represented by
   /// 'ir_generation_timer_' and hence is also updated by FragmentInstanceState.
   RuntimeProfile::ThreadCounters* llvm_thread_counters_;
@@ -833,6 +868,9 @@ class LlvmCodeGen {
   /// whether or not optimizations are enabled
   bool optimizations_enabled_;
 
+  /// Whether or not codegen cache is enabled.
+  bool codegen_cache_enabled_ = true;
+
   /// If true, the module is corrupt and we cannot codegen this query.
   /// TODO: we could consider just removing the offending function and attempting to
   /// codegen the rest of the query.  This requires more testing though to make sure
@@ -855,7 +893,10 @@ class LlvmCodeGen {
   llvm::Module* module_;
 
   /// Execution/Jitting engine.
-  std::unique_ptr<llvm::ExecutionEngine> execution_engine_;
+  std::shared_ptr<llvm::ExecutionEngine> execution_engine_;
+
+  /// Cached Execution/Jitting engine.
+  std::shared_ptr<llvm::ExecutionEngine> execution_engine_cached_;
 
   /// The memory manager used by 'execution_engine_'. Owned by 'execution_engine_'.
   ImpalaMCJITMemoryManager* memory_manager_;
@@ -892,6 +933,11 @@ class LlvmCodeGen {
   /// The vector of functions to automatically JIT compile after FinalizeModule().
   std::vector<std::pair<llvm::Function*, CodegenFnPtrBase*>> fns_to_jit_compile_;
 
+  /// The hash code generated from all the function names in fns_to_jit_compile_.
+  /// Used by the codegen cache only.
+  uint64_t function_names_hashcode_;
+
+  /// The symbol emitted associated with 'execution_engine_'. Methods on
   /// llvm representation of a few common types.  Owned by context.
   llvm::PointerType* ptr_type_;             // int8_t*
   llvm::Type* void_type_;                   // void
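
The switch of execution_engine_ from std::unique_ptr to std::shared_ptr, plus the
new execution_engine_cached_ member, is what lets a fragment keep using jitted
code after its entry is evicted from the global cache. A minimal sketch of that
ownership pattern follows. Engine and ToyEngineCache are hypothetical names used
only for illustration, and the eviction here is an explicit erase rather than
the real cache's policy.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical stand-in for llvm::ExecutionEngine and the code it owns.
struct Engine {
  int jitted_value = 42;
};

class ToyEngineCache {
 public:
  void Store(const std::string& key, std::shared_ptr<Engine> engine) {
    map_[key] = std::move(engine);
  }
  // Returns a shared reference, so the caller co-owns the engine.
  std::shared_ptr<Engine> Lookup(const std::string& key) const {
    auto it = map_.find(key);
    if (it == map_.end()) return nullptr;
    return it->second;
  }
  // Drops only the cache's reference; other holders keep the engine alive.
  void Evict(const std::string& key) { map_.erase(key); }

 private:
  std::unordered_map<std::string, std::shared_ptr<Engine>> map_;
};
```

A caller that holds the shared_ptr returned by Lookup() can still dereference it
after Evict(); the Engine is destroyed only when the last holder releases it.
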
diff --git a/be/src/exprs/scalar-expr.cc b/be/src/exprs/scalar-expr.cc
index cf5a2872d..86a1689e0 100644
--- a/be/src/exprs/scalar-expr.cc
+++ b/be/src/exprs/scalar-expr.cc
@@ -142,13 +142,13 @@ Status ScalarExpr::CreateNode(
     case TExprNodeType::TIMESTAMP_LITERAL:
     case TExprNodeType::DATE_LITERAL:
       *expr = pool->Add(new Literal(texpr_node));
-      return Status::OK();
+      break;
     case TExprNodeType::CASE_EXPR:
       if (!texpr_node.__isset.case_expr) {
         return Status("Case expression not set in thrift node");
       }
       *expr = pool->Add(new CaseExpr(texpr_node));
-      return Status::OK();
+      break;
     case TExprNodeType::COMPOUND_PRED:
       if (texpr_node.fn.name.function_name == "and") {
         *expr = pool->Add(new AndPredicate(texpr_node));
@@ -158,19 +158,19 @@ Status ScalarExpr::CreateNode(
         DCHECK_EQ(texpr_node.fn.name.function_name, "not");
         *expr = pool->Add(new ScalarFnCall(texpr_node));
       }
-      return Status::OK();
+      break;
     case TExprNodeType::NULL_LITERAL:
       *expr = pool->Add(new NullLiteral(texpr_node));
-      return Status::OK();
+      break;
     case TExprNodeType::SLOT_REF:
       if (!texpr_node.__isset.slot_ref) {
         return Status("Slot reference not set in thrift node");
       }
       *expr = pool->Add(new SlotRef(texpr_node));
-      return Status::OK();
+      break;
     case TExprNodeType::TUPLE_IS_NULL_PRED:
       *expr = pool->Add(new TupleIsNullPredicate(texpr_node));
-      return Status::OK();
+      break;
     case TExprNodeType::FUNCTION_CALL:
       if (!texpr_node.__isset.fn) {
         return Status("Function not set in thrift node");
@@ -193,22 +193,24 @@ Status ScalarExpr::CreateNode(
       } else {
         *expr = pool->Add(new ScalarFnCall(texpr_node));
       }
-      return Status::OK();
+      break;
     case TExprNodeType::IS_NOT_EMPTY_PRED:
       *expr = pool->Add(new IsNotEmptyPredicate(texpr_node));
-      return Status::OK();
+      break;
     case TExprNodeType::KUDU_PARTITION_EXPR:
       *expr = pool->Add(new KuduPartitionExpr(texpr_node));
-      return Status::OK();
+      break;
     case TExprNodeType::VALID_TUPLE_ID_EXPR:
       *expr = pool->Add(new ValidTupleIdExpr(texpr_node));
-      return Status::OK();
+      break;
     default:
       *expr = nullptr;
       stringstream os;
       os << "Unknown expr node type: " << texpr_node.node_type;
       return Status(os.str());
   }
+  DCHECK(*expr != nullptr);
+  return Status::OK();
 }
 
 Status ScalarExpr::OpenEvaluator(FunctionContext::FunctionStateScope scope,
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 34820d450..5b0a95742 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -24,6 +24,7 @@
 #include <gutil/strings/substitute.h>
 
 #include "catalog/catalog-service-client-wrapper.h"
+#include "codegen/llvm-codegen-cache.h"
 #include "common/logging.h"
 #include "common/object-pool.h"
 #include "exec/kudu/kudu-util.h"
@@ -99,6 +100,8 @@ DEFINE_int32(admission_control_slots, 0,
     "this backend. The degree of parallelism of the query determines the number of slots "
     "that it needs. Defaults to number of cores / -num_cores for executors, and 8x that "
     "value for dedicated coordinators).");
+DEFINE_string(codegen_cache_capacity, "1GB",
+    "Specify the capacity of the codegen cache. If set to 0, codegen cache is disabled.");
 
 DEFINE_bool(use_local_catalog, false,
     "Use the on-demand metadata feature in coordinators. If this is set, 
coordinators "
@@ -165,6 +168,10 @@ DEFINE_string(metrics_webserver_interface, "",
 
 const static string DEFAULT_FS = "fs.defaultFS";
 
+// The max percentage that the codegen cache can take from the total process memory.
+// The value is set to 20%.
+const double MAX_CODEGEN_CACHE_MEM_PERCENT = 0.2;
+
 // The multiplier for how many queries a dedicated coordinator can run compared to an
 // executor. This is only effective when using non-default settings for executor groups
 // and the absolute value can be overridden by the '--admission_control_slots' flag.
@@ -443,6 +450,25 @@ Status ExecEnv::Init() {
 
   RETURN_IF_ERROR(admission_controller_->Init());
   RETURN_IF_ERROR(InitHadoopConfig());
+  int64_t codegen_cache_capacity =
+      ParseUtil::ParseMemSpec(FLAGS_codegen_cache_capacity, &is_percent, 0);
+  if (codegen_cache_capacity > 0) {
+    int64_t codegen_cache_limit = mem_tracker_->limit() * MAX_CODEGEN_CACHE_MEM_PERCENT;
+    DCHECK(codegen_cache_limit > 0);
+    if (codegen_cache_capacity > codegen_cache_limit) {
+      LOG(INFO) << "CodeGen Cache capacity changed to "
+                << PrettyPrinter::Print(codegen_cache_limit, TUnit::BYTES) << " from "
+                << PrettyPrinter::Print(codegen_cache_capacity, TUnit::BYTES)
+                << " due to reaching the limit.";
+      codegen_cache_capacity = codegen_cache_limit;
+    }
+    codegen_cache_.reset(new CodeGenCache(metrics_.get()));
+    RETURN_IF_ERROR(codegen_cache_->Init(codegen_cache_capacity));
+    LOG(INFO) << "CodeGen Cache initialized with capacity "
+              << PrettyPrinter::Print(codegen_cache_capacity, TUnit::BYTES);
+  } else {
+    LOG(INFO) << "CodeGen Cache is disabled.";
+  }
   return Status::OK();
 }
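
The capacity logic added to ExecEnv::Init() above comes down to clamping the
requested size to a fraction of the process memory limit. A standalone sketch,
assuming the 20% cap from MAX_CODEGEN_CACHE_MEM_PERCENT; EffectiveCacheCapacity
and kMaxCacheMemFraction are hypothetical names, and flag parsing is left out:

```cpp
#include <cassert>
#include <cstdint>

// Assumed to mirror MAX_CODEGEN_CACHE_MEM_PERCENT from the patch.
constexpr double kMaxCacheMemFraction = 0.2;

// Returns the effective capacity in bytes. A result of 0 means the cache stays
// disabled, matching the "If set to 0, codegen cache is disabled" flag text.
int64_t EffectiveCacheCapacity(int64_t requested_bytes, int64_t process_mem_limit) {
  if (requested_bytes <= 0) return 0;
  int64_t limit = static_cast<int64_t>(process_mem_limit * kMaxCacheMemFraction);
  return requested_bytes > limit ? limit : requested_bytes;
}
```

With a large process limit the requested size is returned unchanged; with a small
one the result is the clamped limit, which is the case the log message reports.
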
 
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index 31d90169f..bf710a12e 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -69,6 +69,7 @@ class SystemStateInfo;
 class ThreadResourceMgr;
 class TmpFileMgr;
 class Webserver;
+class CodeGenCache;
 
 namespace io {
   class DiskIoMgr;
@@ -151,6 +152,8 @@ class ExecEnv {
   Scheduler* scheduler() { return scheduler_.get(); }
   AdmissionController* admission_controller() { return admission_controller_.get(); }
   StatestoreSubscriber* subscriber() { return statestore_subscriber_.get(); }
+  CodeGenCache* codegen_cache() const { return codegen_cache_.get(); }
+  bool codegen_cache_enabled() const { return codegen_cache_ != nullptr; }
 
   const TNetworkAddress& configured_backend_address() const {
     return configured_backend_address_;
@@ -232,6 +235,9 @@ class ExecEnv {
   /// Tracks system resource usage which we then include in profiles.
   boost::scoped_ptr<SystemStateInfo> system_state_info_;
 
+  /// Singleton cache for codegen functions.
+  boost::scoped_ptr<CodeGenCache> codegen_cache_;
+
   /// Not owned by this class
   ImpalaServer* impala_server_ = nullptr;
   MetricGroup* rpc_metrics_ = nullptr;
diff --git a/be/src/runtime/fragment-state.h b/be/src/runtime/fragment-state.h
index fa6a66bc2..f78b2d10e 100644
--- a/be/src/runtime/fragment-state.h
+++ b/be/src/runtime/fragment-state.h
@@ -60,6 +60,7 @@ class FragmentState {
   ObjectPool* obj_pool() { return &obj_pool_; }
   int fragment_idx() const { return fragment_.idx; }
   const TQueryOptions& query_options() const { return query_state_->query_options(); }
+  const TQueryCtx& query_ctx() const { return query_state_->query_ctx(); }
   const TPlanFragment& fragment() const { return fragment_; }
   const PlanFragmentCtxPB& fragment_ctx() const { return fragment_ctx_; }
   const std::vector<const TPlanFragmentInstanceCtx*>& instance_ctxs() const {
@@ -68,6 +69,8 @@ class FragmentState {
   const std::vector<const PlanFragmentInstanceCtxPB*>& instance_ctx_pbs() const {
     return instance_ctx_pbs_;
   }
+  /// Return whether the codegen cache is enabled. It relies on the setting of the query.
+  bool codegen_cache_enabled() const { return query_state_->codegen_cache_enabled(); }
   /// Return the minimum per-fragment index of an instance on this backend.
   int min_per_fragment_instance_idx() const {
     // 'instance_ctxs_' is in ascending order, so can just return the first one.
@@ -79,6 +82,7 @@ class FragmentState {
   const TUniqueId& query_id() const { return query_state_->query_id(); }
   const DescriptorTbl& desc_tbl() const { return query_state_->desc_tbl(); }
   MemTracker* query_mem_tracker() const { return query_state_->query_mem_tracker(); }
+  QueryState* query_state() const { return query_state_; }
   RuntimeProfile* runtime_profile() { return runtime_profile_; }
 
   static const std::string FSTATE_THREAD_GROUP_NAME;
diff --git a/be/src/runtime/krpc-data-stream-sender-ir.cc b/be/src/runtime/krpc-data-stream-sender-ir.cc
index 335e16f9e..d0da802d9 100644
--- a/be/src/runtime/krpc-data-stream-sender-ir.cc
+++ b/be/src/runtime/krpc-data-stream-sender-ir.cc
@@ -35,7 +35,7 @@ Status KrpcDataStreamSender::HashAndAddRows(RowBatch* batch) {
     int row_count = 0;
     FOREACH_ROW_LIMIT(batch, row_idx, RowBatch::HASH_BATCH_SIZE, row_batch_iter) {
       TupleRow* row = row_batch_iter.Get();
-      channel_ids[row_count++] = HashRow(row) % num_channels;
+      channel_ids[row_count++] = HashRow(row, exchange_hash_seed_) % num_channels;
     }
     row_count = 0;
     FOREACH_ROW_LIMIT(batch, row_idx, RowBatch::HASH_BATCH_SIZE, row_batch_iter) {
diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc
index d17c8e46d..bfab9366a 100644
--- a/be/src/runtime/krpc-data-stream-sender.cc
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -72,7 +72,7 @@ DECLARE_int32(rpc_retry_interval_ms);
 namespace impala {
 
 const char* KrpcDataStreamSender::HASH_ROW_SYMBOL =
-    "KrpcDataStreamSender7HashRowEPNS_8TupleRowE";
+    "KrpcDataStreamSender7HashRowEPNS_8TupleRowEm";
 const char* KrpcDataStreamSender::LLVM_CLASS_NAME = "class.impala::KrpcDataStreamSender";
 const char* KrpcDataStreamSender::TOTAL_BYTES_SENT_COUNTER = "TotalBytesSent";
 
@@ -807,7 +807,8 @@ Status KrpcDataStreamSender::Open(RuntimeState* state) {
 // An example of generated code with int type.
 //
 // define i64 @KrpcDataStreamSenderHashRow(%"class.impala::KrpcDataStreamSender"* %this,
-//                                         %"class.impala::TupleRow"* %row) #46 {
+//                                         %"class.impala::TupleRow"* %row,
+//                                         i64 %seed) #52 {
 // entry:
 //   %0 = alloca i32
 //   %1 = call %"class.impala::ScalarExprEvaluator"*
@@ -833,7 +834,7 @@ Status KrpcDataStreamSender::Open(RuntimeState* state) {
 //   %hash_val = call i64
 //       @_ZN6impala8RawValue20GetHashValueFastHashEPKvRKNS_10ColumnTypeEm(
 //           i8* %val_ptr_phi, %"struct.impala::ColumnType"* @expr_type_arg,
-//               i64 7403188670037225271)
+//               i64 %seed)
 //   ret i64 %hash_val
 // }
 Status KrpcDataStreamSenderConfig::CodegenHashRow(
@@ -847,15 +848,15 @@ Status KrpcDataStreamSenderConfig::CodegenHashRow(
       "this", codegen->GetNamedPtrType(KrpcDataStreamSender::LLVM_CLASS_NAME)));
   prototype.AddArgument(
       LlvmCodeGen::NamedVariable("row", codegen->GetStructPtrType<TupleRow>()));
+  prototype.AddArgument(LlvmCodeGen::NamedVariable("seed", codegen->i64_type()));
 
-  llvm::Value* args[2];
+  llvm::Value* args[3];
   llvm::Function* hash_row_fn = prototype.GeneratePrototype(&builder, args);
   llvm::Value* this_arg = args[0];
   llvm::Value* row_arg = args[1];
 
   // Store the initial seed to hash_val
-  llvm::Value* hash_val =
-      codegen->GetI64Constant(exchange_hash_seed_);
+  llvm::Value* hash_val = args[2];
 
   // Unroll the loop and codegen each of the partition expressions
   for (int i = 0; i < partition_exprs_.size(); ++i) {
@@ -922,7 +923,6 @@ Status KrpcDataStreamSenderConfig::CodegenHashRow(
   if (*fn == nullptr) {
     return Status("Codegen'd KrpcDataStreamSenderHashRow() fails verification. See log");
   }
-
   return Status::OK();
 }
 
@@ -987,8 +987,8 @@ Status KrpcDataStreamSender::AddRowToChannel(const int channel_id, TupleRow* row
   return channels_[channel_id]->AddRow(row);
 }
 
-uint64_t KrpcDataStreamSender::HashRow(TupleRow* row) {
-  uint64_t hash_val = exchange_hash_seed_;
+uint64_t KrpcDataStreamSender::HashRow(TupleRow* row, uint64_t seed) {
+  uint64_t hash_val = seed;
   for (ScalarExprEvaluator* eval : partition_expr_evals_) {
     void* partition_val = eval->GetValue(row);
     // We can't use the crc hash function here because it does not result in
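
Why the seed moves from an embedded constant to a function argument can be seen
with a toy generator. This is a rough illustration under stated assumptions: the
strings below merely stand in for module bitcode (which the commit message says
is used as the cache key), and GenHashRowBaked / GenHashRowParam are hypothetical
names, not Impala functions.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Variant A: the seed is baked into the generated code as a constant, so the
// generated text (and therefore any key derived from it) depends on the seed.
std::string GenHashRowBaked(uint64_t seed) {
  return "define i64 @HashRow(%row) { call @Hash(%val, i64 " +
      std::to_string(seed) + ") }";
}

// Variant B: the seed is a runtime argument, as in the patch, so the generated
// text is identical no matter which seed is later passed in.
std::string GenHashRowParam() {
  return "define i64 @HashRow(%row, i64 %seed) { call @Hash(%val, i64 %seed) }";
}
```

Two calls to GenHashRowBaked() with different seeds yield different text and so
could never share a cache entry, while GenHashRowParam() always yields the same
text.
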
diff --git a/be/src/runtime/krpc-data-stream-sender.h b/be/src/runtime/krpc-data-stream-sender.h
index 99a7dca86..21074fa66 100644
--- a/be/src/runtime/krpc-data-stream-sender.h
+++ b/be/src/runtime/krpc-data-stream-sender.h
@@ -173,7 +173,7 @@ class KrpcDataStreamSender : public DataSink {
 
   /// Evaluates the input row against partition expressions and hashes the expression
   /// values. Returns the final hash value.
-  uint64_t HashRow(TupleRow* row);
+  uint64_t HashRow(TupleRow* row, uint64_t seed);
 
   /// Used when 'partition_type_' is HASH_PARTITIONED. Call HashRow() against each row
   /// in the input batch and adds it to the corresponding channel based on the hash value.
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 25a60d10a..1aa5cb7ec 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -19,6 +19,7 @@
 
 #include <mutex>
 
+#include "codegen/llvm-codegen-cache.h"
 #include "codegen/llvm-codegen.h"
 #include "common/thread-debug-info.h"
 #include "exec/kudu/kudu-util.h"
@@ -800,6 +801,11 @@ bool QueryState::WaitForFinishOrTimeout(int32_t timeout_ms) {
   return !timed_out;
 }
 
+bool QueryState::codegen_cache_enabled() const {
+  return !query_options().disable_codegen_cache && !disable_codegen_cache_
+      && ExecEnv::GetInstance()->codegen_cache_enabled();
+}
+
 bool QueryState::StartFInstances() {
   VLOG(2) << "StartFInstances(): query_id=" << PrintId(query_id())
           << " #instances=" << fragment_info_.fragment_instance_ctxs.size();
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index 4c652a6da..6f3613f77 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -51,6 +51,7 @@ class FragmentState;
 class FragmentInstanceState;
 class InitialReservations;
 class LlvmCodeGen;
+class CodeGenCache;
 class MemTracker;
 class PlanNode;
 class PublishFilterParamsPB;
@@ -150,6 +151,7 @@ class QueryState {
   const TQueryOptions& query_options() const {
     return query_ctx_.client_request.query_options;
   }
+  bool codegen_cache_enabled() const;
   MemTracker* query_mem_tracker() const { return query_mem_tracker_; }
   RuntimeProfile* host_profile() const { return host_profile_; }
   UniqueIdPB GetCoordinatorBackendId() const;
@@ -462,6 +464,9 @@ class QueryState {
   /// send a status report so that we can cancel after a configurable timeout.
   int64_t failed_report_time_ms_ = 0;
 
+  /// Indicator of whether to disable the codegen cache for the query.
+  bool disable_codegen_cache_ = false;
+
   /// Create QueryState w/ a refcnt of 0 and a memory limit of 'mem_limit' bytes applied
   /// to the query mem tracker. The query is associated with the resource pool set in
   /// 'query_ctx.request_pool' or from 'request_pool', if the former is not set (needed
diff --git a/be/src/runtime/test-env.h b/be/src/runtime/test-env.h
index 5dec32544..b68add29e 100644
--- a/be/src/runtime/test-env.h
+++ b/be/src/runtime/test-env.h
@@ -18,8 +18,9 @@
 #ifndef IMPALA_RUNTIME_TEST_ENV
 #define IMPALA_RUNTIME_TEST_ENV
 
-#include "runtime/io/disk-io-mgr.h"
+#include "codegen/llvm-codegen-cache.h"
 #include "runtime/exec-env.h"
+#include "runtime/io/disk-io-mgr.h"
 #include "runtime/runtime-state.h"
 
 namespace impala {
@@ -76,9 +77,15 @@ class TestEnv {
   /// Return total of mem tracker consumption for all queries.
   int64_t TotalQueryMemoryConsumption();
 
+  /// Reset the codegen cache.
+  void ResetCodegenCache(MetricGroup* metrics) {
+    exec_env_->codegen_cache_.reset(new CodeGenCache(metrics));
+  }
+
   ExecEnv* exec_env() { return exec_env_.get(); }
   MetricGroup* metrics() { return exec_env_->metrics(); }
   TmpFileMgr* tmp_file_mgr() { return exec_env_->tmp_file_mgr(); }
+  CodeGenCache* codegen_cache() { return exec_env_->codegen_cache_.get(); }
 
  private:
   /// Recreate global metric groups.
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 8582df3a4..a661096c4 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -849,6 +849,17 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_async_codegen(IsTrue(value));
         break;
       }
+      case TImpalaQueryOptions::DISABLE_CODEGEN_CACHE: {
+        query_options->__set_disable_codegen_cache(IsTrue(value));
+        break;
+      }
+      case TImpalaQueryOptions::CODEGEN_CACHE_MODE: {
+        TCodeGenCacheMode::type enum_type;
+        RETURN_IF_ERROR(GetThriftEnum(
+            value, "CodeGen Cache Mode", _TCodeGenCacheMode_VALUES_TO_NAMES, &enum_type));
+        query_options->__set_codegen_cache_mode(enum_type);
+        break;
+      }
       case TImpalaQueryOptions::ENABLE_DISTINCT_SEMI_JOIN_OPTIMIZATION: {
         query_options->__set_enable_distinct_semi_join_optimization(IsTrue(value));
         break;
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 05dcdf2ee..4f5330435 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -50,7 +50,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE                                                                 \
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),                                 \
-      TImpalaQueryOptions::FALLBACK_DB_FOR_FUNCTIONS + 1);                               \
+      TImpalaQueryOptions::CODEGEN_CACHE_MODE + 1);                                      \
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)               \
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)             \
@@ -276,8 +276,11 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(lock_max_wait_time_s, LOCK_MAX_WAIT_TIME_S, TQueryOptionLevel::REGULAR)   \
   QUERY_OPT_FN(orc_schema_resolution, ORC_SCHEMA_RESOLUTION, TQueryOptionLevel::REGULAR) \
   QUERY_OPT_FN(expand_complex_types, EXPAND_COMPLEX_TYPES, TQueryOptionLevel::REGULAR)   \
-  QUERY_OPT_FN(fallback_db_for_functions, FALLBACK_DB_FOR_FUNCTIONS,                     \
-      TQueryOptionLevel::ADVANCED);
+  QUERY_OPT_FN(                                                                          \
+      fallback_db_for_functions, FALLBACK_DB_FOR_FUNCTIONS, TQueryOptionLevel::ADVANCED) \
+  QUERY_OPT_FN(                                                                          \
+      disable_codegen_cache, DISABLE_CODEGEN_CACHE, TQueryOptionLevel::ADVANCED)         \
+  QUERY_OPT_FN(codegen_cache_mode, CODEGEN_CACHE_MODE, TQueryOptionLevel::DEVELOPMENT);
 
 /// Enforce practical limits on some query options to avoid undesired query state.
 static const int64_t SPILLABLE_BUFFER_LIMIT = 1LL << 40; // 1 TB
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 0b3cef56b..e759c068e 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -742,6 +742,18 @@ enum TImpalaQueryOptions {
 
   // Specify the database name which stores global udf
   FALLBACK_DB_FOR_FUNCTIONS = 148;
+
+  // Specify whether to use codegen cache.
+  DISABLE_CODEGEN_CACHE = 149;
+
+  // Specify how the entry stores to the codegen cache, would affect the entry size.
+  // Possible values are NORMAL, OPTIMAL, NORMAL_DEBUG and OPTIMAL_DEBUG.
+  // The normal mode will use a full key for the cache, while the optimal mode uses
+  // a hashcode of 128 bits for the key to save the memory consumption.
+  // The debug mode of each of them allows more logs, would be helpful to target
+  // an issue.
+  // Only valid if DISABLE_CODEGEN_CACHE is set to false.
+  CODEGEN_CACHE_MODE = 150;
 }
 
 // The summary of a DML statement.
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index 596965ff8..4ab9d4101 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -85,6 +85,15 @@ enum TMinmaxFilterFastCodePathMode {
   VERIFICATION=2
 }
 
+// The options for CodeGen Cache.
+// The debug options allow more logs, the value equal to the mode plus 256.
+enum TCodeGenCacheMode {
+  NORMAL = 0
+  OPTIMAL = 1
+  NORMAL_DEBUG = 256
+  OPTIMAL_DEBUG = 257
+}
+
 // Options for when to write Parquet Bloom filters for supported types.
 enum TParquetBloomFilterWrite {
   // Never write Parquet Bloom filters.
@@ -599,6 +608,10 @@ struct TQueryOptions {
   148: optional bool expand_complex_types = false;
 
   149: optional string fallback_db_for_functions;
+
+  // See comment in ImpalaService.thrift
+  150: optional bool disable_codegen_cache = false;
+  151: optional TCodeGenCacheMode codegen_cache_mode = TCodeGenCacheMode.NORMAL;
 }
 
 // Impala currently has three types of sessions: Beeswax, HiveServer2 and external
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index 6e5bca495..7188d75aa 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -19,6 +19,66 @@
     "kind": "COUNTER",
     "key": "external-data-source.class-cache.hits"
   },
+  {
+    "description": "The total number of cache misses in the CodeGen Cache",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "CodeGen Cache Misses",
+    "units": "UNIT",
+    "kind": "COUNTER",
+    "key": "impala.codegen-cache.misses"
+  },
+  {
+    "description": "The number of in-use CodeGen Cache Entries",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "In-use CodeGen Cache Entries",
+    "units": "UNIT",
+    "kind": "GAUGE",
+    "key": "impala.codegen-cache.entries-in-use"
+  },
+  {
+    "description": "The total bytes of in-use CodeGen Cache Entries",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "In-use CodeGen Cache Entries total bytes",
+    "units": "UNIT",
+    "kind": "GAUGE",
+    "key": "impala.codegen-cache.entries-in-use-bytes"
+  },
+  {
+    "description": "The number of evicted CodeGen Cache Entries",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Evicted CodeGen Cache Entries",
+    "units": "UNIT",
+    "kind": "COUNTER",
+    "key": "impala.codegen-cache.entries-evicted"
+  },
+  {
+    "description": "The total number of cache hits in the CodeGen Cache",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "CodeGen Cache Hits",
+    "units": "UNIT",
+    "kind": "COUNTER",
+    "key": "impala.codegen-cache.hits"
+  },
+  {
+    "description": "Statistics for codegen cache entry sizes allocated from the system.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "CodeGen Cache Entry Sizes.",
+    "units": "BYTES",
+    "kind": "HISTOGRAM",
+    "key": "impala.codegen-cache.entry-sizes"
+  },
   {
     "description": "Resource Pool $0 Configured Max Mem Resources",
     "contexts": [
diff --git a/testdata/workloads/functional-query/queries/QueryTest/codegen-cache-udf.test b/testdata/workloads/functional-query/queries/QueryTest/codegen-cache-udf.test
new file mode 100644
index 000000000..7f1bb6007
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/codegen-cache-udf.test
@@ -0,0 +1,24 @@
+====
+---- QUERY
+# Test identity functions
+select identity(true);
+---- TYPES
+boolean
+---- RESULTS
+true
+====
+---- QUERY
+select test_count(int_col) from functional.alltypestiny;
+---- RESULTS
+8
+---- TYPES
+bigint
+====
+---- QUERY
+# Test UDFs over tables
+select sum(identity(bigint_col)) from functional.alltypes
+---- TYPES
+bigint
+---- RESULTS
+328500
+====
diff --git a/tests/common/test_result_verifier.py b/tests/common/test_result_verifier.py
index 69f22e509..790b895fc 100644
--- a/tests/common/test_result_verifier.py
+++ b/tests/common/test_result_verifier.py
@@ -755,3 +755,11 @@ def assert_codegen_enabled(profile_string, exec_node_ids):
     for exec_options in get_node_exec_options(profile_string, exec_node_id):
       assert 'Codegen Enabled' in exec_options
       assert not 'Codegen Disabled' in exec_options
+
+
+def assert_codegen_cache_hit(profile_string, expect_hit):
+  assert "NumCachedFunctions" in profile_string
+  if expect_hit:
+    assert "NumCachedFunctions: 0 " not in profile_string
+  else:
+    assert "NumCachedFunctions: 0 " in profile_string
diff --git a/tests/custom_cluster/test_codegen_cache.py b/tests/custom_cluster/test_codegen_cache.py
new file mode 100644
index 000000000..f867fb378
--- /dev/null
+++ b/tests/custom_cluster/test_codegen_cache.py
@@ -0,0 +1,254 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+from copy import copy
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.skip import SkipIf, SkipIfNotHdfsMinicluster
+from tests.common.test_result_verifier import assert_codegen_cache_hit
+from tests.util.filesystem_utils import get_fs_path
+
+
+@SkipIf.not_hdfs
[email protected]
+class TestCodegenCache(CustomClusterTestSuite):
+  """ This test enables the codegen cache and verfies that cache hit and miss counts
+  in the runtime profile and metrics are as expected.
+  """
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def setup_class(cls):
+    if cls.exploration_strategy() != 'exhaustive':
+      pytest.skip('runs only in exhaustive')
+    super(TestCodegenCache, cls).setup_class()
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(cluster_size=1)
+  def test_codegen_cache(self, vector):
+    self._test_codegen_cache(vector,
+            ("select * from (select * from functional.alltypes "
+             + "limit 1000000) t1 where int_col > 10 limit 10"))
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(cluster_size=1)
+  def test_codegen_cache_int_col(self, vector):
+    self._test_codegen_cache(vector,
+      "select * from functional.alltypes where int_col > 0")
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(cluster_size=1)
+  def test_codegen_cache_tinyint_col(self, vector):
+    self._test_codegen_cache(vector,
+      "select * from functional.alltypes where tinyint_col > 0")
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(cluster_size=1)
+  def test_codegen_cache_bool_col(self, vector):
+    self._test_codegen_cache(vector,
+      "select * from functional.alltypes where bool_col > 0")
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(cluster_size=1)
+  def test_codegen_cache_bigint_col(self, vector):
+    self._test_codegen_cache(vector,
+      "select * from functional.alltypes where bigint_col > 0")
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(cluster_size=1)
+  def test_codegen_cache_float_col(self, vector):
+    self._test_codegen_cache(vector,
+      "select * from functional.alltypes where float_col > 0")
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(cluster_size=1)
+  def test_codegen_cache_double_col(self, vector):
+    self._test_codegen_cache(vector,
+      "select * from functional.alltypes where double_col > 0")
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(cluster_size=1)
+  def test_codegen_cache_date_string_col(self, vector):
+    self._test_codegen_cache(vector,
+      "select * from functional.alltypes where date_string_col != ''")
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(cluster_size=1)
+  def test_codegen_cache_string_col(self, vector):
+    self._test_codegen_cache(vector,
+      "select * from functional.alltypes where string_col != ''")
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(cluster_size=1)
+  def test_codegen_cache_poly_func_string_col(self, vector):
+    self._test_codegen_cache(vector,
+      ("select * from functional.alltypes where "
+      + "CHAR_LENGTH(string_col) > 0"))
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(cluster_size=1)
+  def test_codegen_cache_poly_func_date_string_col(self, vector):
+    self._test_codegen_cache(vector,
+      ("select * from functional.alltypes where "
+      + "CHAR_LENGTH(date_string_col) > 0"))
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(cluster_size=1)
+  # Test native uda is missed in the codegen cache, as it is disabled.
+  def test_codegen_cache_uda_miss(self, vector):
+    database = "test_codegen_cache_uda_miss"
+    self._load_functions(database)
+    self._test_codegen_cache(vector,
+      "select test_count(int_col) from functional.alltypestiny", False)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(cluster_size=1)
+  # Test native udf is missed in the codegen cache, as it is disabled.
+  def test_codegen_cache_udf_miss(self, vector):
+    database = "test_codegen_cache_udf_miss"
+    self._load_functions(database)
+    self._test_codegen_cache(vector,
+      "select sum(identity(bigint_col)) from functional.alltypes", False)
+
+  def _check_metric_expect_init(self):
+    # Verifies that the cache metrics are all zero.
+    assert self.get_metric('impala.codegen-cache.entries-evicted') == 0
+    assert self.get_metric('impala.codegen-cache.entries-in-use') == 0
+    assert self.get_metric('impala.codegen-cache.entries-in-use-bytes') == 0
+    assert self.get_metric('impala.codegen-cache.hits') == 0
+    assert self.get_metric('impala.codegen-cache.misses') == 0
+
+  def _test_codegen_cache(self, vector, sql, expect_hit=True, expect_num_frag=2):
+    # Do not disable codegen.
+    exec_options = copy(vector.get_value('exec_option'))
+    exec_options['exec_single_node_rows_threshold'] = 0
+    self._check_metric_expect_init()
+    result = self.execute_query(sql, exec_options)
+    assert_codegen_cache_hit(result.runtime_profile, False)
+    # expect_num_cache_miss_fragment is 1 iff expect_hit is False, and expect only
+    # one fragment codegen cache missing for the case if expect_hit is False.
+    expect_num_cache_miss_fragment = 1
+    if expect_hit:
+        expect_num_cache_miss_fragment = 0
+    expect_num_cache_hit = expect_num_frag - expect_num_cache_miss_fragment
+
+    # Verifies that the cache misses > 0, because the look up fails in an empty
+    # brandnew cache, then a new entry should be stored successfully, so the in-use
+    # entry number and bytes should be larger than 0.
+    assert self.get_metric('impala.codegen-cache.entries-evicted') == 0
+    assert self.get_metric('impala.codegen-cache.entries-in-use') == expect_num_cache_hit
+    assert self.get_metric('impala.codegen-cache.entries-in-use-bytes') > 0
+    assert self.get_metric('impala.codegen-cache.hits') == 0
+    assert self.get_metric('impala.codegen-cache.misses') == expect_num_cache_hit
+
+    result = self.execute_query(sql, exec_options)
+    # Verify again, the expected cache hit should be reflected.
+    if expect_hit:
+      assert_codegen_cache_hit(result.runtime_profile, True)
+    else:
+      assert_codegen_cache_hit(result.runtime_profile, False)
+    assert self.get_metric('impala.codegen-cache.entries-evicted') == 0
+    assert self.get_metric('impala.codegen-cache.entries-in-use') == expect_num_cache_hit
+    assert self.get_metric('impala.codegen-cache.entries-in-use-bytes') > 0
+    assert self.get_metric('impala.codegen-cache.hits') == expect_num_cache_hit
+    assert self.get_metric('impala.codegen-cache.misses') == expect_num_cache_hit
+
+  def _load_functions(self, database):
+    create_func_template = """
+    use default;
+    drop database if exists {database} CASCADE;
+    create database {database};
+    create aggregate function {database}.test_count(int) returns bigint
+    location '{location_uda}' update_fn='CountUpdate';
+    create function {database}.identity(boolean) returns boolean
+    location '{location_udf}' symbol='Identity';
+    create function {database}.identity(bigint) returns bigint
+    location '{location_udf}' symbol='Identity';
+    use {database};
+    """
+    location_uda = get_fs_path('/test-warehouse/libudasample.so')
+    location_udf = get_fs_path('/test-warehouse/libTestUdfs.so')
+    queries = create_func_template.format(database=database,
+            location_uda=location_uda, location_udf=location_udf)
+    queries = [q for q in queries.split(';') if q.strip()]
+    for query in queries:
+      if query.strip() == '': continue
+      result = self.execute_query_expect_success(self.client, query)
+      assert result is not None
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(cluster_size=3)
+  def test_codegen_cache_udf_crash(self, vector):
+      # The testcase would crash if we don't disable the native udf for codegen cache.
+      database = "test_codegen_cache_udf_crash"
+      self._load_functions(database)
+      self.run_test_case('QueryTest/codegen-cache-udf', vector, use_db=database)
+      # Even the udf is disabled and the queries are using the udf, there could be
+      # other fragments stored to the codegen cache, so we check whether the codegen
+      # cache is enabled to other cases.
+      assert self.get_metric('impala.codegen-cache.entries-in-use') > 0
+      assert self.get_metric('impala.codegen-cache.entries-in-use-bytes') > 0
+
+      # Run multiple times, recreate the udfs, would crash if the udf is reused from
+      # the codegen cache.
+      for i in range(3):
+        # Make the database different
+        database = database + "diff"
+        self._load_functions(database)
+        self.run_test_case('QueryTest/codegen-cache-udf', vector, use_db=database)
+
+  def _test_codegen_cache_timezone_crash_helper(self, database):
+    create_db_template = """
+    use default;
+    drop database if exists {database} CASCADE;
+    create database {database};
+    create table {database}.alltimezones as select * from functional.alltimezones;
+    use {database};
+    """
+    queries = create_db_template.format(database=database)
+    queries = [q for q in queries.split(';') if q.strip()]
+    query = "select timezone, utctime, localtime,\
+            from_utc_timestamp(utctime,timezone) as\
+            impalaresult from alltimezones where\
+            localtime != from_utc_timestamp(utctime,timezone)"
+    queries.append(query)
+    for query in queries:
+      if query.strip() == '': continue
+      result = self.execute_query_expect_success(self.client, query)
+      assert result is not None
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(cluster_size=1)
+  def test_codegen_cache_timezone_crash(self, vector):
+      # The testcase tests whether it would crash using the broken builtin function
+      # from_utc_timestamp from the codegen cache.
+      database = "test_codegen_cache_timezone_crash"
+      # Run multiple times, recreate the database each time. Except for the first run,
+      # other runs should all hit the cache.
+      # Expect won't crash.
+      for i in range(5):
+        # Make the database different
+        self._test_codegen_cache_timezone_crash_helper(database + str(i))
+        # During the table creation, there will be one fragment involved, for the
+        # query we are going to test, will be two fragments, so totally three
+        # fragments involved, should all be cached.
+        assert self.get_metric('impala.codegen-cache.entries-in-use') == 3
+        assert self.get_metric('impala.codegen-cache.entries-in-use-bytes') > 0
+        assert self.get_metric('impala.codegen-cache.hits') == i * 3
+        assert self.get_metric('impala.codegen-cache.misses') == 3
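
Illustrative note (not part of the commit): the commit message and the Thrift comments describe two key layouts, a full-bitcode key for NORMAL and a 128-bit hash plus the total key length for OPTIMAL, with each debug mode equal to the base mode plus 256. The Python sketch below shows one way that could look. The names split_mode and make_key are hypothetical, and blake2b is only a stand-in for whatever 128-bit hash the C++ implementation uses; the 8-byte length encoding is likewise an assumption.

```python
import hashlib

# Enum values copied from TCodeGenCacheMode in Query.thrift.
NORMAL, OPTIMAL = 0, 1
DEBUG_OFFSET = 256  # NORMAL_DEBUG = 256, OPTIMAL_DEBUG = 257


def split_mode(mode):
    """Split a TCodeGenCacheMode value into (base_mode, debug_enabled)."""
    debug = mode >= DEBUG_OFFSET
    base = mode - DEBUG_OFFSET if debug else mode
    if base not in (NORMAL, OPTIMAL):
        raise ValueError("unknown codegen cache mode: %d" % mode)
    return base, debug


def make_key(bitcode, mode):
    """Build a cache key from module bitcode (bytes) for the given mode."""
    base, _ = split_mode(mode)
    if base == NORMAL:
        # NORMAL: the full bitcode string is the key.
        return bitcode
    # OPTIMAL: a 128-bit hash plus the total length of the full key.
    # blake2b and the 8-byte little-endian length are assumptions made
    # for this sketch, not the encoding used by the patch.
    digest = hashlib.blake2b(bitcode, digest_size=16).digest()
    return digest + len(bitcode).to_bytes(8, "little")
```

In this sketch an OPTIMAL key is always 24 bytes regardless of module size, which is the memory saving the commit message refers to. The cost is that two different modules could in principle map to the same key, whereas a NORMAL key compares the full bitcode.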
