This is an automated email from the ASF dual-hosted git repository.
michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 4bdb99938 IMPALA-11470: Add Cache For Codegen Functions
4bdb99938 is described below
commit 4bdb99938a785966405fc1cd61dee1597c43f19a
Author: Yida Wu <[email protected]>
AuthorDate: Wed Oct 26 16:38:00 2022 -0700
IMPALA-11470: Add Cache For Codegen Functions
This patch adds support for caching CodeGen functions to improve
the performance of sub-second queries.
The main idea is to store the codegen functions in a cache and
reuse them when appropriate, avoiding repeated LLVM optimization,
which can take hundreds of milliseconds.
In this patch, we implement a cache to store codegen functions.
The cache is a singleton instance per daemon and contains
multiple cache entries. Each cache entry is at the fragment
level: it stores all the codegen functions of one fragment. If
an identical fragment arrives again, it can find all the codegen
functions it needs in that cache entry, saving the optimization
time.
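The fragment-level cache described above can be pictured as a byte-bounded map from a fragment key to one entry that holds all of that fragment's functions. The sketch below is a minimal, single-threaded illustration under stated assumptions: FragmentEntry and FragmentCodegenCache are hypothetical names, an entry is modeled as function names plus a code size, LRU is assumed as the eviction policy, and neither the LLVM execution engines that the real CodeGenCache keeps alive nor the sharding implied by the unit test's cache_force_single_shard flag are modeled.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <list>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical model of one fragment's cached code: the names of all of its
// codegen functions plus the bytes of machine code they occupy.
struct FragmentEntry {
  std::vector<std::string> fn_names;
  int64_t code_bytes = 0;
};

// Minimal single-threaded cache with one entry per fragment key, bounded by the
// total charged bytes. LRU eviction is an assumption made for illustration.
class FragmentCodegenCache {
 public:
  explicit FragmentCodegenCache(int64_t capacity) : capacity_(capacity) {}

  // On a hit, copies the entry to *out, marks it most recently used, and
  // returns true; on a miss, returns false.
  bool Lookup(const std::string& key, FragmentEntry* out) {
    auto it = index_.find(key);
    if (it == index_.end()) {
      ++misses;
      return false;
    }
    lru_.splice(lru_.begin(), lru_, it->second);
    *out = it->second->second;
    ++hits;
    return true;
  }

  // Stores an entry, replacing any existing entry for the same key (counted as
  // an eviction) and evicting least recently used entries until it fits.
  void Store(const std::string& key, FragmentEntry entry) {
    auto it = index_.find(key);
    if (it != index_.end()) Erase(it);
    const int64_t charge = Charge(key, entry);
    if (charge > capacity_) return;  // Can never fit; skip caching.
    while (bytes_in_use_ + charge > capacity_) {
      Erase(index_.find(lru_.back().first));
    }
    lru_.emplace_front(key, std::move(entry));
    index_[key] = lru_.begin();
    bytes_in_use_ += charge;
  }

  size_t entries_in_use() const { return index_.size(); }
  int64_t bytes_in_use() const { return bytes_in_use_; }

  // Counters analogous to the new hits/misses/entries-evicted metrics.
  int64_t hits = 0;
  int64_t misses = 0;
  int64_t evicted = 0;

 private:
  using List = std::list<std::pair<std::string, FragmentEntry>>;
  using Index = std::unordered_map<std::string, List::iterator>;

  // Charge = key bytes + code bytes (per-entry overhead omitted for brevity).
  static int64_t Charge(const std::string& key, const FragmentEntry& e) {
    return static_cast<int64_t>(key.size()) + e.code_bytes;
  }

  void Erase(Index::iterator it) {
    bytes_in_use_ -= Charge(it->first, it->second->second);
    lru_.erase(it->second);
    index_.erase(it);
    ++evicted;
  }

  int64_t capacity_;
  int64_t bytes_in_use_ = 0;
  List lru_;
  Index index_;
};
```

Overwriting an existing key is counted as an eviction here, matching what the patch's BasicFunction unit test expects from codegen_cache_entries_evicted_.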
The module bitcode, which is generated before module optimization
and final compilation, is used as the cache key. If
codegen_cache_mode is NORMAL (the default), the full bitcode
string is stored as the key. If codegen_cache_mode is set to
OPTIMAL, the stored key contains only the hash code and the total
length of the full key, which reduces memory consumption.
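A sketch of the two key formats, assuming std::hash<std::string> as a stand-in for whichever hash the patch actually uses; OptimalKey, MakeNormalKey, and MakeOptimalKey are hypothetical names. It shows the trade-off: the NORMAL key grows with the bitcode, while the OPTIMAL key has a fixed size but can, in principle, report a false match when two different bitcodes share both hash and length.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>

// Hypothetical OPTIMAL-mode key: only the hash and the length of the full key.
struct OptimalKey {
  size_t hash;
  size_t length;
  bool operator==(const OptimalKey& o) const {
    return hash == o.hash && length == o.length;
  }
};

// NORMAL mode: the key is the full module bitcode, so equality is exact.
std::string MakeNormalKey(const std::string& bitcode) { return bitcode; }

// OPTIMAL mode: a fixed-size key. Two different bitcodes collide only if both
// their hashes and their lengths match.
OptimalKey MakeOptimalKey(const std::string& bitcode) {
  return OptimalKey{std::hash<std::string>{}(bitcode), bitcode.size()};
}
```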
KrpcDataStreamSenderConfig::CodegenHashRow() is also changed to
take the hash seed as an argument, because a fragment cannot hit
the cache if a dynamic hash seed is used within the codegen
function.
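Why a dynamic seed defeats the cache can be illustrated with strings standing in for module bitcode. This is an illustration only: the emitted text is pseudo-IR, and EmitHashRowWithConstantSeed, EmitHashRowWithSeedArgument, and SameCacheKey are hypothetical helpers, assuming the seed varies between otherwise identical fragments.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical stand-in for the bitcode of a hash function whose seed is baked
// into the generated code as a constant. The text is pseudo-IR, not LLVM output.
std::string EmitHashRowWithConstantSeed(uint32_t seed) {
  return "HashRow(row) { return Hash(row, " + std::to_string(seed) + "); }";
}

// Hypothetical stand-in for the same function with the seed passed as an
// argument: the generated code no longer depends on the seed value.
std::string EmitHashRowWithSeedArgument() {
  return "HashRow(row, seed) { return Hash(row, seed); }";
}

// The cache can only hit when the key (the module bitcode) is identical.
bool SameCacheKey(const std::string& a, const std::string& b) { return a == b; }
```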
The codegen cache is automatically disabled for a fragment that
uses a native UDF, because caching can lead to a crash in this
case. The UDF is loaded into the LLVM execution engine's global
mapping rather than into the LLVM module, but the cache key is the
LLVM module bitcode, which does not reflect a change of the UDF
address. If the UDF is reloaded at runtime (for example, after the
database is recreated), a cache hit could use the old UDF address
and crash. The cache stays disabled in this case until there is a
better solution; IMPALA-11771 was filed as the follow-up.
The patch also introduces the following new startup flag and query
options for configuration and operation.
Startup flag for configuration:
- codegen_cache_capacity: The capacity of the cache. If set to 0,
the codegen cache is disabled.
Query options for operation:
- disable_codegen_cache: The codegen cache is disabled when this
is set to true.
- codegen_cache_mode: Defined by the new enum type
TCodeGenCacheMode, which has four values: NORMAL and OPTIMAL, and
NORMAL_DEBUG and OPTIMAL_DEBUG, the debug variants of the first two.
NORMAL stores the full key in the cache. Each entry costs more
memory, because the key is the bitcode of the LLVM module, which
can be large.
OPTIMAL stores only the hash code and length of the key. This
greatly reduces memory consumption, but hash collisions are
possible.
The debug modes behave the same as their non-debug counterparts
but allow more logs and statistics, so they may be slower.
Only valid when disable_codegen_cache is set to false.
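The four modes decompose into two independent properties, which a sketch can make explicit. It mirrors the is_optimal()/is_debug() checks exercised by the patch's ModeAnalyzer unit test, but CacheMode, IsOptimal, IsDebug, SameKeyFormat, and CacheEnabled are hypothetical names; the enum does not assume TCodeGenCacheMode's actual Thrift values, and CacheEnabled assumes that a non-positive capacity means "disabled".

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical mirror of the four TCodeGenCacheMode values.
enum class CacheMode { NORMAL, OPTIMAL, NORMAL_DEBUG, OPTIMAL_DEBUG };

// OPTIMAL and OPTIMAL_DEBUG store the compact (hash, length) key.
constexpr bool IsOptimal(CacheMode m) {
  return m == CacheMode::OPTIMAL || m == CacheMode::OPTIMAL_DEBUG;
}

// The *_DEBUG modes behave like their base mode but allow more logging.
constexpr bool IsDebug(CacheMode m) {
  return m == CacheMode::NORMAL_DEBUG || m == CacheMode::OPTIMAL_DEBUG;
}

// Debug and non-debug variants of the same mode share one key format.
constexpr bool SameKeyFormat(CacheMode a, CacheMode b) {
  return IsOptimal(a) == IsOptimal(b);
}

// The cache is used only with a non-zero startup capacity and when the query
// has not set disable_codegen_cache.
constexpr bool CacheEnabled(int64_t capacity_bytes, bool disable_codegen_cache) {
  return capacity_bytes > 0 && !disable_codegen_cache;
}
```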
New impalad metrics:
- impala.codegen-cache.misses
- impala.codegen-cache.entries-in-use
- impala.codegen-cache.entries-in-use-bytes
- impala.codegen-cache.entries-evicted
- impala.codegen-cache.hits
- impala.codegen-cache.entry-sizes
New profile metrics:
- CodegenCacheLookupTime
- CodegenCacheSaveTime
- ModuleBitcodeGenTime
- NumCachedFunctions
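TPCH-1 performance evaluation (8 iterations) on AWS m5a.4xlarge.
The first iteration is excluded from the results to show the
benefit of the cache. Times are in seconds. The first Delta(Avg)
column compares Cached with NoCache, and the second compares
Cached with NoCodegen.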
Query     Cached(s)  NoCache(s)  Delta(Avg)  NoCodegen(s)  Delta(Avg)
TPCH-Q1   0.39       1.02        -61.76%     5.59          -93.02%
TPCH-Q2   0.56       1.21        -53.72%     0.47          19.15%
TPCH-Q3   0.37       0.77        -51.95%     0.43          -13.95%
TPCH-Q4   0.36       0.51        -29.41%     0.33          9.09%
TPCH-Q5   0.39       1.1         -64.55%     0.39          0%
TPCH-Q6   0.24       0.27        -11.11%     0.77          -68.83%
TPCH-Q7   0.39       1.2         -67.5%      0.39          0%
TPCH-Q8   0.58       1.46        -60.27%     0.45          28.89%
TPCH-Q9   0.8        1.38        -42.03%     1             -20%
TPCH-Q10  0.6        1.03        -41.75%     0.85          -29.41%
TPCH-Q11  0.3        0.93        -67.74%     0.2           50%
TPCH-Q12  0.28       0.48        -41.67%     0.38          -26.32%
TPCH-Q13  1.11       1.22        -9.02%      1.16          -4.31%
TPCH-Q14  0.55       0.78        -29.49%     0.45          22.22%
TPCH-Q15  0.33       0.73        -54.79%     0.44          -25%
TPCH-Q16  0.32       0.78        -58.97%     0.41          -21.95%
TPCH-Q17  0.56       0.84        -33.33%     0.89          -37.08%
TPCH-Q18  0.54       0.92        -41.3%      0.89          -39.33%
TPCH-Q19  0.35       2.34        -85.04%     0.35          0%
TPCH-Q20  0.34       0.98        -65.31%     0.31          9.68%
TPCH-Q21  0.83       1.14        -27.19%     0.86          -3.49%
TPCH-Q22  0.26       0.52        -50%        0.25          4%
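The Delta(Avg) values are consistent with the relative change (Cached - Baseline) / Baseline, where the baseline is NoCache for the first delta column and NoCodegen for the second; for TPCH-Q1, (0.39 - 1.02) / 1.02 gives -61.76%. A small sketch reproducing that arithmetic, with DeltaPercent and Round2 as hypothetical helpers that are not part of the patch:

```cpp
#include <cassert>
#include <cmath>

// Relative change of the cached run against a baseline, in percent:
// (cached - baseline) / baseline * 100. Negative means the cache was faster.
double DeltaPercent(double cached_s, double baseline_s) {
  return (cached_s - baseline_s) / baseline_s * 100.0;
}

// Round to two decimals, matching the precision used in the table.
double Round2(double v) { return std::round(v * 100.0) / 100.0; }
```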
The results show a substantial improvement over codegen without
the cache (the default setting). Compared with codegen disabled,
however, the codegen cache is, as expected, not always faster for
short queries. A likely reason is that even with the cache, the
codegen functions still need to be prepared and the module bitcode
still needs to be generated as the key. When this preparation takes
longer than the codegen functions save, especially for extremely
short queries, the query can be slower than without codegen. There
may be room for improvement in the future.
We also measured the total cache entry size for the TPCH queries.
The data below shows the total codegen cache memory used by each
query. OPTIMAL mode greatly reduces the size of the cache; since
the key is the only difference between the two modes, the savings
come from the much smaller key in OPTIMAL mode.
Query     Normal(KB)  Optimal(KB)
TPCH-Q1   604.1       50.9
TPCH-Q2   973.4       135.5
TPCH-Q3   561.1       36.5
TPCH-Q4   423.3       41.1
TPCH-Q5   866.9       93.3
TPCH-Q6   295.9       4.9
TPCH-Q7   1105.4      124.5
TPCH-Q8   1382.6      211
TPCH-Q9   1041.4      119.5
TPCH-Q10  738.4       65.4
TPCH-Q11  1201.6      136.3
TPCH-Q12  452.8       46.7
TPCH-Q13  541.3       48.1
TPCH-Q14  696.8       102.8
TPCH-Q15  1148.1      95.2
TPCH-Q16  740.6       77.4
TPCH-Q17  990.1       133.4
TPCH-Q18  376         70.8
TPCH-Q19  1280.1      179.5
TPCH-Q20  1260.9      180.7
TPCH-Q21  722.5       66.8
TPCH-Q22  713.1       49.8
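The unit test's GetMemCharge() computes an entry's charge as the JIT memory manager's allocated bytes, plus the key size, plus sizeof(CodeGenCacheEntry), with CodeGenCacheKey::OptimalKeySize replacing the full key size in OPTIMAL mode. The sketch below restates that formula; kEntryOverhead and kOptimalKeySize are illustrative placeholders, not the real sizes. With identical code, the two modes differ by exactly the full key size minus the optimal key size.

```cpp
#include <cassert>
#include <cstdint>

// Illustrative placeholders, NOT the real sizeof(CodeGenCacheEntry) or
// CodeGenCacheKey::OptimalKeySize.
constexpr int64_t kEntryOverhead = 64;
constexpr int64_t kOptimalKeySize = 16;  // e.g. a 64-bit hash + a 64-bit length

// Per-entry memory charge: machine-code bytes + key bytes + fixed overhead.
// NORMAL mode charges the full key; OPTIMAL mode charges the fixed-size key.
int64_t MemCharge(int64_t code_bytes, int64_t full_key_bytes, bool optimal) {
  const int64_t key_bytes = optimal ? kOptimalKeySize : full_key_bytes;
  return code_bytes + key_bytes + kEntryOverhead;
}
```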
Tests:
Ran exhaustive tests.
Added e2e test case TestCodegenCache.
Added unit test case LlvmCodeGenCacheTest.
Change-Id: If42c78a7f51fd582e5fe331fead494dadf544eb1
Reviewed-on: http://gerrit.cloudera.org:8080/19181
Reviewed-by: Michael Smith <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/codegen/CMakeLists.txt | 2 +
be/src/codegen/llvm-codegen-cache-test.cc | 586 +++++++++++++++++++++
be/src/codegen/llvm-codegen-cache.cc | 235 +++++++++
be/src/codegen/llvm-codegen-cache.h | 298 +++++++++++
be/src/codegen/llvm-codegen.cc | 241 +++++++--
be/src/codegen/llvm-codegen.h | 48 +-
be/src/exprs/scalar-expr.cc | 22 +-
be/src/runtime/exec-env.cc | 26 +
be/src/runtime/exec-env.h | 6 +
be/src/runtime/fragment-state.h | 4 +
be/src/runtime/krpc-data-stream-sender-ir.cc | 2 +-
be/src/runtime/krpc-data-stream-sender.cc | 18 +-
be/src/runtime/krpc-data-stream-sender.h | 2 +-
be/src/runtime/query-state.cc | 6 +
be/src/runtime/query-state.h | 5 +
be/src/runtime/test-env.h | 9 +-
be/src/service/query-options.cc | 11 +
be/src/service/query-options.h | 9 +-
common/thrift/ImpalaService.thrift | 12 +
common/thrift/Query.thrift | 13 +
common/thrift/metrics.json | 60 +++
.../queries/QueryTest/codegen-cache-udf.test | 24 +
tests/common/test_result_verifier.py | 8 +
tests/custom_cluster/test_codegen_cache.py | 254 +++++++++
24 files changed, 1845 insertions(+), 56 deletions(-)
diff --git a/be/src/codegen/CMakeLists.txt b/be/src/codegen/CMakeLists.txt
index ac00a1c2b..6a01f39de 100644
--- a/be/src/codegen/CMakeLists.txt
+++ b/be/src/codegen/CMakeLists.txt
@@ -32,6 +32,7 @@ add_library(CodeGen
codegen-symbol-emitter.cc
codegen-util.cc
llvm-codegen.cc
+ llvm-codegen-cache.cc
instruction-counter.cc
${IR_C_FILE}
${LEGACY_AVX_IR_C_FILE}
@@ -130,5 +131,6 @@ add_custom_target(test-loop.bc
# Exception to unified be tests: custom main initializes LLVM
ADD_BE_LSAN_TEST(llvm-codegen-test)
add_dependencies(llvm-codegen-test test-loop.bc)
+ADD_BE_LSAN_TEST(llvm-codegen-cache-test LlvmCodegenCacheTest.*)
ADD_UNIFIED_BE_LSAN_TEST(instruction-counter-test InstructionCounterTest.*)
diff --git a/be/src/codegen/llvm-codegen-cache-test.cc b/be/src/codegen/llvm-codegen-cache-test.cc
new file mode 100644
index 000000000..1146c3564
--- /dev/null
+++ b/be/src/codegen/llvm-codegen-cache-test.cc
@@ -0,0 +1,586 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "codegen/llvm-codegen-cache.h"
+#include <boost/thread/thread.hpp>
+#include <llvm/ExecutionEngine/ExecutionEngine.h>
+#include "codegen/mcjit-mem-mgr.h"
+#include "common/object-pool.h"
+#include "runtime/fragment-state.h"
+#include "runtime/test-env.h"
+#include "service/fe-support.h"
+
+using namespace std;
+using boost::scoped_ptr;
+using boost::thread;
+using boost::thread_group;
+
+DECLARE_bool(cache_force_single_shard);
+DECLARE_string(codegen_cache_capacity);
+
+namespace impala {
+class LlvmCodeGenCacheTest : public testing::Test {
+ public:
+ virtual void SetUp() {
+ FLAGS_codegen_cache_capacity = "0";
+ metrics_.reset(new MetricGroup("codegen-cache-test"));
+ profile_ = RuntimeProfile::Create(&obj_pool_, "codegen-cache-test");
+ test_env_.reset(new TestEnv);
+ ASSERT_OK(test_env_->Init());
+ RuntimeState* runtime_state;
+ ASSERT_OK(test_env_->CreateQueryState(0, &query_options_, &runtime_state));
+ QueryState* qs = runtime_state->query_state();
+ TPlanFragment* fragment = qs->obj_pool()->Add(new TPlanFragment());
+ PlanFragmentCtxPB* fragment_ctx = qs->obj_pool()->Add(new PlanFragmentCtxPB());
+ fragment_state_ =
+ qs->obj_pool()->Add(new FragmentState(qs, *fragment, *fragment_ctx));
+ }
+
+ virtual void TearDown() {
+ fragment_state_->ReleaseResources();
+ fragment_state_ = nullptr;
+ codegen_cache_.reset();
+ test_env_.reset();
+ metrics_.reset();
+ obj_pool_.Clear();
+ }
+
+ void Reset() {
+ TearDown();
+ SetUp();
+ }
+
+ static void AddFunctionToJit(
+ LlvmCodeGen* codegen, llvm::Function* fn, CodegenFnPtrBase* fn_ptr) {
+ return codegen->AddFunctionToJitInternal(fn, fn_ptr);
+ }
+
+ static Status FinalizeModule(LlvmCodeGen* codegen) { return codegen->FinalizeModule(); }
+
+ static void CheckResult(CodeGenCacheEntry& entry, bool is_double = false) {
+ ASSERT_TRUE(!entry.Empty());
+ ASSERT_TRUE(entry.engine_pointer != nullptr);
+ CheckResult(entry.engine_pointer, is_double);
+ }
+
+ static void CheckResult(LlvmCodeGen* codegen, bool is_double = false) {
+ ASSERT_TRUE(codegen->execution_engine_cached_ != nullptr);
+ CheckResult(codegen->execution_engine_cached_.get(), is_double);
+ }
+
+ static void CheckResult(llvm::ExecutionEngine* engine, bool is_double = false) {
+ void* test_fn;
+ if (!is_double) {
+ test_fn = reinterpret_cast<void*>(engine->getFunctionAddress("Echo"));
+ } else {
+ test_fn = reinterpret_cast<void*>(engine->getFunctionAddress("Double"));
+ }
+ ASSERT_TRUE(test_fn != nullptr);
+ int input = 1;
+ if (!is_double) {
+ EXPECT_EQ(((TestEcho)test_fn)(input), input);
+ } else {
+ EXPECT_EQ(((TestDouble)test_fn)(input), input * 2);
+ }
+ }
+
+ void AddLlvmCodegenEcho(LlvmCodeGen* codegen);
+ void AddLlvmCodegenDouble(LlvmCodeGen* codegen);
+ void GetLlvmEmptyFunction(LlvmCodeGen* codegen, llvm::Function**);
+ typedef int (*TestEcho)(int);
+ typedef int (*TestDouble)(int);
+ typedef void (*TestEmpty)();
+ void TestBasicFunction(TCodeGenCacheMode::type mode);
+ void TestAtCapacity(TCodeGenCacheMode::type mode);
+ void TestSkipCache();
+ void CheckMetrics(CodeGenCache*, int, int, int);
+ void CheckInUseMetrics(CodeGenCache*, int, int64_t);
+ void CheckEvictMetrics(CodeGenCache*, int);
+ int64_t GetMemCharge(LlvmCodeGen* codegen, string key_str, bool is_normal_mode);
+ void CheckEngineCount(LlvmCodeGen*, int expect_count);
+ void CheckToInsertMap();
+ void TestSwitchModeHelper(TCodeGenCacheMode::type mode, string key,
+ int expect_entry_num, int expect_engine_num, llvm::ExecutionEngine** engine);
+ bool CheckKeyExist(TCodeGenCacheMode::type mode, string key);
+ bool CheckEngineExist(llvm::ExecutionEngine* engine);
+ void ExpectNumEngineSameAsEntry();
+ void StoreHelper(TCodeGenCacheMode::type mode, string key);
+ void TestConcurrentStore(int num_threads);
+
+ vector<TCodeGenCacheMode::type> all_modes = {TCodeGenCacheMode::OPTIMAL,
+ TCodeGenCacheMode::NORMAL, TCodeGenCacheMode::OPTIMAL_DEBUG,
+ TCodeGenCacheMode::NORMAL_DEBUG};
+
+ FragmentState* fragment_state_;
+ ObjectPool obj_pool_;
+ scoped_ptr<MetricGroup> metrics_;
+ RuntimeProfile* profile_;
+ scoped_ptr<TestEnv> test_env_;
+ scoped_ptr<CodeGenCache> codegen_cache_;
+ shared_ptr<llvm::ExecutionEngine> exec_engine_;
+ TQueryOptions query_options_;
+};
+
+void LlvmCodeGenCacheTest::AddLlvmCodegenEcho(LlvmCodeGen* codegen) {
+ ASSERT_TRUE(codegen != NULL);
+ LlvmCodeGen::FnPrototype prototype(codegen, "Echo", codegen->i32_type());
+ prototype.AddArgument(LlvmCodeGen::NamedVariable("n", codegen->i32_type()));
+ LlvmBuilder builder(codegen->context());
+ llvm::Value* args[1];
+ llvm::Function* fn = prototype.GeneratePrototype(&builder, args);
+ builder.CreateRet(args[0]);
+ fn = codegen->FinalizeFunction(fn);
+ ASSERT_TRUE(fn != NULL);
+ CodegenFnPtr<TestEcho> jitted_fn;
+ AddFunctionToJit(codegen, fn, &jitted_fn);
+ ASSERT_OK(FinalizeModule(codegen));
+ ASSERT_TRUE(jitted_fn.load() != nullptr);
+ TestEcho test_fn = jitted_fn.load();
+ ASSERT_EQ(test_fn(1), 1);
+}
+
+void LlvmCodeGenCacheTest::GetLlvmEmptyFunction(
+ LlvmCodeGen* codegen, llvm::Function** func) {
+ ASSERT_TRUE(codegen != NULL);
+ LlvmCodeGen::FnPrototype prototype(codegen, "TestEmpty", codegen->void_type());
+ LlvmBuilder builder(codegen->context());
+ llvm::Function* fn = prototype.GeneratePrototype(&builder, nullptr);
+ builder.CreateRetVoid();
+ fn = codegen->FinalizeFunction(fn);
+ ASSERT_TRUE(fn != NULL);
+ *func = fn;
+}
+
+void LlvmCodeGenCacheTest::AddLlvmCodegenDouble(LlvmCodeGen* codegen) {
+ ASSERT_TRUE(codegen != NULL);
+ LlvmCodeGen::FnPrototype prototype(codegen, "Double", codegen->i32_type());
+ prototype.AddArgument(LlvmCodeGen::NamedVariable("n", codegen->i32_type()));
+ LlvmBuilder builder(codegen->context());
+ llvm::Value* args[1];
+ llvm::Function* fn = prototype.GeneratePrototype(&builder, args);
+ llvm::Value* mul = codegen->GetI32Constant(2);
+ args[0] = builder.CreateMul(args[0], mul);
+ builder.CreateRet(args[0]);
+ fn = codegen->FinalizeFunction(fn);
+ ASSERT_TRUE(fn != NULL);
+ CodegenFnPtr<TestDouble> jitted_fn;
+ AddFunctionToJit(codegen, fn, &jitted_fn);
+ ASSERT_OK(FinalizeModule(codegen));
+ ASSERT_TRUE(jitted_fn.load() != nullptr);
+ TestDouble test_fn = jitted_fn.load();
+ ASSERT_EQ(test_fn(1), 2);
+}
+
+/// Test the basic function of a codegen cache.
+void LlvmCodeGenCacheTest::TestBasicFunction(TCodeGenCacheMode::type mode) {
+ int64_t codegen_cache_capacity = 256 * 1024; // 256KB
+ codegen_cache_.reset(new CodeGenCache(metrics_.get()));
+ bool is_normal_mode = !CodeGenCacheModeAnalyzer::is_optimal(mode);
+ EXPECT_OK(codegen_cache_->Init(codegen_cache_capacity));
+
+ // Create a LlvmCodeGen containing a codegen function Echo.
+ scoped_ptr<LlvmCodeGen> codegen;
+ ASSERT_OK(LlvmCodeGen::CreateImpalaCodegen(fragment_state_, NULL, "test", &codegen));
+ AddLlvmCodegenEcho(codegen.get());
+ // Create a LlvmCodeGen containing a codegen function Double.
+ scoped_ptr<LlvmCodeGen> codegen_double;
+ ASSERT_OK(LlvmCodeGen::CreateImpalaCodegen(
+ fragment_state_, NULL, "test_double", &codegen_double));
+ AddLlvmCodegenDouble(codegen_double.get());
+
+ CodeGenCacheKey cache_key;
+ CodeGenCacheEntry entry;
+ string key = "key";
+ CodeGenCacheKeyConstructor::construct(key, &cache_key);
+ int64_t mem_charge = GetMemCharge(codegen.get(), cache_key.data(), is_normal_mode);
+ int64_t mem_charge_double =
+ GetMemCharge(codegen_double.get(), cache_key.data(), is_normal_mode);
+
+ // Store and lookup the entry by the key.
+ EXPECT_OK(codegen_cache_->Store(cache_key, codegen.get(), mode));
+ CheckInUseMetrics(
+ codegen_cache_.get(), 1 /*num_entry_in_use*/, mem_charge /*bytes_in_use*/);
+ EXPECT_OK(codegen_cache_->Lookup(cache_key, mode, &entry, &exec_engine_));
+ CheckResult(entry);
+ codegen->Close();
+ // Closing the LlvmCodeGen should not affect the stored cache.
+ EXPECT_OK(codegen_cache_->Lookup(cache_key, mode, &entry, &exec_engine_));
+ CheckResult(entry);
+ // Override the entry with a different function; the new function should be found
+ // in the new entry.
+ EXPECT_OK(codegen_cache_->Store(cache_key, codegen_double.get(), mode));
+ CheckInUseMetrics(
+ codegen_cache_.get(), 1 /*num_entry_in_use*/, mem_charge_double /*bytes_in_use*/);
+ EXPECT_OK(codegen_cache_->Lookup(cache_key, mode, &entry, &exec_engine_));
+ CheckResult(entry, true /*is_double*/);
+ EXPECT_EQ(codegen_cache_->codegen_cache_entries_evicted_->GetValue(), 1);
+ codegen_double->Close();
+}
+
+void LlvmCodeGenCacheTest::CheckMetrics(
+ CodeGenCache* codegen_cache, int hit, int miss, int evict) {
+ EXPECT_EQ(codegen_cache->codegen_cache_hits_->GetValue(), hit);
+ EXPECT_EQ(codegen_cache->codegen_cache_misses_->GetValue(), miss);
+ EXPECT_EQ(codegen_cache->codegen_cache_entries_evicted_->GetValue(), evict);
+}
+
+void LlvmCodeGenCacheTest::CheckInUseMetrics(
+ CodeGenCache* codegen_cache, int num_entry, int64_t bytes = -1) {
+ EXPECT_EQ(codegen_cache->codegen_cache_entries_in_use_->GetValue(), num_entry);
+ if (bytes != -1) {
+ EXPECT_EQ(codegen_cache->codegen_cache_entries_in_use_bytes_->GetValue(), bytes);
+ }
+}
+
+void LlvmCodeGenCacheTest::CheckEvictMetrics(CodeGenCache* codegen_cache, int evict) {
+ EXPECT_EQ(codegen_cache->codegen_cache_entries_evicted_->GetValue(), evict);
+}
+
+int64_t LlvmCodeGenCacheTest::GetMemCharge(
+ LlvmCodeGen* codegen, string key_str, bool is_normal_mode) {
+ if (is_normal_mode) {
+ return codegen->memory_manager_->bytes_allocated() + key_str.size()
+ + sizeof(CodeGenCacheEntry);
+ }
+ // Optimal mode would use hash code and length as the key.
+ return codegen->memory_manager_->bytes_allocated() + CodeGenCacheKey::OptimalKeySize
+ + sizeof(CodeGenCacheEntry);
+}
+
+/// Test the case where the codegen cache reaches its capacity limit; in that case,
+/// new insertions require eviction.
+void LlvmCodeGenCacheTest::TestAtCapacity(TCodeGenCacheMode::type mode) {
+ int64_t codegen_cache_capacity = 196; // 196B
+ bool is_normal_mode = !CodeGenCacheModeAnalyzer::is_optimal(mode);
+ // 128B for optimal mode
+ if (!is_normal_mode) codegen_cache_capacity = 128;
+ // Using single shard makes the logic of scenarios simple for capacity and
+ // eviction-related behavior.
+ FLAGS_cache_force_single_shard = true;
+
+ // Create two LlvmCodeGen objects, each containing a different codegen function.
+ scoped_ptr<LlvmCodeGen> codegen;
+ ASSERT_OK(LlvmCodeGen::CreateImpalaCodegen(fragment_state_, NULL, "test", &codegen));
+ AddLlvmCodegenEcho(codegen.get());
+ codegen->GenerateFunctionNamesHashCode();
+ scoped_ptr<LlvmCodeGen> codegen_double;
+ ASSERT_OK(LlvmCodeGen::CreateImpalaCodegen(
+ fragment_state_, NULL, "test_double", &codegen_double));
+ AddLlvmCodegenDouble(codegen_double.get());
+ codegen_double->GenerateFunctionNamesHashCode();
+
+ CodeGenCacheKey cache_key_1;
+ CodeGenCacheKey cache_key_2;
+ string key_1 = "key1";
+ string key_2 = "key2";
+ CodeGenCacheKeyConstructor::construct(key_1, &cache_key_1);
+ CodeGenCacheKeyConstructor::construct(key_2, &cache_key_2);
+ int64_t mem_charge_1 = GetMemCharge(codegen.get(), cache_key_1.data(), is_normal_mode);
+ int64_t mem_charge_2 = GetMemCharge(codegen.get(), cache_key_2.data(), is_normal_mode);
+ // Make sure each entry fits within the capacity on its own, while their combined
+ // charge is at least the capacity, so that eviction can be tested.
+ // eviction.
+ ASSERT_LE(mem_charge_1, codegen_cache_capacity);
+ ASSERT_LE(mem_charge_2, codegen_cache_capacity);
+ ASSERT_GE(mem_charge_1 + mem_charge_2, codegen_cache_capacity);
+
+ test_env_->ResetCodegenCache(metrics_.get());
+ CodeGenCache* cache = test_env_->codegen_cache();
+ EXPECT_OK(cache->Init(codegen_cache_capacity));
+
+ // Store key_1 and lookup.
+ EXPECT_OK(codegen->StoreCache(cache_key_1));
+ EXPECT_TRUE(codegen->LookupCache(cache_key_1));
+ CheckResult(codegen.get());
+ CheckMetrics(cache, 1 /*hit*/, 0 /*miss*/, 0 /*evict*/);
+
+ // Store key_2; key_1 should be evicted because the capacity limit is reached.
+ EXPECT_OK(codegen_double->StoreCache(cache_key_2));
+ CheckMetrics(cache, 1 /*hit*/, 0 /*miss*/, 1 /*evict*/);
+
+ // Lookup key_1, should be gone. Lookup key_2, should be successful.
+ EXPECT_FALSE(codegen->LookupCache(cache_key_1));
+ CheckMetrics(cache, 1 /*hit*/, 1 /*miss*/, 1 /*evict*/);
+ EXPECT_TRUE(codegen_double->LookupCache(cache_key_2));
+ CheckResult(codegen_double.get(), /*is_double*/ true);
+ CheckMetrics(cache, 2 /*hit*/, 1 /*miss*/, 1 /*evict*/);
+
+ // Store key_1 again, which should evict key_2, then check again that everything
+ // still works.
+ EXPECT_OK(codegen->StoreCache(cache_key_1));
+ CheckMetrics(cache, 2 /*hit*/, 1 /*miss*/, 2 /*evict*/);
+ EXPECT_FALSE(codegen_double->LookupCache(cache_key_2));
+ CheckMetrics(cache, 2 /*hit*/, 2 /*miss*/, 2 /*evict*/);
+ EXPECT_TRUE(codegen->LookupCache(cache_key_1));
+ CheckResult(codegen.get());
+ CheckMetrics(cache, 3 /*hit*/, 2 /*miss*/, 2 /*evict*/);
+
+ codegen->Close();
+ codegen_double->Close();
+}
+
+/// Test the case where the cache has an entry for the key but it does not contain
+/// the functions we want; we should then switch to the cache-miss path.
+void LlvmCodeGenCacheTest::TestSkipCache() {
+ // Initialize an LlvmCodeGen object with a normal function.
+ scoped_ptr<LlvmCodeGen> codegen;
+ ASSERT_OK(LlvmCodeGen::CreateImpalaCodegen(fragment_state_, NULL, "test", &codegen));
+ AddLlvmCodegenEcho(codegen.get());
+ // Create an empty function from another LlvmCodeGen to trigger the failure later.
+ scoped_ptr<LlvmCodeGen> codegen_empty;
+ ASSERT_OK(LlvmCodeGen::CreateImpalaCodegen(
+ fragment_state_, NULL, "test_empty", &codegen_empty));
+ llvm::Function* empty_func;
+ GetLlvmEmptyFunction(codegen_empty.get(), &empty_func);
+
+ test_env_->ResetCodegenCache(metrics_.get());
+ EXPECT_OK(test_env_->codegen_cache()->Init(256 * 1024 /*capacity*/));
+
+ CheckMetrics(test_env_->codegen_cache(), 0 /*hit*/, 0 /*miss*/, 0 /*evict*/);
+ CodeGenCacheKey cache_key;
+ string key = "key";
+ CodeGenCacheKeyConstructor::construct(key, &cache_key);
+ codegen->GenerateFunctionNamesHashCode();
+ // Store and lookup the entry by the key, should be successful.
+ EXPECT_OK(codegen->StoreCache(cache_key));
+ EXPECT_TRUE(codegen->LookupCache(cache_key));
+ CheckMetrics(test_env_->codegen_cache(), 1 /*hit*/, 0 /*miss*/, 0 /*evict*/);
+ CodegenFnPtr<TestEmpty> fn_ptr;
+ // Insert a new function into the codegen and regenerate the function names hash
+ // code. Expect a failure, because the hash code no longer matches the one stored
+ // in the cache.
+ codegen->fns_to_jit_compile_.emplace_back(empty_func, &fn_ptr);
+ codegen->GenerateFunctionNamesHashCode();
+ // Expect a lookup failure.
+ EXPECT_FALSE(codegen->LookupCache(cache_key));
+ CheckMetrics(test_env_->codegen_cache(), 1 /*hit*/, 1 /*miss*/, 0 /*evict*/);
+ codegen->Close();
+ codegen_empty->Close();
+}
+
+// Test the basic function of using the codegen cache.
+TEST_F(LlvmCodeGenCacheTest, BasicFunction) {
+ for (auto mode : all_modes) {
+ Reset();
+ TestBasicFunction(mode);
+ }
+}
+
+// Test when the codegen cache is at capacity.
+TEST_F(LlvmCodeGenCacheTest, EvictionAtCapacity) {
+ for (auto mode : all_modes) {
+ query_options_.codegen_cache_mode = mode;
+ Reset();
+ TestAtCapacity(query_options_.codegen_cache_mode);
+ }
+}
+
+// Test the case where the cache hits but the function names differ; in that case,
+// we skip the cache and fall back to the normal path.
+TEST_F(LlvmCodeGenCacheTest, SkipCache) {
+ for (auto mode : all_modes) {
+ query_options_.codegen_cache_mode = mode;
+ Reset();
+ TestSkipCache();
+ }
+}
+
+// Test whether the codegen cache mode analyzer produces the correct result for all
+// the modes.
+TEST_F(LlvmCodeGenCacheTest, ModeAnalyzer) {
+ EXPECT_FALSE(CodeGenCacheModeAnalyzer::is_debug(TCodeGenCacheMode::OPTIMAL));
+ EXPECT_FALSE(CodeGenCacheModeAnalyzer::is_debug(TCodeGenCacheMode::NORMAL));
+ EXPECT_TRUE(CodeGenCacheModeAnalyzer::is_debug(TCodeGenCacheMode::OPTIMAL_DEBUG));
+ EXPECT_TRUE(CodeGenCacheModeAnalyzer::is_debug(TCodeGenCacheMode::NORMAL_DEBUG));
+ EXPECT_TRUE(CodeGenCacheModeAnalyzer::is_optimal(TCodeGenCacheMode::OPTIMAL));
+ EXPECT_FALSE(CodeGenCacheModeAnalyzer::is_optimal(TCodeGenCacheMode::NORMAL));
+ EXPECT_TRUE(CodeGenCacheModeAnalyzer::is_optimal(TCodeGenCacheMode::OPTIMAL_DEBUG));
+ EXPECT_FALSE(CodeGenCacheModeAnalyzer::is_optimal(TCodeGenCacheMode::NORMAL_DEBUG));
+}
+
+// Check the number of execution engines stored in the cache.
+// The shared pointer to an execution engine must stay in the codegen cache as long
+// as an entry using that execution engine is stored in the cache.
+void LlvmCodeGenCacheTest::CheckEngineCount(LlvmCodeGen* codegen, int expect_count) {
+ lock_guard<mutex> lock(codegen_cache_->cached_engines_lock_);
+ auto engine_it = codegen_cache_->cached_engines_.find(codegen->execution_engine_.get());
+ EXPECT_TRUE(engine_it != codegen_cache_->cached_engines_.end());
+ EXPECT_EQ(codegen_cache_->cached_engines_.size(), expect_count);
+}
+
+void LlvmCodeGenCacheTest::CheckToInsertMap() {
+ lock_guard<mutex> lock(codegen_cache_->to_insert_set_lock_);
+ EXPECT_EQ(codegen_cache_->keys_to_insert_.size(), 0);
+}
+
+// Return true if the provided key exists.
+bool LlvmCodeGenCacheTest::CheckKeyExist(TCodeGenCacheMode::type mode, string key) {
+ CodeGenCacheKey cache_key;
+ CodeGenCacheEntry entry;
+ CodeGenCacheKeyConstructor::construct(key, &cache_key);
+ EXPECT_OK(codegen_cache_->Lookup(cache_key, mode, &entry, &exec_engine_));
+ return !entry.Empty();
+}
+
+// Return true if the provided engine exists.
+bool LlvmCodeGenCacheTest::CheckEngineExist(llvm::ExecutionEngine* engine) {
+ auto engine_it = codegen_cache_->cached_engines_.find(engine);
+ return engine_it != codegen_cache_->cached_engines_.end();
+}
+
+// Expect the number of execution engines to equal the number of entries in the global
+// codegen cache.
+void LlvmCodeGenCacheTest::ExpectNumEngineSameAsEntry() {
+ EXPECT_EQ(codegen_cache_->cached_engines_.size(),
+ codegen_cache_->codegen_cache_entries_in_use_->GetValue());
+}
+
+/// Helper function to test switching modes. Inserts an entry with the provided key
+/// and mode.
+void LlvmCodeGenCacheTest::TestSwitchModeHelper(TCodeGenCacheMode::type mode, string key,
+ int expect_entry_num = -1, int expect_engine_num = -1,
+ llvm::ExecutionEngine** engine = nullptr) {
+ // Create a LlvmCodeGen containing a codegen function Echo.
+ scoped_ptr<LlvmCodeGen> codegen;
+ ASSERT_OK(LlvmCodeGen::CreateImpalaCodegen(fragment_state_, NULL, "test", &codegen));
+ AddLlvmCodegenEcho(codegen.get());
+
+ CodeGenCacheKey cache_key;
+ CodeGenCacheEntry entry;
+ CodeGenCacheKeyConstructor::construct(key, &cache_key);
+
+ // Store and lookup the entry by the key.
+ EXPECT_OK(codegen_cache_->Store(cache_key, codegen.get(), mode));
+ if (expect_entry_num != -1) {
+ CheckInUseMetrics(codegen_cache_.get(), expect_entry_num /*num_entry_in_use*/);
+ }
+ if (expect_engine_num != -1) {
+ CheckEngineCount(codegen.get(), expect_engine_num);
+ }
+ EXPECT_OK(codegen_cache_->Lookup(cache_key, mode, &entry, &exec_engine_));
+ CheckResult(entry);
+ if (engine) *engine = codegen->execution_engine_.get();
+ codegen->Close();
+}
+
+// Test switching among different modes.
+TEST_F(LlvmCodeGenCacheTest, SwitchMode) {
+ int64_t codegen_cache_capacity = 512; // 512B
+ codegen_cache_.reset(new CodeGenCache(metrics_.get()));
+ EXPECT_OK(codegen_cache_->Init(codegen_cache_capacity));
+ string key = "key";
+ // Insert one entry into the cache with the provided key in each
+ // TestSwitchModeHelper() call. The key is the same for the debug and non-debug
+ // variants of a mode, so we expect the entry number and engine number not to change
+ // when switching between debug and non-debug modes. The key differs between NORMAL
+ // and OPTIMAL, however.
+ TestSwitchModeHelper(TCodeGenCacheMode::OPTIMAL, key, 1, 1);
+ TestSwitchModeHelper(TCodeGenCacheMode::OPTIMAL_DEBUG, key, 1, 1);
+ TestSwitchModeHelper(TCodeGenCacheMode::NORMAL, key, 2, 2);
+ TestSwitchModeHelper(TCodeGenCacheMode::NORMAL_DEBUG, key, 2, 2);
+ // Try again. Each new insertion should replace the old one, so the entry number and
+ // engine number do not change.
+ llvm::ExecutionEngine *engine_opt, *engine_opt_dbg, *engine_normal, *engine_normal_dbg;
+ TestSwitchModeHelper(TCodeGenCacheMode::OPTIMAL, key, 2, 2, &engine_opt);
+ TestSwitchModeHelper(TCodeGenCacheMode::OPTIMAL_DEBUG, key, 2, 2, &engine_opt_dbg);
+ // Expect the engines stored under the same key to differ, because a new engine is
+ // created every time.
+ EXPECT_NE(engine_opt, engine_opt_dbg);
+ // Search for the engines: the later one should exist, while the earlier one should not.
+ EXPECT_FALSE(CheckEngineExist(engine_opt));
+ EXPECT_TRUE(CheckEngineExist(engine_opt_dbg));
+
+ // Same as above, but use the NORMAL type.
+ TestSwitchModeHelper(TCodeGenCacheMode::NORMAL, key, 2, 2, &engine_normal);
+ TestSwitchModeHelper(TCodeGenCacheMode::NORMAL_DEBUG, key, 2, 2, &engine_normal_dbg);
+ EXPECT_NE(engine_normal, engine_normal_dbg);
+ EXPECT_FALSE(CheckEngineExist(engine_normal));
+ EXPECT_TRUE(CheckEngineExist(engine_normal_dbg));
+
+ // Expect the two existing engines to be different.
+ EXPECT_NE(engine_opt_dbg, engine_normal_dbg);
+ // Expect the number of existing engines to equal the number of entries.
+ ExpectNumEngineSameAsEntry();
+ // Insert many different keys so that the original key is evicted once the cache
+ // reaches its capacity.
+ for (int i = 0; i < 10; i++) {
+ TestSwitchModeHelper(TCodeGenCacheMode::NORMAL, key + std::to_string(i));
+ }
+ // As the entries with the original key have been evicted, expect that we can no
+ // longer find them in the codegen cache.
+ EXPECT_FALSE(CheckKeyExist(TCodeGenCacheMode::OPTIMAL, key));
+ EXPECT_FALSE(CheckKeyExist(TCodeGenCacheMode::OPTIMAL_DEBUG, key));
+ EXPECT_FALSE(CheckKeyExist(TCodeGenCacheMode::NORMAL, key));
+ EXPECT_FALSE(CheckKeyExist(TCodeGenCacheMode::NORMAL_DEBUG, key));
+ ExpectNumEngineSameAsEntry();
+}
+
+/// Helper function to store a specific key to the global codegen cache.
+void LlvmCodeGenCacheTest::StoreHelper(TCodeGenCacheMode::type mode, string key) {
+ // Create a LlvmCodeGen containing a codegen function Echo.
+ scoped_ptr<LlvmCodeGen> codegen;
+ ASSERT_OK(LlvmCodeGen::CreateImpalaCodegen(fragment_state_, NULL, "test", &codegen));
+ AddLlvmCodegenEcho(codegen.get());
+ CodeGenCacheKey cache_key;
+ CodeGenCacheEntry entry;
+ CodeGenCacheKeyConstructor::construct(key, &cache_key);
+ EXPECT_OK(codegen_cache_->Store(cache_key, codegen.get(), mode));
+ codegen->Close();
+}
+
+/// Concurrently store random entries into the global codegen cache, and check that no
+/// resources are leaked.
+void LlvmCodeGenCacheTest::TestConcurrentStore(int num_threads) {
+ thread_group workers;
+ for (int i = 0; i < num_threads; ++i) {
+ workers.add_thread(new thread([this, num_threads]() {
+ int test_times = 100;
+ while (test_times-- > 0) {
+ string key = std::to_string(rand() % num_threads);
+ int mode_idx = rand() % all_modes.size();
+ StoreHelper(all_modes[mode_idx], key);
+ }
+ }));
+ }
+ workers.join_all();
+
+ // Check the metrics and the number of elements in the global cache to make sure
+ // nothing is leaked.
+ EXPECT_LE(codegen_cache_->codegen_cache_entries_in_use_->GetValue(), num_threads);
+ EXPECT_GT(codegen_cache_->codegen_cache_entries_evicted_->GetValue(), 0);
+ {
+ lock_guard<mutex> lock(codegen_cache_->cached_engines_lock_);
+ EXPECT_LE(codegen_cache_->cached_engines_.size(), num_threads);
+ }
+ CheckToInsertMap();
+}
+
+TEST_F(LlvmCodeGenCacheTest, ConcurrentStore) {
+ int64_t codegen_cache_capacity = 512; // 512B
+ codegen_cache_.reset(new CodeGenCache(metrics_.get()));
+ EXPECT_OK(codegen_cache_->Init(codegen_cache_capacity));
+ TestConcurrentStore(8);
+}
+
+} // namespace impala
+
+int main(int argc, char** argv) {
+ ::testing::InitGoogleTest(&argc, argv);
+ impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
+ impala::InitFeSupport(false);
+ ABORT_IF_ERROR(impala::LlvmCodeGen::InitializeLlvm());
+ return RUN_ALL_TESTS();
+}
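The tests above (ExpectNumEngineSameAsEntry(), CheckEngineExist()) rely on one bookkeeping invariant: every live cache entry has exactly one engine in cached_engines_, and evicting the entry drops that engine. The following is a minimal, self-contained sketch of that invariant, assuming a plain LRU list in place of Impala's Cache and a hypothetical FakeEngine in place of llvm::ExecutionEngine; TinyCodegenCache and its methods are illustrative names, not Impala APIs.

```cpp
#include <cassert>
#include <cstddef>
#include <iterator>
#include <list>
#include <memory>
#include <string>
#include <unordered_map>

// Hypothetical stand-in for llvm::ExecutionEngine.
struct FakeEngine {
  std::string name;
};

// Minimal LRU cache. Entries hold only a raw engine pointer; the owning shared_ptr
// lives in a side map, similar in spirit to CodeGenCache::cached_engines_.
class TinyCodegenCache {
 public:
  explicit TinyCodegenCache(size_t capacity) : capacity_(capacity) {}

  void Store(const std::string& key, std::shared_ptr<FakeEngine> engine, size_t charge) {
    auto it = index_.find(key);
    if (it != index_.end()) Evict(it->second);  // Replacing an entry also evicts it.
    engines_[engine.get()] = engine;
    lru_.push_front(Entry{key, engine.get(), charge});
    index_[key] = lru_.begin();
    usage_ += charge;
    // Evict least recently used entries until the usage fits in the capacity.
    while (usage_ > capacity_ && !lru_.empty()) Evict(std::prev(lru_.end()));
  }

  // Returns the owning pointer, so the caller keeps the engine alive after eviction.
  std::shared_ptr<FakeEngine> Lookup(const std::string& key) {
    auto it = index_.find(key);
    if (it == index_.end()) return nullptr;
    lru_.splice(lru_.begin(), lru_, it->second);  // Mark as most recently used.
    auto eng = engines_.find(it->second->engine);
    if (eng == engines_.end()) return nullptr;
    return eng->second;
  }

  size_t num_entries() const { return lru_.size(); }
  size_t num_engines() const { return engines_.size(); }
  size_t evicted() const { return evicted_; }

 private:
  struct Entry {
    std::string key;
    FakeEngine* engine;
    size_t charge;
  };

  // Plays the role of the eviction callback: drop the engine and update the counters.
  void Evict(std::list<Entry>::iterator it) {
    assert(usage_ >= it->charge);
    engines_.erase(it->engine);
    usage_ -= it->charge;
    ++evicted_;
    index_.erase(it->key);
    lru_.erase(it);
  }

  size_t capacity_;
  size_t usage_ = 0;
  size_t evicted_ = 0;
  std::list<Entry> lru_;
  std::unordered_map<std::string, std::list<Entry>::iterator> index_;
  std::unordered_map<const FakeEngine*, std::shared_ptr<FakeEngine>> engines_;
};
```

Because Lookup() hands out the shared_ptr, a caller that still holds it keeps the engine alive after its entry is evicted, which is the role execution_engine_cached_ plays in LlvmCodeGen.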
diff --git a/be/src/codegen/llvm-codegen-cache.cc b/be/src/codegen/llvm-codegen-cache.cc
new file mode 100644
index 000000000..bf1690699
--- /dev/null
+++ b/be/src/codegen/llvm-codegen-cache.cc
@@ -0,0 +1,235 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "codegen/llvm-codegen-cache.h"
+#include "codegen/instruction-counter.h"
+#include "codegen/mcjit-mem-mgr.h"
+#include "util/hash-util.h"
+
+using namespace std;
+using strings::Substitute;
+
+namespace impala {
+
+// Maximum codegen cache entry size for the purposes of histogram sizing in stats
+// collection.
+// An entry is expected to be smaller than 128MB.
+static constexpr int64_t STATS_MAX_CODEGEN_CACHE_ENTRY_SIZE = 128L << 20;
+
+void CodeGenCache::EvictionCallback::EvictedEntry(Slice key, Slice value) {
+ DCHECK(key.data() != nullptr);
+ DCHECK(value.data() != nullptr);
+ // Remove the execution engine of this entry from the global cache.
+ const CodeGenCacheEntry* cache_entry =
+ reinterpret_cast<const CodeGenCacheEntry*>(value.data());
+ DCHECK(cache_entry != nullptr);
+ cache_->RemoveEngine(cache_entry->engine_pointer);
+ LOG(INFO) << "Evict CodeGen Cache, key hash_code=" << CodeGenCacheKey::hash_code(key);
+ // For statistics.
+ if (codegen_cache_entries_evicted_ != nullptr) {
+ codegen_cache_entries_evicted_->Increment(1);
+ }
+ if (codegen_cache_entries_in_use_ != nullptr) {
+ codegen_cache_entries_in_use_->Increment(-1);
+ }
+ if (codegen_cache_entries_in_use_bytes_ != nullptr) {
+ codegen_cache_entries_in_use_bytes_->Increment(-cache_entry->total_bytes_charge);
+ }
+}
+
+void CodeGenCacheKeyConstructor::construct(
+ string& key_content, CodeGenCacheKey* cache_key) {
+ DCHECK(cache_key != nullptr);
+ int length = 0;
+ int len_hashcode = sizeof(CodeGenCacheKey::HashCode);
+ // We use the same type for all the length of elements within the key.
+ int len_general_size = sizeof(CodeGenCacheKey::ElementLengthType);
+ int len_key_content = key_content.size();
+ CodeGenCacheKey::HashCode hashcode;
+ length += len_hashcode; // For hash value
+ length += len_general_size << 1;
+ length += len_key_content;
+ string result;
+ result.reserve(length);
+ result.append(reinterpret_cast<char*>(&hashcode), len_hashcode);
+ result.append(reinterpret_cast<char*>(&length), len_general_size);
+ result.append(reinterpret_cast<char*>(&len_key_content), len_general_size);
+ result.append(key_content.c_str(), len_key_content);
+ DCHECK_GT(length, len_hashcode);
+ MurmurHash3_x64_128(result.c_str() + len_hashcode, length - len_hashcode,
+ CODEGEN_CACHE_HASH_SEED_CONST, hashcode.hash_code);
+ result.replace(0, len_hashcode, reinterpret_cast<char*>(&hashcode), len_hashcode);
+ cache_key->set_key(move(result));
+}
+
+CodeGenCache::CodeGenCache(MetricGroup* metrics)
+ : is_closed_(false),
+ codegen_cache_hits_(metrics->AddCounter("impala.codegen-cache.hits", 0)),
+ codegen_cache_misses_(metrics->AddCounter("impala.codegen-cache.misses", 0)),
+ codegen_cache_entries_evicted_(
+ metrics->AddCounter("impala.codegen-cache.entries-evicted", 0)),
+ codegen_cache_entries_in_use_(
+ metrics->AddGauge("impala.codegen-cache.entries-in-use", 0)),
+ codegen_cache_entries_in_use_bytes_(
+ metrics->AddGauge("impala.codegen-cache.entries-in-use-bytes", 0)),
+ codegen_cache_entry_size_stats_(metrics->RegisterMetric(
+ new HistogramMetric(MetricDefs::Get("impala.codegen-cache.entry-sizes"),
+ STATS_MAX_CODEGEN_CACHE_ENTRY_SIZE, 3))),
+ evict_callback_(
+ new CodeGenCache::EvictionCallback(this, codegen_cache_entries_evicted_,
+ codegen_cache_entries_in_use_, codegen_cache_entries_in_use_bytes_)) {}
+
+Status CodeGenCache::Init(int64_t capacity) {
+ DCHECK(cache_ == nullptr);
+ cache_.reset(NewCache(Cache::EvictionPolicy::LRU, capacity, "CodeGen_Cache"));
+ return cache_->Init();
+}
+
+void CodeGenCache::ReleaseResources() {
+ if (cache_ != nullptr) cache_.reset();
+}
+
+Status CodeGenCache::Lookup(const CodeGenCacheKey& cache_key,
+ const TCodeGenCacheMode::type& mode, CodeGenCacheEntry* entry,
+ shared_ptr<llvm::ExecutionEngine>* execution_engine) {
+ DCHECK(!is_closed_);
+ DCHECK(cache_ != nullptr);
+ DCHECK(entry != nullptr);
+ DCHECK(execution_engine != nullptr);
+ // In OPTIMAL mode, use only the hash code and the total length as the key, because
+ // the whole key could be very large; OPTIMAL mode can improve performance and
+ // reduce memory consumption. However, it could lead to a collision. The chance is
+ // very small, but if it happens, we may switch the query to NORMAL mode or disable
+ // the codegen cache.
+ Slice key = cache_key.optimal_key_slice();
+ if (!CodeGenCacheModeAnalyzer::is_optimal(mode)) {
+ key = cache_key.data_slice();
+ }
+ Cache::UniqueHandle handle(cache_->Lookup(key));
+ if (handle.get() != nullptr) {
+ const CodeGenCacheEntry* cached_entry =
+ reinterpret_cast<const CodeGenCacheEntry*>(cache_->Value(handle).data());
+ // Look up the shared pointer of the engine in the cache before returning, because
+ // the shared pointer could be deleted during eviction.
+ // If it can't be found, treat it as a cache miss, because the engine is needed to
+ // look up the jitted functions.
+ if (LookupEngine(cached_entry->engine_pointer, execution_engine)) {
+ entry->Reset(cached_entry->engine_pointer, cached_entry->num_functions,
+ cached_entry->num_instructions, cached_entry->function_names_hashcode,
+ cached_entry->total_bytes_charge);
+ return Status::OK();
+ }
+ }
+ entry->Reset();
+ return Status::OK();
+}
+
+Status CodeGenCache::StoreInternal(const CodeGenCacheKey& cache_key,
+ const LlvmCodeGen* codegen, const TCodeGenCacheMode::type& mode) {
+ // In NORMAL mode, store the whole key content in the cache.
+ // In OPTIMAL mode, store only the hash code and the total length of the key.
+ Slice key = cache_key.optimal_key_slice();
+ if (!CodeGenCacheModeAnalyzer::is_optimal(mode)) {
+ key = cache_key.data_slice();
+ }
+ // Memory charge includes both key and entry size.
+ int64_t mem_charge = codegen->memory_manager_->bytes_allocated() + key.size()
+ + sizeof(CodeGenCacheEntry);
+ Cache::UniquePendingHandle pending_handle(
+ cache_->Allocate(key, sizeof(CodeGenCacheEntry), mem_charge));
+ if (pending_handle == nullptr) {
+ return Status(Substitute("Couldn't allocate handle for codegen cache entry,"
+ " size: '$0'",
+ mem_charge));
+ }
+ CodeGenCacheEntry* cache_entry =
+ reinterpret_cast<CodeGenCacheEntry*>(cache_->MutableValue(&pending_handle));
+ cache_entry->Reset(codegen->execution_engine_.get(), codegen->num_functions_->value(),
+ codegen->num_instructions_->value(), codegen->function_names_hashcode_, mem_charge);
+ StoreEngine(codegen);
+ /// Insert() is thread-safe, but could overwrite an existing entry with the same key.
+ Cache::UniqueHandle cache_handle =
+ cache_->Insert(move(pending_handle), evict_callback_.get());
+ if (cache_handle == nullptr) {
+ RemoveEngine(codegen->execution_engine_.get());
+ return Status(Substitute("Couldn't insert codegen cache entry,"
+ " hash code:'$0', size: '$1'",
+ cache_key.hash_code().str(), mem_charge));
+ }
+ codegen_cache_entries_in_use_->Increment(1);
+ codegen_cache_entries_in_use_bytes_->Increment(mem_charge);
+ codegen_cache_entry_size_stats_->Update(mem_charge);
+ return Status::OK();
+}
+
+Status CodeGenCache::Store(const CodeGenCacheKey& cache_key, const LlvmCodeGen* codegen,
+ const TCodeGenCacheMode::type& mode) {
+ DCHECK(!is_closed_);
+ DCHECK(cache_ != nullptr);
+ DCHECK(codegen != nullptr);
+ Status status = Status::OK();
+ pair<unordered_set<CodeGenCacheKey::HashCode>::iterator, bool> key_to_insert_it;
+ {
+ // Cache::Insert() advises callers to avoid holding multiple handles for the same
+ // key, because that can make it inefficient to keep the entry from being evicted.
+ // So use the keys_to_insert_ set to reduce the chance of inserting the same cache
+ // entry more than once.
+ // Before insertion, try to insert the hash code of the key into the set. If that
+ // succeeds, the thread is allowed to do the insertion, and is responsible for
+ // erasing the hash code from the set when the insertion finishes.
+ // Otherwise, if the thread fails to insert the hash code, another thread is
+ // inserting the same key. In that case, the current thread gives up the task and
+ // returns an OK status, assuming the other thread will get the cache entry in.
+ // Even if that thread fails, missing a single cache entry does no harm, and the
+ // entry can be inserted again next time.
+ lock_guard<mutex> lock(to_insert_set_lock_);
+ key_to_insert_it = keys_to_insert_.insert(cache_key.hash_code());
+ // If the hash code already exists, return OK immediately.
+ if (!key_to_insert_it.second) return status;
+ }
+ // Do store the cache entry to the cache.
+ status = StoreInternal(cache_key, codegen, mode);
+ // Remove the hash code of the key from the keys_to_insert_ set.
+ lock_guard<mutex> lock(to_insert_set_lock_);
+ keys_to_insert_.erase(key_to_insert_it.first);
+ return status;
+}
+
+void CodeGenCache::StoreEngine(const LlvmCodeGen* codegen) {
+ DCHECK(codegen != nullptr);
+ lock_guard<mutex> lock(cached_engines_lock_);
+ cached_engines_.emplace(codegen->execution_engine_.get(), codegen->execution_engine_);
+}
+
+bool CodeGenCache::LookupEngine(const llvm::ExecutionEngine* engine,
+ shared_ptr<llvm::ExecutionEngine>* execution_engine) {
+ DCHECK(engine != nullptr);
+ DCHECK(execution_engine != nullptr);
+ lock_guard<mutex> lock(cached_engines_lock_);
+ auto engine_it = cached_engines_.find(engine);
+ if (engine_it == cached_engines_.end()) return false;
+ *execution_engine = engine_it->second;
+ return true;
+}
+
+void CodeGenCache::RemoveEngine(const llvm::ExecutionEngine* engine) {
+ DCHECK(engine != nullptr);
+ lock_guard<mutex> lock(cached_engines_lock_);
+ cached_engines_.erase(engine);
+}
+
+} // namespace impala
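CodeGenCacheKeyConstructor::construct() above lays a key out as hash code, total length, content length, then the content, and hashes every byte after the hash slot; an OPTIMAL key is just the hash code plus the total length. The sketch below mirrors that layout under stated assumptions: two seeded FNV-1a passes stand in for MurmurHash3_x64_128 (which is not in the C++ standard library), the length type is assumed to be a 32-bit int, and HashCode128, ConstructKey and OptimalKey are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>

// 128-bit hash code stored at the front of every key (like CodeGenCacheKey::HashCode).
struct HashCode128 {
  uint64_t h1 = 0;
  uint64_t h2 = 0;
};

// Stand-in hash: 64-bit FNV-1a whose offset basis is mixed with a seed.
uint64_t Fnv1a64(const char* data, size_t len, uint64_t seed) {
  uint64_t h = 14695981039346656037ULL ^ seed;
  for (size_t i = 0; i < len; ++i) {
    h ^= static_cast<unsigned char>(data[i]);
    h *= 1099511628211ULL;
  }
  return h;
}

constexpr uint64_t kSeed = 0x6b8b4567327b23c6ULL;  // Same constant as the patch's seed.
using LengthType = int32_t;  // Assumed width of ElementLengthType (int).

// Layout: HashCode | total length | content length | content bytes.
std::string ConstructKey(const std::string& content) {
  const LengthType total = static_cast<LengthType>(
      sizeof(HashCode128) + 2 * sizeof(LengthType) + content.size());
  const LengthType content_len = static_cast<LengthType>(content.size());
  std::string key(sizeof(HashCode128), '\0');  // Placeholder for the hash code.
  key.append(reinterpret_cast<const char*>(&total), sizeof(total));
  key.append(reinterpret_cast<const char*>(&content_len), sizeof(content_len));
  key.append(content);
  // Hash everything after the hash slot, then write the result into the slot.
  const char* body = key.data() + sizeof(HashCode128);
  const size_t body_len = key.size() - sizeof(HashCode128);
  HashCode128 hc;
  hc.h1 = Fnv1a64(body, body_len, kSeed);
  hc.h2 = Fnv1a64(body, body_len, ~kSeed);
  std::memcpy(&key[0], &hc, sizeof(hc));
  return key;
}

// OPTIMAL mode keeps only the prefix: hash code plus total length.
constexpr size_t kOptimalKeySize = sizeof(HashCode128) + sizeof(LengthType);
std::string OptimalKey(const std::string& full_key) {
  assert(full_key.size() >= kOptimalKeySize);
  return full_key.substr(0, kOptimalKeySize);
}
```

Keeping the total length next to the hash in the OPTIMAL key means two modules must agree on both a 128-bit hash and their bitcode size before they collide, while the key stays a fixed 20 bytes under these assumptions.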
diff --git a/be/src/codegen/llvm-codegen-cache.h b/be/src/codegen/llvm-codegen-cache.h
new file mode 100644
index 000000000..9242118ba
--- /dev/null
+++ b/be/src/codegen/llvm-codegen-cache.h
@@ -0,0 +1,298 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "codegen/llvm-codegen.h"
+
+#include <unistd.h>
+#include <mutex>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+
+#include "common/status.h"
+#include "kudu/util/slice.h"
+#include "thirdparty/datasketches/MurmurHash3.h"
+#include "util/cache/cache.h"
+#include "util/histogram-metric.h"
+#include "util/metrics.h"
+
+namespace impala {
+
+/// The key to a CodeGen Cache entry.
+/// It starts with a hash code occupying the first sizeof(struct HashCode) bytes. In
+/// NORMAL mode, the content of the key follows the hash code and could be a
+/// combination of several keys. In OPTIMAL mode, the CodeGenCacheKey stored in the
+/// cache contains only the hash code and the total length, to reduce memory
+/// consumption.
+/// A key in NORMAL mode is laid out as:
+/// Hash Code| Total Length| Length of Key 1| Data of Key 1| Length of Key 2
+/// | Data of Key 2| ... | Length of Key N| Data of Key N.
+/// The key is constructed by CodeGenCacheKeyConstructor.
+/// Functions are NOT thread-safe.
+class CodeGenCacheKey {
+ public:
+ /// The type of a hashcode for the codegen cache key.
+ struct HashCode {
+ HashCode() { memset(&hash_code, 0, sizeof(HashState)); }
+ // This function is used by unordered_set to compare elements of HashCode.
+ bool operator==(const HashCode& h) const {
+ return (this->hash_code.h1 == h.hash_code.h1)
+ && (this->hash_code.h2 == h.hash_code.h2);
+ }
+ friend std::ostream& operator<<(std::ostream& o, const HashCode& h) {
+ return o << std::hex << std::setfill('0') << std::setw(16) << h.hash_code.h1 << ":"
+ << std::setw(16) << h.hash_code.h2;
+ }
+ string str() {
+ std::stringstream out;
+ out << *this;
+ return out.str();
+ }
+ HashState hash_code;
+ };
+
+ /// Used as the hash function in unordered_set for struct HashCode.
+ /// It is required by the unordered_set to generate a hash code for a
+ /// customized structure.
+ struct HashCodeHash {
+ uint64_t operator()(const HashCode& h) const {
+ // Return the first 64bits for the hash.
+ return h.hash_code.h1;
+ }
+ };
+
+ /// The type of the length of a general element in the codegen cache key.
+ typedef int ElementLengthType;
+
+ /// Construct an empty key.
+ CodeGenCacheKey() {}
+
+ /// Construct the key from a string. Uses move to avoid a copy.
+ CodeGenCacheKey(string key) : key_(std::move(key)) {}
+
+ /// Set the data of the key. Uses swap to avoid a copy.
+ void set_key(string key) { key_.swap(key); }
+
+ /// Return the hash code of the key.
+ HashCode hash_code() const {
+ DCHECK(!key_.empty());
+ DCHECK(key_.length() >= sizeof(HashCode));
+ return *(const HashCode*)key_.c_str();
+ }
+
+ /// Return the slice of the hash code of the key.
+ Slice hash_code_slice() const {
+ DCHECK_LE(sizeof(HashCode), key_.size());
+ return Slice(key_.c_str(), sizeof(HashCode));
+ }
+
+ /// The size of a key in optimal mode.
+ static constexpr size_t OptimalKeySize = sizeof(HashCode) + sizeof(ElementLengthType);
+
+ /// Return the slice of the key in optimal mode, which is the hash code plus the total
+ /// length of the full key.
+ Slice optimal_key_slice() const {
+ DCHECK_LE(OptimalKeySize, key_.size());
+ return Slice(key_.c_str(), OptimalKeySize);
+ }
+
+ /// Return the hash code from a key of type Slice.
+ static HashCode hash_code(Slice key) {
+ DCHECK(!key.empty());
+ DCHECK_LE(sizeof(HashCode), key.size());
+ return *(const HashCode*)key.data();
+ ;
+ }
+
+ /// Return the data of the key.
+ string& data() { return key_; }
+ Slice data_slice() const { return Slice(key_.c_str(), key_.size()); }
+
+ /// Return whether the key is empty.
+ bool empty() const { return key_.empty(); }
+
+ private:
+ /// The data of the key.
+ std::string key_;
+};
+
+/// The class helps to construct a CodeGenCacheKey.
+class CodeGenCacheKeyConstructor {
+ public:
+ /// Construct a CodeGenCacheKey from a string.
+ /// In the future, it could also construct a key from a list of string keys.
+ static void construct(std::string&, CodeGenCacheKey*);
+ /// An arbitrary constant used to seed the hash.
+ static constexpr uint64_t CODEGEN_CACHE_HASH_SEED_CONST = 0x6b8b4567327b23c6;
+};
+
+/// The class helps to analyze the codegen cache mode.
+class CodeGenCacheModeAnalyzer {
+ public:
+ // The mask should follow the definition of TCodeGenCacheMode.
+ // The lowest byte of the TCodeGenCacheMode is used to define the type NORMAL
+ // or OPTIMAL.
+ // The ninth bit (bit 8) defines whether it is a debug mode; if it is 1, it is a
+ // debug mode.
+ static const uint64_t CODEGEN_CACHE_MODE_MASK = 0xFF;
+ static const uint64_t CODEGEN_CACHE_MODE_BITS = 8;
+ static bool is_optimal(const TCodeGenCacheMode::type& mode) {
+ return (mode & CODEGEN_CACHE_MODE_MASK) == TCodeGenCacheMode::OPTIMAL;
+ }
+ static bool is_debug(const TCodeGenCacheMode::type& mode) {
+ return (mode >> CODEGEN_CACHE_MODE_BITS) != 0;
+ }
+};
+
+struct CodeGenCacheEntry {
+ CodeGenCacheEntry() { Init(); }
+ bool Empty() { return engine_pointer == nullptr; }
+ void Init() { memset((uint8_t*)this, 0, sizeof(CodeGenCacheEntry)); }
+ void Reset() { Reset(nullptr, 0, 0, 0, 0); }
+ void Reset(llvm::ExecutionEngine* engine_ptr, int64_t num_funcs, int64_t num_instrucs,
+ uint64_t names_hashcode, int64_t charge) {
+ engine_pointer = engine_ptr;
+ num_functions = num_funcs;
+ num_instructions = num_instrucs;
+ function_names_hashcode = names_hashcode;
+ total_bytes_charge = charge;
+ }
+ llvm::ExecutionEngine* engine_pointer;
+ int64_t num_functions;
+ int64_t num_instructions;
+ /// The hashcode of function names in the entry.
+ uint64_t function_names_hashcode;
+ /// Bytes charge including the key and the entry.
+ int64_t total_bytes_charge;
+};
+
+/// Each CodeGenCache is intended to be a singleton in the daemon. It manages the
+/// codegen cache entries, providing entry storage and lookup, and evicting entries
+/// when the capacity limit is reached.
+class CodeGenCache {
+ public:
+ CodeGenCache(MetricGroup*);
+ ~CodeGenCache() {
+ is_closed_ = true;
+ ReleaseResources();
+ }
+
+ /// Initialize the codegen cache by allocating the underlying cache with 'capacity'.
+ Status Init(int64_t capacity);
+
+ /// Release the resources occupied by the codegen cache before destruction.
+ void ReleaseResources();
+
+ /// Look up the cache entry for the specific cache key. If the key doesn't exist, the
+ /// entry will be reset to empty.
+ /// Return Status::OK() unless an internal error occurs.
+ Status Lookup(const CodeGenCacheKey& key, const TCodeGenCacheMode::type& mode,
+ CodeGenCacheEntry* entry, std::shared_ptr<llvm::ExecutionEngine>* execution_engine);
+
+ /// Store the cache entry with the specific cache key.
+ Status Store(const CodeGenCacheKey& key, const LlvmCodeGen* codegen,
+ const TCodeGenCacheMode::type& mode);
+
+ /// Store the shared pointer of the llvm execution engine in the cache to keep all the
+ /// jitted functions in that engine alive.
+ void StoreEngine(const LlvmCodeGen* codegen);
+
+ /// Look up the shared pointer of the llvm execution engine by its raw pointer address.
+ /// Return true if found, otherwise return false.
+ bool LookupEngine(const llvm::ExecutionEngine* engine,
+ std::shared_ptr<llvm::ExecutionEngine>* execution_engine);
+
+ /// Remove the shared pointer of the llvm execution engine from the cache by its raw
+ /// pointer address.
+ void RemoveEngine(const llvm::ExecutionEngine* engine);
+
+ /// Increment a hit count or miss count.
+ void IncHitOrMissCount(bool hit) {
+ DCHECK(codegen_cache_hits_ && codegen_cache_misses_);
+ if (hit) {
+ codegen_cache_hits_->Increment(1);
+ } else {
+ codegen_cache_misses_->Increment(1);
+ }
+ }
+
+ /// EvictionCallback for the codegen cache.
+ class EvictionCallback : public Cache::EvictionCallback {
+ public:
+ EvictionCallback(CodeGenCache* cache, IntCounter* entries_evicted,
+ IntGauge* entries_in_use, IntGauge* entries_in_use_bytes)
+ : cache_(cache),
+ codegen_cache_entries_evicted_(entries_evicted),
+ codegen_cache_entries_in_use_(entries_in_use),
+ codegen_cache_entries_in_use_bytes_(entries_in_use_bytes) {}
+ virtual void EvictedEntry(Slice key, Slice value) override;
+
+ private:
+ CodeGenCache* cache_;
+ /// Metrics for the codegen cache in the process level.
+ IntCounter* codegen_cache_entries_evicted_;
+ IntGauge* codegen_cache_entries_in_use_;
+ IntGauge* codegen_cache_entries_in_use_bytes_;
+ };
+
+ private:
+ friend class LlvmCodeGenCacheTest;
+
+ /// Helper function to store the entry to the cache.
+ Status StoreInternal(const CodeGenCacheKey& key, const LlvmCodeGen* codegen,
+ const TCodeGenCacheMode::type& mode);
+
+ /// Indicates whether the cache is closed. Once it is closed, no function other than
+ /// ReleaseResources() may be called.
+ bool is_closed_;
+
+ /// The instance of the cache.
+ std::unique_ptr<Cache> cache_;
+
+ /// Metrics for the codegen cache in the process level.
+ IntCounter* codegen_cache_hits_;
+ IntCounter* codegen_cache_misses_;
+ IntCounter* codegen_cache_entries_evicted_;
+ IntGauge* codegen_cache_entries_in_use_;
+ IntGauge* codegen_cache_entries_in_use_bytes_;
+
+ /// Statistics for the sizes (memory charges) of the codegen cache entries.
+ HistogramMetric* codegen_cache_entry_size_stats_;
+
+ /// Eviction Callback function.
+ std::unique_ptr<CodeGenCache::EvictionCallback> evict_callback_;
+
+ /// Protects the keys_to_insert_ set below.
+ std::mutex to_insert_set_lock_;
+
+ /// The hash codes of the keys of codegen entries that are about to be inserted into
+ /// the cache. Used to prevent the same key from being inserted concurrently.
+ std::unordered_set<CodeGenCacheKey::HashCode, CodeGenCacheKey::HashCodeHash>
+ keys_to_insert_;
+
+ /// Protects the map of cached engines.
+ std::mutex cached_engines_lock_;
+
+ /// Stores the shared pointers of the llvm execution engines to keep the jitted
+ /// functions alive. An entry is removed when its cache entry is evicted or the whole
+ /// cache is destroyed.
+ std::unordered_map<const llvm::ExecutionEngine*, std::shared_ptr<llvm::ExecutionEngine>>
+ cached_engines_;
+};
+} // namespace impala
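CodeGenCache::Store() above claims a key's hash code in keys_to_insert_ before inserting and erases it afterwards, while a thread that loses the race returns OK without inserting. The following is a minimal sketch of that guard, assuming 64-bit hash codes for brevity (the patch uses the 128-bit HashCode); InsertGuardSet and ScopedInsertSlot are hypothetical names, and the RAII wrapper is a design variation rather than what the patch does.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <unordered_set>

// Set of hash codes whose insertion is currently in flight (like keys_to_insert_).
class InsertGuardSet {
 public:
  // Returns true if the caller claimed `hash` and should perform the insertion.
  bool TryBegin(uint64_t hash) {
    std::lock_guard<std::mutex> lock(mu_);
    return in_flight_.insert(hash).second;
  }

  // Releases a claim; only the thread that claimed `hash` should call this.
  void End(uint64_t hash) {
    std::lock_guard<std::mutex> lock(mu_);
    size_t erased = in_flight_.erase(hash);
    assert(erased == 1);
    (void)erased;  // Avoid an unused-variable warning when NDEBUG is set.
  }

  size_t size() const {
    std::lock_guard<std::mutex> lock(mu_);
    return in_flight_.size();
  }

 private:
  mutable std::mutex mu_;
  std::unordered_set<uint64_t> in_flight_;
};

// RAII holder: claims the hash on construction and releases it on destruction, so
// every return path of the insertion code gives the claim back.
class ScopedInsertSlot {
 public:
  ScopedInsertSlot(InsertGuardSet* set, uint64_t hash)
      : set_(set), hash_(hash), owned_(set->TryBegin(hash)) {}
  ~ScopedInsertSlot() {
    if (owned_) set_->End(hash_);
  }
  ScopedInsertSlot(const ScopedInsertSlot&) = delete;
  ScopedInsertSlot& operator=(const ScopedInsertSlot&) = delete;

  // False means another thread is already inserting this key; skip the insert.
  bool owned() const { return owned_; }

 private:
  InsertGuardSet* set_;
  uint64_t hash_;
  bool owned_;
};
```

A Store()-like caller would construct a ScopedInsertSlot first, return OK immediately when owned() is false, and otherwise perform the insertion; the destructor then releases the claim whether the insertion succeeded or failed.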
diff --git a/be/src/codegen/llvm-codegen.cc b/be/src/codegen/llvm-codegen.cc
index 94b2e57af..27205580f 100644
--- a/be/src/codegen/llvm-codegen.cc
+++ b/be/src/codegen/llvm-codegen.cc
@@ -16,12 +16,12 @@
// under the License.
#include "codegen/llvm-codegen.h"
+#include "codegen/llvm-codegen-cache.h"
#include <fstream>
+#include <mutex>
#include <sstream>
#include <unordered_set>
-
-#include <mutex>
#include <boost/algorithm/string.hpp>
#include <boost/assert/source_location.hpp>
#include <gutil/strings/substitute.h>
@@ -31,6 +31,7 @@
#include <llvm/Analysis/Passes.h>
#include <llvm/Analysis/TargetTransformInfo.h>
#include <llvm/Bitcode/BitcodeReader.h>
+#include <llvm/Bitcode/BitcodeWriter.h>
#include <llvm/ExecutionEngine/ExecutionEngine.h>
#include <llvm/ExecutionEngine/MCJIT.h>
#include <llvm/IR/Constants.h>
@@ -65,18 +66,19 @@
#include "codegen/mcjit-mem-mgr.h"
#include "common/logging.h"
#include "exprs/anyval-util.h"
+#include "gutil/sysinfo.h"
#include "impala-ir/impala-ir-names.h"
#include "runtime/collection-value.h"
#include "runtime/descriptors.h"
+#include "runtime/exec-env.h"
+#include "runtime/fragment-state.h"
#include "runtime/hdfs-fs-cache.h"
#include "runtime/lib-cache.h"
#include "runtime/mem-pool.h"
#include "runtime/mem-tracker.h"
-#include "runtime/fragment-state.h"
#include "runtime/runtime-state.h"
#include "runtime/string-value.h"
#include "runtime/timestamp-value.h"
-#include "gutil/sysinfo.h"
#include "util/cpu-info.h"
#include "util/debug-util.h"
#include "util/hdfs-util.h"
@@ -85,6 +87,7 @@
#include "util/symbols-util.h"
#include "util/test-info.h"
#include "util/thread.h"
+#include "util/thrift-debug-util.h"
#include "common/names.h"
@@ -226,6 +229,9 @@ LlvmCodeGen::LlvmCodeGen(FragmentState* state, ObjectPool* pool,
context_->setDiagnosticHandler(&DiagnosticHandler::DiagnosticHandlerFn, this);
load_module_timer_ = ADD_TIMER(profile_, "LoadTime");
prepare_module_timer_ = ADD_TIMER(profile_, "PrepareTime");
+ codegen_cache_lookup_timer_ = ADD_TIMER(profile_, "CodegenCacheLookupTime");
+ codegen_cache_save_timer_ = ADD_TIMER(profile_, "CodegenCacheSaveTime");
+ module_bitcode_gen_timer_ = ADD_TIMER(profile_, "ModuleBitcodeGenTime");
module_bitcode_size_ = ADD_COUNTER(profile_, "ModuleBitcodeSize", TUnit::BYTES);
ir_generation_timer_ = ADD_TIMER(profile_, "IrGenerationTime");
optimization_timer_ = ADD_TIMER(profile_, "OptimizationTime");
@@ -235,6 +241,7 @@ LlvmCodeGen::LlvmCodeGen(FragmentState* state, ObjectPool* pool,
ASYNC_CODEGEN_THREAD_COUNTERS_PREFIX);
num_functions_ = ADD_COUNTER(profile_, "NumFunctions", TUnit::UNIT);
num_instructions_ = ADD_COUNTER(profile_, "NumInstructions", TUnit::UNIT);
+ num_cached_functions_ = ADD_COUNTER(profile_, "NumCachedFunctions", TUnit::UNIT);
llvm_thread_counters_ = ADD_THREAD_COUNTERS(profile_, "Codegen");
}
@@ -513,6 +520,7 @@ void LlvmCodeGen::Close() {
// Execution engine executes callback on event listener, so tear down engine first.
execution_engine_.reset();
+ execution_engine_cached_.reset();
symbol_emitter_.reset();
module_ = nullptr;
}
@@ -914,6 +922,18 @@ Status LlvmCodeGen::LoadFunction(const TFunction& fn, const string& symbol,
// Associate the dynamically loaded function pointer with the Function* we defined.
// This tells LLVM where the compiled function definition is located in memory.
execution_engine_->addGlobalMapping(*llvm_fn, fn_ptr);
+ // Disable the codegen cache, because the codegen cache uses the llvm module bitcode
+ // as the key, while the bitcode doesn't contain the global function mappings of the
+ // execution engine. If a mapping changes at runtime, for example when a udf is
+ // recreated, the function mapping in the cache could point to an old address and
+ // lead to a crash when the udf is called, so block the codegen cache for native udfs.
+ // Builtin functions should not have this issue, because they should not change
+ // during runtime.
+ if (fn.binary_type == TFunctionBinaryType::NATIVE) {
+ // Should be before compilation.
+ DCHECK(!is_compiled_);
+ codegen_cache_enabled_ = false;
+ }
} else if (fn.binary_type == TFunctionBinaryType::BUILTIN) {
// In this path, we're running a builtin with the UDF interface. The IR is
// in the llvm module. Builtin functions may use Expr::GetConstant(). Clone the
@@ -1112,6 +1132,123 @@ Status LlvmCodeGen::FinalizeLazyMaterialization() {
return MaterializeModule();
}
+bool LlvmCodeGen::LookupCache(CodeGenCacheKey& cache_key) {
+ DCHECK(!cache_key.empty());
+ CodeGenCacheEntry entry;
+ CodeGenCache* cache = ExecEnv::GetInstance()->codegen_cache();
+ DCHECK(cache != nullptr);
+ Status lookup_status = cache->Lookup(cache_key,
+ state_->query_options().codegen_cache_mode, &entry, &execution_engine_cached_);
+ bool entry_exist = lookup_status.ok() && !entry.Empty();
+ LOG(INFO) << DebugCacheEntryString(cache_key, true /*is_lookup*/,
+ CodeGenCacheModeAnalyzer::is_debug(state_->query_options().codegen_cache_mode),
+ entry_exist);
+ if (entry_exist) {
+ // Fall back to the normal procedure if the function names hashcode does not match.
+ // The names hashcode should be the same unless there is a collision on the key,
+ // which we expect to be very rare.
+ if (function_names_hashcode_ != entry.function_names_hashcode) {
+ LOG(WARNING)
+ << "The codegen cache entry contains a different function names hashcode:"
+ << " function names hashcode expected: " << function_names_hashcode_
+ << " actual: " << entry.function_names_hashcode
+ << " key hash_code=" << cache_key.hash_code();
+ cache->IncHitOrMissCount(/*hit*/ false);
+ return false;
+ }
+
+ // execution_engine_cached_ keeps the jitted functions alive in case the engine is
+ // evicted from the global cache.
+ DCHECK(execution_engine_cached_ != nullptr);
+ vector<void*> jitted_funcs;
+ // Get pointers to all codegen'd functions.
+ for (int i = 0; i < fns_to_jit_compile_.size(); ++i) {
+ llvm::Function* function = fns_to_jit_compile_[i].first;
+ DCHECK(function != nullptr);
+ // Calling getFunctionAddress() with a non-existent function name hits an
+ // assertion during the test; this could be a bug in LLVM 5 and needs to be
+ // reviewed after upgrading LLVM. But because we have already checked the names
+ // hashcode for key collisions, we expect all the functions to be in the
+ // cached execution engine.
+ void* jitted_function = reinterpret_cast<void*>(
+ execution_engine_cached_->getFunctionAddress(function->getName()));
+ if (jitted_function == nullptr) {
+ LOG(WARNING) << "Failed to get a jitted function from cache: "
+ << function->getName().data()
+ << " key hash_code=" << cache_key.hash_code();
+ cache->IncHitOrMissCount(/*hit*/ false);
+ return false;
+ }
+ jitted_funcs.emplace_back(jitted_function);
+ }
+ DCHECK_EQ(jitted_funcs.size(), fns_to_jit_compile_.size());
+ for (int i = 0; i < jitted_funcs.size(); i++) {
+ fns_to_jit_compile_[i].second->store(jitted_funcs[i]);
+ }
+
+ // Because we cache the entire execution engine, the number of cached functions
+ // should equal the total number of functions.
+ COUNTER_SET(num_cached_functions_, entry.num_functions);
+ COUNTER_SET(num_functions_, entry.num_functions);
+ COUNTER_SET(num_instructions_, entry.num_instructions);
+ }
+ cache->IncHitOrMissCount(/*hit*/ entry_exist);
+ return entry_exist;
+}
+
+string LlvmCodeGen::GetAllFunctionNames() {
+ stringstream result;
+ // The names are concatenated like "function1,function2,".
+ const char separator = ',';
+ for (int i = 0; i < fns_to_jit_compile_.size(); ++i) {
+ llvm::Function* function = fns_to_jit_compile_[i].first;
+ DCHECK(function != nullptr);
+ result << function->getName().data() << separator;
+ }
+ return result.str();
+}
+
+void LlvmCodeGen::GenerateFunctionNamesHashCode() {
+ string function_names = GetAllFunctionNames();
+ // Use the same hash seed as the codegen cache key.
+ function_names_hashcode_ = HashUtil::MurmurHash2_64(function_names.c_str(),
+ function_names.length(), CodeGenCacheKeyConstructor::CODEGEN_CACHE_HASH_SEED_CONST);
+}
+
+Status LlvmCodeGen::StoreCache(CodeGenCacheKey& cache_key) {
+ DCHECK(!cache_key.empty());
+ Status store_status = ExecEnv::GetInstance()->codegen_cache()->Store(
+ cache_key, this, state_->query_options().codegen_cache_mode);
+ LOG(INFO) << DebugCacheEntryString(cache_key, false /*is_lookup*/,
+ CodeGenCacheModeAnalyzer::is_debug(state_->query_options().codegen_cache_mode));
+ return store_status;
+}
+
+void LlvmCodeGen::PruneModule() {
+ SCOPED_TIMER(optimization_timer_);
+ llvm::TargetIRAnalysis target_analysis =
+ execution_engine_->getTargetMachine()->getTargetIRAnalysis();
+
+ // Before running any other optimization passes, run the internalize pass, giving it
+ // the names of all functions registered by AddFunctionToJit(), followed by the
+ // global dead code elimination pass. This causes all functions not registered to be
+ // JIT'd to be marked as internal, and any internal functions that are not used are
+ // deleted by the DCE pass. This greatly decreases compile time by removing unused code.
+ unordered_set<string> exported_fn_names;
+ for (auto& entry : fns_to_jit_compile_) {
+ exported_fn_names.insert(entry.first->getName().str());
+ }
+ unique_ptr<llvm::legacy::PassManager> module_pass_manager(
+ new llvm::legacy::PassManager());
+ module_pass_manager->add(llvm::createTargetTransformInfoWrapperPass(target_analysis));
+ module_pass_manager->add(
+ llvm::createInternalizePass([&exported_fn_names](const llvm::GlobalValue& gv) {
+ return exported_fn_names.find(gv.getName().str()) != exported_fn_names.end();
+ }));
+ module_pass_manager->add(llvm::createGlobalDCEPass());
+ module_pass_manager->run(*module_);
+}
+
Status LlvmCodeGen::FinalizeModule() {
DCHECK(!is_compiled_);
is_compiled_ = true;
@@ -1157,6 +1294,27 @@ Status LlvmCodeGen::FinalizeModule() {
}
RETURN_IF_ERROR(FinalizeLazyMaterialization());
+ PruneModule();
+
+ bool codegen_cache_enabled = state_->codegen_cache_enabled() && codegen_cache_enabled_;
+ CodeGenCacheKey cache_key;
+ if (codegen_cache_enabled) {
+ string bitcode;
+ SCOPED_TIMER(codegen_cache_lookup_timer_);
+ {
+ SCOPED_TIMER(module_bitcode_gen_timer_);
+ llvm::raw_string_ostream bitcode_stream(bitcode);
+ llvm::WriteBitcodeToFile(module_, bitcode_stream);
+ bitcode_stream.flush();
+ }
+ CodeGenCacheKeyConstructor::construct(bitcode, &cache_key);
+ // Generate the function names hash code regardless of the lookup result; it is also
+ // needed to store the entry if the lookup misses.
+ GenerateFunctionNamesHashCode();
+ DCHECK(!cache_key.empty());
+ if (LookupCache(cache_key)) return Status::OK();
+ }
+
if (optimizations_enabled_ && !FLAGS_disable_optimization_passes) {
RETURN_IF_ERROR(OptimizeModule());
}
@@ -1179,16 +1337,26 @@ Status LlvmCodeGen::FinalizeModule() {
}
SetFunctionPointers();
- DestroyModule();
-
- // Track the memory consumed by the compiled code.
- int64_t bytes_allocated = memory_manager_->bytes_allocated();
- if (!mem_tracker_->TryConsume(bytes_allocated)) {
- const string& msg = Substitute(
- "Failed to allocate '$0' bytes for compiled code module", bytes_allocated);
- return mem_tracker_->MemLimitExceeded(NULL, msg, bytes_allocated);
+ Status store_cache_status;
+ if (codegen_cache_enabled) {
+ SCOPED_TIMER(codegen_cache_save_timer_);
+ store_cache_status = StoreCache(cache_key);
}
- memory_manager_->set_bytes_tracked(bytes_allocated);
+ // If the codegen is stored to the cache successfully, the cache becomes responsible for
+ // tracking the memory consumption instead.
+ if (!codegen_cache_enabled || !store_cache_status.ok()) {
+ // Track the memory consumed by the compiled code.
+ int64_t bytes_allocated = memory_manager_->bytes_allocated();
+ if (!mem_tracker_->TryConsume(bytes_allocated)) {
+ const string& msg = Substitute(
+ "Failed to allocate '$0' bytes for compiled code module", bytes_allocated);
+ return mem_tracker_->MemLimitExceeded(NULL, msg, bytes_allocated);
+ }
+ memory_manager_->set_bytes_tracked(bytes_allocated);
+ }
+ DestroyModule();
return Status::OK();
}
@@ -1242,25 +1410,8 @@ Status LlvmCodeGen::OptimizeModule() {
// machine to optimisation passes, e.g. the cost model.
llvm::TargetIRAnalysis target_analysis =
execution_engine_->getTargetMachine()->getTargetIRAnalysis();
-
- // Before running any other optimization passes, run the internalize pass, giving it
- // the names of all functions registered by AddFunctionToJit(), followed by the
- // global dead code elimination pass. This causes all functions not registered to be
- // JIT'd to be marked as internal, and any internal functions that are not used are
- // deleted by DCE pass. This greatly decreases compile time by removing unused code.
- unordered_set<string> exported_fn_names;
- for (auto& entry : fns_to_jit_compile_) {
- exported_fn_names.insert(entry.first->getName().str());
- }
unique_ptr<llvm::legacy::PassManager> module_pass_manager(
new llvm::legacy::PassManager());
- module_pass_manager->add(llvm::createTargetTransformInfoWrapperPass(target_analysis));
- module_pass_manager->add(
- llvm::createInternalizePass([&exported_fn_names](const llvm::GlobalValue& gv) {
- return exported_fn_names.find(gv.getName().str()) != exported_fn_names.end();
- }));
- module_pass_manager->add(llvm::createGlobalDCEPass());
- module_pass_manager->run(*module_);
// Update counters before final optimization, but after removing unused functions. This
// gives us a rough measure of how much work the optimization and compilation must do.
@@ -1834,6 +1985,36 @@ string LlvmCodeGen::DiagnosticHandler::GetErrorString() {
}
return "";
}
+
+string LlvmCodeGen::DebugCacheEntryString(
+ CodeGenCacheKey& key, bool is_lookup, bool debug_mode, bool success) const {
+ stringstream out;
+ if (is_lookup) {
+ out << "Look up codegen cache ";
+ } else {
+ out << "Store to codegen cache ";
+ }
+ if (success) {
+ out << "succeeded. ";
+ } else {
+ if (is_lookup) {
+ out << "missed. ";
+ } else {
+ out << "failed. ";
+ }
+ }
+ out << "CodeGen Cache Key hash_code=" << key.hash_code();
+ out << " query_id=" << PrintId(state_->query_id()) << "\n";
+ if (UNLIKELY(debug_mode)) {
+ out << "Fragment Plan: " << apache::thrift::ThriftDebugString(state_->fragment())
+ << "\n";
+ out << "CodeGen Functions: \n";
+ for (int i = 0; i < fns_to_jit_compile_.size(); ++i) {
+ out << " " << fns_to_jit_compile_[i].first->getName().data() << "\n";
+ }
+ }
+ return out.str();
+}
}
namespace boost {
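PruneModule() marks every function not registered through AddFunctionToJit() as internal and then lets the global DCE pass delete internal functions that nothing uses. As a rough model of that effect (not LLVM code), the sketch below treats the registered names as roots and keeps only the functions they reach through calls. `CallGraph` and `PruneUnused` are hypothetical names, and real GlobalDCE also tracks global variables and aliases, which this toy ignores.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>
#include <vector>

// Hypothetical toy call graph: function name -> names of the functions it calls.
using CallGraph = std::map<std::string, std::vector<std::string>>;

// Models "internalize + global DCE": names in 'exported' (the functions registered
// for JIT) are roots that always survive; every other function is treated as
// internal and survives only if a root reaches it through calls.
std::set<std::string> PruneUnused(
    const CallGraph& graph, const std::set<std::string>& exported) {
  std::set<std::string> live;
  std::vector<std::string> worklist(exported.begin(), exported.end());
  while (!worklist.empty()) {
    std::string fn = worklist.back();
    worklist.pop_back();
    if (!live.insert(fn).second) continue;  // Already visited.
    auto it = graph.find(fn);
    if (it == graph.end()) continue;  // Declaration only, no callees to follow.
    for (const std::string& callee : it->second) worklist.push_back(callee);
  }
  return live;
}
```

Anything not in the returned set corresponds to a function the DCE pass would delete before optimization and, with this patch, before the module bitcode is generated for the cache key.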
diff --git a/be/src/codegen/llvm-codegen.h b/be/src/codegen/llvm-codegen.h
index 3e10e0e37..edc2665f4 100644
--- a/be/src/codegen/llvm-codegen.h
+++ b/be/src/codegen/llvm-codegen.h
@@ -91,6 +91,7 @@ class ImpalaMCJITMemoryManager;
class SubExprElimination;
class Thread;
class TupleDescriptor;
+class CodeGenCacheKey;
/// Define builder subclass in case we want to change the template arguments later
class LlvmBuilder : public llvm::IRBuilder<> {
@@ -189,6 +190,9 @@ class LlvmCodeGen {
/// Turns on/off optimization passes
void EnableOptimizations(bool enable);
+ std::string DebugCacheEntryString(
+ CodeGenCacheKey& key, bool is_lookup, bool debug_mode, bool success = true) const;
+
/// For debugging. Returns the IR that was generated. If full_module, the
/// entire module is dumped, including what was loaded from precompiled IR.
/// If false, only output IR for functions which were handcrafted.
@@ -428,6 +432,9 @@ class LlvmCodeGen {
/// be deleted in FinalizeModule(), otherwise, it returns the function object.
llvm::Function* FinalizeFunction(llvm::Function* function);
+ /// Prunes any unused functions from the module.
+ void PruneModule();
+
/// Adds the function to be automatically jit compiled when the codegen object is
/// finalized. FinalizeModule() will set *fn_ptr to point to the jitted function.
///
@@ -629,6 +636,8 @@ class LlvmCodeGen {
friend class LlvmCodeGenTest_CpuAttrWhitelist_Test;
friend class LlvmCodeGenTest_HashTest_Test;
friend class SubExprElimination;
+ friend class CodeGenCache;
+ friend class LlvmCodeGenCacheTest;
/// Top level codegen object. 'module_id' is used for debugging when outputting the IR.
LlvmCodeGen(FragmentState* state, ObjectPool* pool, MemTracker* parent_mem_tracker,
@@ -703,6 +712,12 @@ class LlvmCodeGen {
// Used for testing.
void ResetVerification() { is_corrupt_ = false; }
+ // Look up the codegen functions in the cache to reduce optimization time.
+ bool LookupCache(CodeGenCacheKey& key);
+
+ // Store the codegen functions in the cache if the codegen cache is enabled.
+ Status StoreCache(CodeGenCacheKey& key);
+
/// Optimizes the module. This includes pruning the module of any unused functions.
Status OptimizeModule();
@@ -747,6 +762,16 @@ class LlvmCodeGen {
/// generated is retained by the execution engine.
void DestroyModule();
+ /// Generate a string containing the names of all functions in fns_to_jit_compile_,
+ /// concatenated in the order they appear there. Used by the codegen cache lookup to
+ /// confirm whether a cache entry matches the needs of this LlvmCodeGen.
+ std::string GetAllFunctionNames();
+
+ /// Generate and store the hash code of all the function names. Used by the codegen
+ /// cache only.
+ void GenerateFunctionNamesHashCode();
+
/// Disable CPU attributes in 'cpu_attrs' that are not present in
/// the '--llvm_cpu_attr_whitelist' flag. The same attributes in the input are
/// always present in the output, except "+" is flipped to "-" for the disabled
@@ -800,6 +825,13 @@ class LlvmCodeGen {
/// Time spent creating the initial module with the cross-compiled Impala IR.
RuntimeProfile::Counter* prepare_module_timer_;
+ /// Time spent generating module bitcode.
+ RuntimeProfile::Counter* module_bitcode_gen_timer_;
+
+ /// Time spent on codegen cache lookup and store.
+ RuntimeProfile::Counter* codegen_cache_lookup_timer_;
+ RuntimeProfile::Counter* codegen_cache_save_timer_;
+
/// Time spent by ExecNodes while adding IR to the module. Update by
/// FragmentInstanceState during its 'CODEGEN_START' state.
RuntimeProfile::Counter* ir_generation_timer_;
@@ -824,6 +856,9 @@ class LlvmCodeGen {
RuntimeProfile::Counter* num_functions_;
RuntimeProfile::Counter* num_instructions_;
+ /// Number of functions loaded from the codegen cache.
+ RuntimeProfile::Counter* num_cached_functions_;
+
/// Aggregated llvm thread counters. Also includes the phase represented by
/// 'ir_generation_timer_' and hence is also updated by FragmentInstanceState.
RuntimeProfile::ThreadCounters* llvm_thread_counters_;
@@ -833,6 +868,9 @@ class LlvmCodeGen {
/// whether or not optimizations are enabled
bool optimizations_enabled_;
+ /// Whether or not codegen cache is enabled.
+ bool codegen_cache_enabled_ = true;
+
/// If true, the module is corrupt and we cannot codegen this query.
/// TODO: we could consider just removing the offending function and attempting to
/// codegen the rest of the query. This requires more testing though to make sure
@@ -855,7 +893,10 @@ class LlvmCodeGen {
llvm::Module* module_;
/// Execution/Jitting engine.
- std::unique_ptr<llvm::ExecutionEngine> execution_engine_;
+ std::shared_ptr<llvm::ExecutionEngine> execution_engine_;
+
+ /// Cached Execution/Jitting engine.
+ std::shared_ptr<llvm::ExecutionEngine> execution_engine_cached_;
/// The memory manager used by 'execution_engine_'. Owned by 'execution_engine_'.
ImpalaMCJITMemoryManager* memory_manager_;
@@ -892,6 +933,11 @@ class LlvmCodeGen {
/// The vector of functions to automatically JIT compile after FinalizeModule().
std::vector<std::pair<llvm::Function*, CodegenFnPtrBase*>> fns_to_jit_compile_;
+ /// The hash code generated from all the function names in fns_to_jit_compile_.
+ /// Used by the codegen cache only.
+ uint64_t function_names_hashcode_;
+
/// llvm representation of a few common types. Owned by context.
llvm::PointerType* ptr_type_; // int8_t*
llvm::Type* void_type_; // void
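The header comments above describe a lookup that only counts as a hit when the cached entry was built for the same sequence of JIT'd functions. A minimal sketch of that check, assuming an unbounded `std::unordered_map` in place of CodeGenCache (no eviction, no memory accounting) and `std::hash` in place of `HashUtil::MurmurHash2_64` with `CODEGEN_CACHE_HASH_SEED_CONST`. `ToyCodegenCache`, `CacheEntry`, `AllFunctionNames`, and `NamesHash` are hypothetical names, not Impala's API.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Joins names with a trailing ',' like GetAllFunctionNames(): {"f1","f2"} -> "f1,f2,".
std::string AllFunctionNames(const std::vector<std::string>& names) {
  std::string result;
  for (const std::string& name : names) {
    result += name;
    result += ',';
  }
  return result;
}

// std::hash stands in for MurmurHash2_64 (assumption for this sketch).
uint64_t NamesHash(const std::vector<std::string>& names) {
  return std::hash<std::string>{}(AllFunctionNames(names));
}

// Hypothetical entry; the patch caches a shared execution engine in its place.
struct CacheEntry {
  std::shared_ptr<void> engine;
  uint64_t function_names_hashcode = 0;
  int num_functions = 0;
};

class ToyCodegenCache {
 public:
  // A hit requires the key to exist AND the function-name hash to match.
  const CacheEntry* Lookup(const std::string& key, uint64_t names_hash) {
    auto it = entries_.find(key);
    if (it != entries_.end() && it->second.function_names_hashcode == names_hash) {
      ++hits_;
      return &it->second;
    }
    ++misses_;
    return nullptr;
  }
  void Store(const std::string& key, CacheEntry entry) {
    entries_[key] = std::move(entry);
  }
  int hits() const { return hits_; }
  int misses() const { return misses_; }

 private:
  std::unordered_map<std::string, CacheEntry> entries_;
  int hits_ = 0;
  int misses_ = 0;
};
```

The name-hash comparison guards against two fragments whose modules produce the same key but that register different functions for JIT, in which case reusing the entry would hand back the wrong function pointers.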
diff --git a/be/src/exprs/scalar-expr.cc b/be/src/exprs/scalar-expr.cc
index cf5a2872d..86a1689e0 100644
--- a/be/src/exprs/scalar-expr.cc
+++ b/be/src/exprs/scalar-expr.cc
@@ -142,13 +142,13 @@ Status ScalarExpr::CreateNode(
case TExprNodeType::TIMESTAMP_LITERAL:
case TExprNodeType::DATE_LITERAL:
*expr = pool->Add(new Literal(texpr_node));
- return Status::OK();
+ break;
case TExprNodeType::CASE_EXPR:
if (!texpr_node.__isset.case_expr) {
return Status("Case expression not set in thrift node");
}
*expr = pool->Add(new CaseExpr(texpr_node));
- return Status::OK();
+ break;
case TExprNodeType::COMPOUND_PRED:
if (texpr_node.fn.name.function_name == "and") {
*expr = pool->Add(new AndPredicate(texpr_node));
@@ -158,19 +158,19 @@ Status ScalarExpr::CreateNode(
DCHECK_EQ(texpr_node.fn.name.function_name, "not");
*expr = pool->Add(new ScalarFnCall(texpr_node));
}
- return Status::OK();
+ break;
case TExprNodeType::NULL_LITERAL:
*expr = pool->Add(new NullLiteral(texpr_node));
- return Status::OK();
+ break;
case TExprNodeType::SLOT_REF:
if (!texpr_node.__isset.slot_ref) {
return Status("Slot reference not set in thrift node");
}
*expr = pool->Add(new SlotRef(texpr_node));
- return Status::OK();
+ break;
case TExprNodeType::TUPLE_IS_NULL_PRED:
*expr = pool->Add(new TupleIsNullPredicate(texpr_node));
- return Status::OK();
+ break;
case TExprNodeType::FUNCTION_CALL:
if (!texpr_node.__isset.fn) {
return Status("Function not set in thrift node");
@@ -193,22 +193,24 @@ Status ScalarExpr::CreateNode(
} else {
*expr = pool->Add(new ScalarFnCall(texpr_node));
}
- return Status::OK();
+ break;
case TExprNodeType::IS_NOT_EMPTY_PRED:
*expr = pool->Add(new IsNotEmptyPredicate(texpr_node));
- return Status::OK();
+ break;
case TExprNodeType::KUDU_PARTITION_EXPR:
*expr = pool->Add(new KuduPartitionExpr(texpr_node));
- return Status::OK();
+ break;
case TExprNodeType::VALID_TUPLE_ID_EXPR:
*expr = pool->Add(new ValidTupleIdExpr(texpr_node));
- return Status::OK();
+ break;
default:
*expr = nullptr;
stringstream os;
os << "Unknown expr node type: " << texpr_node.node_type;
return Status(os.str());
}
+ DCHECK(*expr != nullptr);
+ return Status::OK();
}
Status ScalarExpr::OpenEvaluator(FunctionContext::FunctionStateScope scope,
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 34820d450..5b0a95742 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -24,6 +24,7 @@
#include <gutil/strings/substitute.h>
#include "catalog/catalog-service-client-wrapper.h"
+#include "codegen/llvm-codegen-cache.h"
#include "common/logging.h"
#include "common/object-pool.h"
#include "exec/kudu/kudu-util.h"
@@ -99,6 +100,8 @@ DEFINE_int32(admission_control_slots, 0,
"this backend. The degree of parallelism of the query determines the number of slots "
"that it needs. Defaults to number of cores / -num_cores for executors, and 8x that "
"value for dedicated coordinators).");
+DEFINE_string(codegen_cache_capacity, "1GB",
+ "Specify the capacity of the codegen cache. If set to 0, codegen cache is disabled.");
DEFINE_bool(use_local_catalog, false,
"Use the on-demand metadata feature in coordinators. If this is set, coordinators "
@@ -165,6 +168,10 @@ DEFINE_string(metrics_webserver_interface, "",
const static string DEFAULT_FS = "fs.defaultFS";
+// The maximum fraction of the total process memory that the codegen cache can take.
+// The value is set to 20%.
+const double MAX_CODEGEN_CACHE_MEM_PERCENT = 0.2;
+
// The multiplier for how many queries a dedicated coordinator can run compared to an
// executor. This is only effective when using non-default settings for executor groups
// and the absolute value can be overridden by the '--admission_control_slots' flag.
@@ -443,6 +450,25 @@ Status ExecEnv::Init() {
RETURN_IF_ERROR(admission_controller_->Init());
RETURN_IF_ERROR(InitHadoopConfig());
+ int64_t codegen_cache_capacity =
+ ParseUtil::ParseMemSpec(FLAGS_codegen_cache_capacity, &is_percent, 0);
+ if (codegen_cache_capacity > 0) {
+ int64_t codegen_cache_limit = mem_tracker_->limit() * MAX_CODEGEN_CACHE_MEM_PERCENT;
+ DCHECK(codegen_cache_limit > 0);
+ if (codegen_cache_capacity > codegen_cache_limit) {
+ LOG(INFO) << "CodeGen Cache capacity changed from "
+ << PrettyPrinter::Print(codegen_cache_capacity, TUnit::BYTES) << " to "
+ << PrettyPrinter::Print(codegen_cache_limit, TUnit::BYTES)
+ << " due to reaching the limit.";
+ codegen_cache_capacity = codegen_cache_limit;
+ }
+ codegen_cache_.reset(new CodeGenCache(metrics_.get()));
+ RETURN_IF_ERROR(codegen_cache_->Init(codegen_cache_capacity));
+ LOG(INFO) << "CodeGen Cache initialized with capacity "
+ << PrettyPrinter::Print(codegen_cache_capacity, TUnit::BYTES);
+ } else {
+ LOG(INFO) << "CodeGen Cache is disabled.";
+ }
return Status::OK();
}
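The capacity logic in ExecEnv::Init() above disables the cache when --codegen_cache_capacity is 0 and otherwise clamps the value to 20% of the process memory limit. Sketched as a pure function, assuming the flag has already been parsed to bytes and the process limit is positive; `EffectiveCodegenCacheCapacity` is a hypothetical name.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Fraction of the process memory limit the codegen cache may use (0.2 in the diff).
constexpr double kMaxCodegenCacheMemFraction = 0.2;

// Returns the effective cache capacity in bytes; 0 means the cache is disabled.
// 'requested_bytes' stands in for the parsed --codegen_cache_capacity value and
// 'process_mem_limit' for mem_tracker_->limit() (assumed > 0).
int64_t EffectiveCodegenCacheCapacity(int64_t requested_bytes, int64_t process_mem_limit) {
  if (requested_bytes <= 0) return 0;
  // Truncates toward zero, like the implicit double-to-int64_t conversion in the diff.
  int64_t limit = static_cast<int64_t>(process_mem_limit * kMaxCodegenCacheMemFraction);
  return std::min(requested_bytes, limit);
}
```

For example, the default "1GB" is kept as-is on a daemon with a 10 GB limit (the cap is 2 GB), but is reduced on a daemon with a 4 GB limit.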
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index 31d90169f..bf710a12e 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -69,6 +69,7 @@ class SystemStateInfo;
class ThreadResourceMgr;
class TmpFileMgr;
class Webserver;
+class CodeGenCache;
namespace io {
class DiskIoMgr;
@@ -151,6 +152,8 @@ class ExecEnv {
Scheduler* scheduler() { return scheduler_.get(); }
AdmissionController* admission_controller() { return admission_controller_.get(); }
StatestoreSubscriber* subscriber() { return statestore_subscriber_.get(); }
+ CodeGenCache* codegen_cache() const { return codegen_cache_.get(); }
+ bool codegen_cache_enabled() const { return codegen_cache_ != nullptr; }
const TNetworkAddress& configured_backend_address() const {
return configured_backend_address_;
@@ -232,6 +235,9 @@ class ExecEnv {
/// Tracks system resource usage which we then include in profiles.
boost::scoped_ptr<SystemStateInfo> system_state_info_;
+ /// Singleton cache for codegen functions.
+ boost::scoped_ptr<CodeGenCache> codegen_cache_;
+
/// Not owned by this class
ImpalaServer* impala_server_ = nullptr;
MetricGroup* rpc_metrics_ = nullptr;
diff --git a/be/src/runtime/fragment-state.h b/be/src/runtime/fragment-state.h
index fa6a66bc2..f78b2d10e 100644
--- a/be/src/runtime/fragment-state.h
+++ b/be/src/runtime/fragment-state.h
@@ -60,6 +60,7 @@ class FragmentState {
ObjectPool* obj_pool() { return &obj_pool_; }
int fragment_idx() const { return fragment_.idx; }
const TQueryOptions& query_options() const { return query_state_->query_options(); }
+ const TQueryCtx& query_ctx() const { return query_state_->query_ctx(); }
const TPlanFragment& fragment() const { return fragment_; }
const PlanFragmentCtxPB& fragment_ctx() const { return fragment_ctx_; }
const std::vector<const TPlanFragmentInstanceCtx*>& instance_ctxs() const {
@@ -68,6 +69,8 @@ class FragmentState {
const std::vector<const PlanFragmentInstanceCtxPB*>& instance_ctx_pbs() const {
return instance_ctx_pbs_;
}
+ /// Returns whether the codegen cache is enabled. It depends on the query's settings.
+ bool codegen_cache_enabled() const { return query_state_->codegen_cache_enabled(); }
/// Return the minimum per-fragment index of an instance on this backend.
int min_per_fragment_instance_idx() const {
// 'instance_ctxs_' is in ascending order, so can just return the first one.
@@ -79,6 +82,7 @@ class FragmentState {
const TUniqueId& query_id() const { return query_state_->query_id(); }
const DescriptorTbl& desc_tbl() const { return query_state_->desc_tbl(); }
MemTracker* query_mem_tracker() const { return query_state_->query_mem_tracker(); }
+ QueryState* query_state() const { return query_state_; }
RuntimeProfile* runtime_profile() { return runtime_profile_; }
static const std::string FSTATE_THREAD_GROUP_NAME;
diff --git a/be/src/runtime/krpc-data-stream-sender-ir.cc b/be/src/runtime/krpc-data-stream-sender-ir.cc
index 335e16f9e..d0da802d9 100644
--- a/be/src/runtime/krpc-data-stream-sender-ir.cc
+++ b/be/src/runtime/krpc-data-stream-sender-ir.cc
@@ -35,7 +35,7 @@ Status KrpcDataStreamSender::HashAndAddRows(RowBatch* batch) {
int row_count = 0;
FOREACH_ROW_LIMIT(batch, row_idx, RowBatch::HASH_BATCH_SIZE, row_batch_iter) {
TupleRow* row = row_batch_iter.Get();
- channel_ids[row_count++] = HashRow(row) % num_channels;
+ channel_ids[row_count++] = HashRow(row, exchange_hash_seed_) % num_channels;
}
row_count = 0;
FOREACH_ROW_LIMIT(batch, row_idx, RowBatch::HASH_BATCH_SIZE, row_batch_iter) {
diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc
index d17c8e46d..bfab9366a 100644
--- a/be/src/runtime/krpc-data-stream-sender.cc
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -72,7 +72,7 @@ DECLARE_int32(rpc_retry_interval_ms);
namespace impala {
const char* KrpcDataStreamSender::HASH_ROW_SYMBOL =
- "KrpcDataStreamSender7HashRowEPNS_8TupleRowE";
+ "KrpcDataStreamSender7HashRowEPNS_8TupleRowEm";
const char* KrpcDataStreamSender::LLVM_CLASS_NAME =
"class.impala::KrpcDataStreamSender";
const char* KrpcDataStreamSender::TOTAL_BYTES_SENT_COUNTER = "TotalBytesSent";
@@ -807,7 +807,8 @@ Status KrpcDataStreamSender::Open(RuntimeState* state) {
// An example of generated code with int type.
//
// define i64 @KrpcDataStreamSenderHashRow(%"class.impala::KrpcDataStreamSender"* %this,
-// %"class.impala::TupleRow"* %row) #46 {
+// %"class.impala::TupleRow"* %row,
+// i64 %seed) #52 {
// entry:
// %0 = alloca i32
// %1 = call %"class.impala::ScalarExprEvaluator"*
@@ -833,7 +834,7 @@ Status KrpcDataStreamSender::Open(RuntimeState* state) {
// %hash_val = call i64
// @_ZN6impala8RawValue20GetHashValueFastHashEPKvRKNS_10ColumnTypeEm(
// i8* %val_ptr_phi, %"struct.impala::ColumnType"* @expr_type_arg,
-// i64 7403188670037225271)
+// i64 %seed)
// ret i64 %hash_val
// }
Status KrpcDataStreamSenderConfig::CodegenHashRow(
@@ -847,15 +848,15 @@ Status KrpcDataStreamSenderConfig::CodegenHashRow(
"this",
codegen->GetNamedPtrType(KrpcDataStreamSender::LLVM_CLASS_NAME)));
prototype.AddArgument(
LlvmCodeGen::NamedVariable("row", codegen->GetStructPtrType<TupleRow>()));
+ prototype.AddArgument(LlvmCodeGen::NamedVariable("seed", codegen->i64_type()));
- llvm::Value* args[2];
+ llvm::Value* args[3];
llvm::Function* hash_row_fn = prototype.GeneratePrototype(&builder, args);
llvm::Value* this_arg = args[0];
llvm::Value* row_arg = args[1];
// Store the initial seed to hash_val
- llvm::Value* hash_val =
- codegen->GetI64Constant(exchange_hash_seed_);
+ llvm::Value* hash_val = args[2];
// Unroll the loop and codegen each of the partition expressions
for (int i = 0; i < partition_exprs_.size(); ++i) {
@@ -922,7 +923,6 @@ Status KrpcDataStreamSenderConfig::CodegenHashRow(
if (*fn == nullptr) {
return Status("Codegen'd KrpcDataStreamSenderHashRow() fails verification. See log");
}
-
return Status::OK();
}
@@ -987,8 +987,8 @@ Status KrpcDataStreamSender::AddRowToChannel(const int channel_id, TupleRow* row
return channels_[channel_id]->AddRow(row);
}
-uint64_t KrpcDataStreamSender::HashRow(TupleRow* row) {
- uint64_t hash_val = exchange_hash_seed_;
+uint64_t KrpcDataStreamSender::HashRow(TupleRow* row, uint64_t seed) {
+ uint64_t hash_val = seed;
for (ScalarExprEvaluator* eval : partition_expr_evals_) {
void* partition_val = eval->GetValue(row);
// We can't use the crc hash function here because it does not result in
diff --git a/be/src/runtime/krpc-data-stream-sender.h b/be/src/runtime/krpc-data-stream-sender.h
index 99a7dca86..21074fa66 100644
--- a/be/src/runtime/krpc-data-stream-sender.h
+++ b/be/src/runtime/krpc-data-stream-sender.h
@@ -173,7 +173,7 @@ class KrpcDataStreamSender : public DataSink {
/// Evaluates the input row against partition expressions and hashes the expression
/// values. Returns the final hash value.
- uint64_t HashRow(TupleRow* row);
+ uint64_t HashRow(TupleRow* row, uint64_t seed);
/// Used when 'partition_type_' is HASH_PARTITIONED. Call HashRow() against each row
/// in the input batch and adds it to the corresponding channel based on the hash value.
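The HashRow() signature change matters because the cache key is built from the module bitcode: a seed folded into the IR as a constant (the old `i64 7403188670037225271` operand) gives otherwise identical fragments different keys whenever only their seed differs. The sketch below models that with IR-like strings standing in for bitcode; `EmitHashRowIr` and `DistinctKeys` are hypothetical, and the strings are not Impala's real IR.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <set>
#include <string>

// Hypothetical stand-in for the IR text that ends up in the cache key.
// With 'seed_as_constant', the seed is baked into the function body.
std::string EmitHashRowIr(uint64_t seed, bool seed_as_constant) {
  std::string ir = "define i64 @HashRow(ptr %this, ptr %row";
  if (seed_as_constant) {
    ir += ") { call @GetHashValue(..., i64 " + std::to_string(seed) + ") }";
  } else {
    ir += ", i64 %seed) { call @GetHashValue(..., i64 %seed) }";
  }
  return ir;
}

// Counts how many distinct cache keys fragments with these seeds would produce.
std::size_t DistinctKeys(const std::set<uint64_t>& seeds, bool seed_as_constant) {
  std::set<std::string> keys;
  for (uint64_t seed : seeds) keys.insert(EmitHashRowIr(seed, seed_as_constant));
  return keys.size();
}
```

Passing the seed as an argument moves the varying value out of the generated code and into the call site (`HashRow(row, exchange_hash_seed_)`), so one cached function can serve every seed.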
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 25a60d10a..1aa5cb7ec 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -19,6 +19,7 @@
#include <mutex>
+#include "codegen/llvm-codegen-cache.h"
#include "codegen/llvm-codegen.h"
#include "common/thread-debug-info.h"
#include "exec/kudu/kudu-util.h"
@@ -800,6 +801,11 @@ bool QueryState::WaitForFinishOrTimeout(int32_t timeout_ms) {
return !timed_out;
}
+bool QueryState::codegen_cache_enabled() const {
+ return !query_options().disable_codegen_cache && !disable_codegen_cache_
+ && ExecEnv::GetInstance()->codegen_cache_enabled();
+}
+
bool QueryState::StartFInstances() {
VLOG(2) << "StartFInstances(): query_id=" << PrintId(query_id())
<< " #instances=" << fragment_info_.fragment_instance_ctxs.size();
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index 4c652a6da..6f3613f77 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -51,6 +51,7 @@ class FragmentState;
class FragmentInstanceState;
class InitialReservations;
class LlvmCodeGen;
+class CodeGenCache;
class MemTracker;
class PlanNode;
class PublishFilterParamsPB;
@@ -150,6 +151,7 @@ class QueryState {
const TQueryOptions& query_options() const {
return query_ctx_.client_request.query_options;
}
+ bool codegen_cache_enabled() const;
MemTracker* query_mem_tracker() const { return query_mem_tracker_; }
RuntimeProfile* host_profile() const { return host_profile_; }
UniqueIdPB GetCoordinatorBackendId() const;
@@ -462,6 +464,9 @@ class QueryState {
/// send a status report so that we can cancel after a configurable timeout.
int64_t failed_report_time_ms_ = 0;
+ /// Indicator of whether to disable the codegen cache for the query.
+ bool disable_codegen_cache_ = false;
+
/// Create QueryState w/ a refcnt of 0 and a memory limit of 'mem_limit' bytes applied
/// to the query mem tracker. The query is associated with the resource pool set in
/// 'query_ctx.request_pool' or from 'request_pool', if the former is not set (needed
diff --git a/be/src/runtime/test-env.h b/be/src/runtime/test-env.h
index 5dec32544..b68add29e 100644
--- a/be/src/runtime/test-env.h
+++ b/be/src/runtime/test-env.h
@@ -18,8 +18,9 @@
#ifndef IMPALA_RUNTIME_TEST_ENV
#define IMPALA_RUNTIME_TEST_ENV
-#include "runtime/io/disk-io-mgr.h"
+#include "codegen/llvm-codegen-cache.h"
#include "runtime/exec-env.h"
+#include "runtime/io/disk-io-mgr.h"
#include "runtime/runtime-state.h"
namespace impala {
@@ -76,9 +77,15 @@ class TestEnv {
/// Return total of mem tracker consumption for all queries.
int64_t TotalQueryMemoryConsumption();
+ /// Reset the codegen cache.
+ void ResetCodegenCache(MetricGroup* metrics) {
+ exec_env_->codegen_cache_.reset(new CodeGenCache(metrics));
+ }
+
ExecEnv* exec_env() { return exec_env_.get(); }
MetricGroup* metrics() { return exec_env_->metrics(); }
TmpFileMgr* tmp_file_mgr() { return exec_env_->tmp_file_mgr(); }
+ CodeGenCache* codegen_cache() { return exec_env_->codegen_cache_.get(); }
private:
/// Recreate global metric groups.
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 8582df3a4..a661096c4 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -849,6 +849,17 @@ Status impala::SetQueryOption(const string& key, const string& value,
query_options->__set_async_codegen(IsTrue(value));
break;
}
+ case TImpalaQueryOptions::DISABLE_CODEGEN_CACHE: {
+ query_options->__set_disable_codegen_cache(IsTrue(value));
+ break;
+ }
+ case TImpalaQueryOptions::CODEGEN_CACHE_MODE: {
+ TCodeGenCacheMode::type enum_type;
+ RETURN_IF_ERROR(GetThriftEnum(
+ value, "CodeGen Cache Mode", _TCodeGenCacheMode_VALUES_TO_NAMES, &enum_type));
+ query_options->__set_codegen_cache_mode(enum_type);
+ break;
+ }
case TImpalaQueryOptions::ENABLE_DISTINCT_SEMI_JOIN_OPTIMIZATION: {
query_options->__set_enable_distinct_semi_join_optimization(IsTrue(value));
break;
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 05dcdf2ee..4f5330435 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -50,7 +50,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
// time we add or remove a query option to/from the enum TImpalaQueryOptions.
#define QUERY_OPTS_TABLE \
DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(), \
- TImpalaQueryOptions::FALLBACK_DB_FOR_FUNCTIONS + 1); \
+ TImpalaQueryOptions::CODEGEN_CACHE_MODE + 1); \
REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \
QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR) \
REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS) \
@@ -276,8 +276,11 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
QUERY_OPT_FN(lock_max_wait_time_s, LOCK_MAX_WAIT_TIME_S, TQueryOptionLevel::REGULAR) \
QUERY_OPT_FN(orc_schema_resolution, ORC_SCHEMA_RESOLUTION, TQueryOptionLevel::REGULAR) \
QUERY_OPT_FN(expand_complex_types, EXPAND_COMPLEX_TYPES, TQueryOptionLevel::REGULAR) \
- QUERY_OPT_FN(fallback_db_for_functions, FALLBACK_DB_FOR_FUNCTIONS, \
- TQueryOptionLevel::ADVANCED);
+ QUERY_OPT_FN( \
+ fallback_db_for_functions, FALLBACK_DB_FOR_FUNCTIONS, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN( \
+ disable_codegen_cache, DISABLE_CODEGEN_CACHE, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(codegen_cache_mode, CODEGEN_CACHE_MODE, TQueryOptionLevel::DEVELOPMENT);
/// Enforce practical limits on some query options to avoid undesired query state.
static const int64_t SPILLABLE_BUFFER_LIMIT = 1LL << 40; // 1 TB
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 0b3cef56b..e759c068e 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -742,6 +742,18 @@ enum TImpalaQueryOptions {
// Specify the database name which stores global udf
FALLBACK_DB_FOR_FUNCTIONS = 148;
+
+ // Specify whether to disable the codegen cache.
+ DISABLE_CODEGEN_CACHE = 149;
+
+ // Specify how entries are stored in the codegen cache, which affects the entry size.
+ // Possible values are NORMAL, OPTIMAL, NORMAL_DEBUG and OPTIMAL_DEBUG.
+ // The normal mode uses the full key for the cache, while the optimal mode uses a
+ // 128-bit hash code for the key to reduce memory consumption.
+ // The debug variant of each mode emits more logs, which can help diagnose issues.
+ // Only valid if DISABLE_CODEGEN_CACHE is set to false.
+ CODEGEN_CACHE_MODE = 150;
}
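The enum comment above distinguishes NORMAL, which keys the cache on the full module bitcode, from OPTIMAL, which keeps only a hash and the length of the full key. A toy version of that trade-off follows. `ToyCacheKey` and `MakeKey` are hypothetical, and a single 64-bit `std::hash` value stands in for the 128-bit hash the comment mentions.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <string>

enum class CacheMode { NORMAL, OPTIMAL };

// Hypothetical key: NORMAL keeps the whole bitcode; OPTIMAL keeps only a hash and
// the full key's length, trading a small collision risk for much less memory.
struct ToyCacheKey {
  std::string full_key;      // Empty in OPTIMAL mode.
  uint64_t hash_code = 0;    // std::hash stands in for the real 128-bit hash.
  std::size_t full_length = 0;

  bool operator==(const ToyCacheKey& o) const {
    return full_key == o.full_key && hash_code == o.hash_code &&
        full_length == o.full_length;
  }
  // Approximate bytes needed to hold the key's contents.
  std::size_t MemoryBytes() const {
    return full_key.size() + sizeof(hash_code) + sizeof(full_length);
  }
};

ToyCacheKey MakeKey(const std::string& bitcode, CacheMode mode) {
  ToyCacheKey key;
  key.hash_code = std::hash<std::string>{}(bitcode);
  key.full_length = bitcode.size();
  if (mode == CacheMode::NORMAL) key.full_key = bitcode;
  return key;
}
```

Keeping the length alongside the hash is a cheap extra discriminator: two bitcode strings must collide on the hash and also have the same size before an OPTIMAL lookup could return the wrong entry.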
// The summary of a DML statement.
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index 596965ff8..4ab9d4101 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -85,6 +85,15 @@ enum TMinmaxFilterFastCodePathMode {
VERIFICATION=2
}
+// The options for the CodeGen Cache.
+// The debug options emit more logs; each debug value equals its base mode plus 256.
+enum TCodeGenCacheMode {
+ NORMAL = 0
+ OPTIMAL = 1
+ NORMAL_DEBUG = 256
+ OPTIMAL_DEBUG = 257
+}
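Per the enum comment, each debug value is its base mode plus 256, so the debug flag and the base mode can be decoded independently. A sketch of that decoding; `IsDebug` and `IsOptimal` are hypothetical helpers, since the implementation of CodeGenCacheModeAnalyzer::is_debug() is not part of this diff.

```cpp
#include <cassert>

// Values copied from TCodeGenCacheMode.
enum CodeGenCacheMode { NORMAL = 0, OPTIMAL = 1, NORMAL_DEBUG = 256, OPTIMAL_DEBUG = 257 };

// Offset that turns a base mode into its debug variant.
constexpr int kDebugOffset = 256;

// A mode is a debug mode if it carries the +256 offset.
bool IsDebug(CodeGenCacheMode mode) { return mode >= kDebugOffset; }

// Strip the debug offset, then compare against the base mode.
bool IsOptimal(CodeGenCacheMode mode) { return (mode % kDebugOffset) == OPTIMAL; }
```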
+
// Options for when to write Parquet Bloom filters for supported types.
enum TParquetBloomFilterWrite {
// Never write Parquet Bloom filters.
@@ -599,6 +608,10 @@ struct TQueryOptions {
148: optional bool expand_complex_types = false;
149: optional string fallback_db_for_functions;
+
+ // See comment in ImpalaService.thrift
+ 150: optional bool disable_codegen_cache = false;
+ 151: optional TCodeGenCacheMode codegen_cache_mode = TCodeGenCacheMode.NORMAL;
}
// Impala currently has three types of sessions: Beeswax, HiveServer2 and external
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index 6e5bca495..7188d75aa 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -19,6 +19,66 @@
"kind": "COUNTER",
"key": "external-data-source.class-cache.hits"
},
+ {
+ "description": "The total number of cache misses in the CodeGen Cache",
+ "contexts": [
+ "IMPALAD"
+ ],
+ "label": "CodeGen Cache Misses",
+ "units": "UNIT",
+ "kind": "COUNTER",
+ "key": "impala.codegen-cache.misses"
+ },
+ {
+ "description": "The number of in-use CodeGen Cache Entries",
+ "contexts": [
+ "IMPALAD"
+ ],
+ "label": "In-use CodeGen Cache Entries",
+ "units": "UNIT",
+ "kind": "GAUGE",
+ "key": "impala.codegen-cache.entries-in-use"
+ },
+ {
+ "description": "The total bytes of in-use CodeGen Cache Entries",
+ "contexts": [
+ "IMPALAD"
+ ],
+ "label": "In-use CodeGen Cache Entries total bytes",
+ "units": "BYTES",
+ "kind": "GAUGE",
+ "key": "impala.codegen-cache.entries-in-use-bytes"
+ },
+ {
+ "description": "The number of evicted CodeGen Cache Entries",
+ "contexts": [
+ "IMPALAD"
+ ],
+ "label": "Evicted CodeGen Cache Entries",
+ "units": "UNIT",
+ "kind": "COUNTER",
+ "key": "impala.codegen-cache.entries-evicted"
+ },
+ {
+ "description": "The total number of cache hits in the CodeGen Cache",
+ "contexts": [
+ "IMPALAD"
+ ],
+ "label": "CodeGen Cache Hits",
+ "units": "UNIT",
+ "kind": "COUNTER",
+ "key": "impala.codegen-cache.hits"
+ },
+ {
+ "description": "Statistics for codegen cache entry sizes allocated from the system.",
+ "contexts": [
+ "IMPALAD"
+ ],
+ "label": "CodeGen Cache Entry Sizes.",
+ "units": "BYTES",
+ "kind": "HISTOGRAM",
+ "key": "impala.codegen-cache.entry-sizes"
+ },
{
"description": "Resource Pool $0 Configured Max Mem Resources",
"contexts": [
diff --git a/testdata/workloads/functional-query/queries/QueryTest/codegen-cache-udf.test b/testdata/workloads/functional-query/queries/QueryTest/codegen-cache-udf.test
new file mode 100644
index 000000000..7f1bb6007
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/codegen-cache-udf.test
@@ -0,0 +1,24 @@
+====
+---- QUERY
+# Test identity functions
+select identity(true);
+---- TYPES
+boolean
+---- RESULTS
+true
+====
+---- QUERY
+select test_count(int_col) from functional.alltypestiny;
+---- RESULTS
+8
+---- TYPES
+bigint
+====
+---- QUERY
+# Test UDFs over tables
+select sum(identity(bigint_col)) from functional.alltypes
+---- TYPES
+bigint
+---- RESULTS
+328500
+====
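The six codegen cache metrics registered in `metrics.json` earlier in this diff can be combined into a few derived figures when inspecting an impalad. The sketch below is a hypothetical helper that takes a plain dict of metric values; it does not cover how those values are fetched. It assumes each cache lookup is counted as exactly one hit or one miss, so the hit ratio is `hits / (hits + misses)`. It derives the average entry size from the two in-use gauges. The sample values are made up.

```python
def summarize_codegen_cache(metrics):
  """Derive ratios from impala.codegen-cache.* metric values.

  'metrics' maps metric keys to numbers. Missing keys are treated as 0.
  """
  prefix = 'impala.codegen-cache.'

  def get(name):
    return metrics.get(prefix + name, 0)

  hits, misses = get('hits'), get('misses')
  entries, total_bytes = get('entries-in-use'), get('entries-in-use-bytes')
  lookups = hits + misses
  return {
    # Assumes every lookup is counted as exactly one hit or one miss.
    'hit_ratio': float(hits) / lookups if lookups else 0.0,
    'avg_entry_bytes': float(total_bytes) / entries if entries else 0.0,
    'evicted': get('entries-evicted'),
  }


# Made-up sample values, for illustration only.
print(summarize_codegen_cache({
  'impala.codegen-cache.hits': 3,
  'impala.codegen-cache.misses': 1,
  'impala.codegen-cache.entries-in-use': 2,
  'impala.codegen-cache.entries-in-use-bytes': 1000,
}))  # prints {'hit_ratio': 0.75, 'avg_entry_bytes': 500.0, 'evicted': 0}
```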
diff --git a/tests/common/test_result_verifier.py b/tests/common/test_result_verifier.py
index 69f22e509..790b895fc 100644
--- a/tests/common/test_result_verifier.py
+++ b/tests/common/test_result_verifier.py
@@ -755,3 +755,11 @@ def assert_codegen_enabled(profile_string, exec_node_ids):
     for exec_options in get_node_exec_options(profile_string, exec_node_id):
       assert 'Codegen Enabled' in exec_options
       assert not 'Codegen Disabled' in exec_options
+
+
+def assert_codegen_cache_hit(profile_string, expect_hit):
+  assert "NumCachedFunctions" in profile_string
+  if expect_hit:
+    assert "NumCachedFunctions: 0 " not in profile_string
+  else:
+    assert "NumCachedFunctions: 0 " in profile_string
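`assert_codegen_cache_hit` works on substrings of the text runtime profile. With `expect_hit=True`, no `NumCachedFunctions` counter may read zero, which roughly means every reported fragment reused cached functions. With `expect_hit=False`, at least one counter must read zero. The sketch below makes that per-counter reading explicit with a regular expression. The counter format `NumCachedFunctions: <n>` is inferred from the substring checks. The helper names and the sample profile are hypothetical.

```python
import re

# Matches counters such as "NumCachedFunctions: 3". The exact profile layout is
# an assumption inferred from the substring checks in assert_codegen_cache_hit.
_NUM_CACHED_RE = re.compile(r'NumCachedFunctions: (\d+)')


def cached_function_counts(profile_string):
  """Return every NumCachedFunctions value found in a text runtime profile."""
  return [int(n) for n in _NUM_CACHED_RE.findall(profile_string)]


def all_counters_hit(profile_string):
  """Roughly mirrors expect_hit=True: at least one counter, none of them zero."""
  counts = cached_function_counts(profile_string)
  return bool(counts) and all(n > 0 for n in counts)


def some_counter_missed(profile_string):
  """Roughly mirrors expect_hit=False: at least one counter reads zero."""
  return 0 in cached_function_counts(profile_string)


# Hypothetical two-fragment profile excerpt.
sample = "F00:\n  NumCachedFunctions: 3 (3)\nF01:\n  NumCachedFunctions: 0 (0)\n"
print(cached_function_counts(sample))  # prints [3, 0]
```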
diff --git a/tests/custom_cluster/test_codegen_cache.py b/tests/custom_cluster/test_codegen_cache.py
new file mode 100644
index 000000000..f867fb378
--- /dev/null
+++ b/tests/custom_cluster/test_codegen_cache.py
@@ -0,0 +1,254 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+from copy import copy
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.skip import SkipIf, SkipIfNotHdfsMinicluster
+from tests.common.test_result_verifier import assert_codegen_cache_hit
+from tests.util.filesystem_utils import get_fs_path
+
+
+@SkipIf.not_hdfs
[email protected]
+class TestCodegenCache(CustomClusterTestSuite):
+  """ This test enables the codegen cache and verifies that cache hit and miss counts
+  in the runtime profile and metrics are as expected.
+  """
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def setup_class(cls):
+    if cls.exploration_strategy() != 'exhaustive':
+      pytest.skip('runs only in exhaustive')
+    super(TestCodegenCache, cls).setup_class()
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(cluster_size=1)
+  def test_codegen_cache(self, vector):
+    self._test_codegen_cache(vector,
+        ("select * from (select * from functional.alltypes "
+        + "limit 1000000) t1 where int_col > 10 limit 10"))
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(cluster_size=1)
+  def test_codegen_cache_int_col(self, vector):
+    self._test_codegen_cache(vector,
+        "select * from functional.alltypes where int_col > 0")
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(cluster_size=1)
+  def test_codegen_cache_tinyint_col(self, vector):
+    self._test_codegen_cache(vector,
+        "select * from functional.alltypes where tinyint_col > 0")
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(cluster_size=1)
+  def test_codegen_cache_bool_col(self, vector):
+    self._test_codegen_cache(vector,
+        "select * from functional.alltypes where bool_col > 0")
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(cluster_size=1)
+  def test_codegen_cache_bigint_col(self, vector):
+    self._test_codegen_cache(vector,
+        "select * from functional.alltypes where bigint_col > 0")
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(cluster_size=1)
+  def test_codegen_cache_float_col(self, vector):
+    self._test_codegen_cache(vector,
+        "select * from functional.alltypes where float_col > 0")
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(cluster_size=1)
+  def test_codegen_cache_double_col(self, vector):
+    self._test_codegen_cache(vector,
+        "select * from functional.alltypes where double_col > 0")
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(cluster_size=1)
+  def test_codegen_cache_date_string_col(self, vector):
+    self._test_codegen_cache(vector,
+        "select * from functional.alltypes where date_string_col != ''")
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(cluster_size=1)
+  def test_codegen_cache_string_col(self, vector):
+    self._test_codegen_cache(vector,
+        "select * from functional.alltypes where string_col != ''")
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(cluster_size=1)
+  def test_codegen_cache_poly_func_string_col(self, vector):
+    self._test_codegen_cache(vector,
+        ("select * from functional.alltypes where "
+        + "CHAR_LENGTH(string_col) > 0"))
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(cluster_size=1)
+  def test_codegen_cache_poly_func_date_string_col(self, vector):
+    self._test_codegen_cache(vector,
+        ("select * from functional.alltypes where "
+        + "CHAR_LENGTH(date_string_col) > 0"))
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(cluster_size=1)
+  # A native UDA is not served from the codegen cache, which is disabled for it.
+  def test_codegen_cache_uda_miss(self, vector):
+    database = "test_codegen_cache_uda_miss"
+    self._load_functions(database)
+    self._test_codegen_cache(vector,
+        "select test_count(int_col) from functional.alltypestiny", False)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(cluster_size=1)
+  # A native UDF is not served from the codegen cache, which is disabled for it.
+  def test_codegen_cache_udf_miss(self, vector):
+    database = "test_codegen_cache_udf_miss"
+    self._load_functions(database)
+    self._test_codegen_cache(vector,
+        "select sum(identity(bigint_col)) from functional.alltypes", False)
+
+  def _check_metric_expect_init(self):
+    # Verifies that the cache metrics are all zero.
+    assert self.get_metric('impala.codegen-cache.entries-evicted') == 0
+    assert self.get_metric('impala.codegen-cache.entries-in-use') == 0
+    assert self.get_metric('impala.codegen-cache.entries-in-use-bytes') == 0
+    assert self.get_metric('impala.codegen-cache.hits') == 0
+    assert self.get_metric('impala.codegen-cache.misses') == 0
+
+  def _test_codegen_cache(self, vector, sql, expect_hit=True, expect_num_frag=2):
+    # Do not disable codegen.
+    exec_options = copy(vector.get_value('exec_option'))
+    exec_options['exec_single_node_rows_threshold'] = 0
+    self._check_metric_expect_init()
+    result = self.execute_query(sql, exec_options)
+    assert_codegen_cache_hit(result.runtime_profile, False)
+    # expect_num_cache_miss_fragment is 1 iff expect_hit is False. In that case exactly
+    # one fragment (the one using a native UDF or UDA) is expected to skip the cache.
+    expect_num_cache_miss_fragment = 1
+    if expect_hit:
+      expect_num_cache_miss_fragment = 0
+    expect_num_cache_hit = expect_num_frag - expect_num_cache_miss_fragment
+
+    # The first run should record cache misses, since lookups fail in the brand-new,
+    # empty cache. New entries should then be stored successfully, so the in-use
+    # entry count and bytes should both be larger than 0.
+    assert self.get_metric('impala.codegen-cache.entries-evicted') == 0
+    assert self.get_metric('impala.codegen-cache.entries-in-use') == expect_num_cache_hit
+    assert self.get_metric('impala.codegen-cache.entries-in-use-bytes') > 0
+    assert self.get_metric('impala.codegen-cache.hits') == 0
+    assert self.get_metric('impala.codegen-cache.misses') == expect_num_cache_hit
+
+    result = self.execute_query(sql, exec_options)
+    # Verify again; the expected cache hits should now be reflected.
+    if expect_hit:
+      assert_codegen_cache_hit(result.runtime_profile, True)
+    else:
+      assert_codegen_cache_hit(result.runtime_profile, False)
+    assert self.get_metric('impala.codegen-cache.entries-evicted') == 0
+    assert self.get_metric('impala.codegen-cache.entries-in-use') == expect_num_cache_hit
+    assert self.get_metric('impala.codegen-cache.entries-in-use-bytes') > 0
+    assert self.get_metric('impala.codegen-cache.hits') == expect_num_cache_hit
+    assert self.get_metric('impala.codegen-cache.misses') == expect_num_cache_hit
+
+  def _load_functions(self, database):
+    create_func_template = """
+    use default;
+    drop database if exists {database} CASCADE;
+    create database {database};
+    create aggregate function {database}.test_count(int) returns bigint
+    location '{location_uda}' update_fn='CountUpdate';
+    create function {database}.identity(boolean) returns boolean
+    location '{location_udf}' symbol='Identity';
+    create function {database}.identity(bigint) returns bigint
+    location '{location_udf}' symbol='Identity';
+    use {database};
+    """
+    location_uda = get_fs_path('/test-warehouse/libudasample.so')
+    location_udf = get_fs_path('/test-warehouse/libTestUdfs.so')
+    queries = create_func_template.format(database=database,
+        location_uda=location_uda, location_udf=location_udf)
+    queries = [q for q in queries.split(';') if q.strip()]
+    for query in queries:
+      if query.strip() == '': continue
+      result = self.execute_query_expect_success(self.client, query)
+      assert result is not None
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(cluster_size=3)
+  def test_codegen_cache_udf_crash(self, vector):
+    # This test would crash if the codegen cache were not disabled for native UDFs.
+    database = "test_codegen_cache_udf_crash"
+    self._load_functions(database)
+    self.run_test_case('QueryTest/codegen-cache-udf', vector, use_db=database)
+    # Even though these queries use the UDF, their other fragments can still be
+    # stored in the codegen cache, so check that the cache is in use for those
+    # fragments.
+    assert self.get_metric('impala.codegen-cache.entries-in-use') > 0
+    assert self.get_metric('impala.codegen-cache.entries-in-use-bytes') > 0
+
+    # Run multiple times, recreating the UDFs each time. This would crash if a UDF
+    # were reused from the codegen cache.
+    for i in range(3):
+      # Make the database different
+      database = database + "diff"
+      self._load_functions(database)
+      self.run_test_case('QueryTest/codegen-cache-udf', vector, use_db=database)
+
+  def _test_codegen_cache_timezone_crash_helper(self, database):
+    create_db_template = """
+    use default;
+    drop database if exists {database} CASCADE;
+    create database {database};
+    create table {database}.alltimezones as select * from functional.alltimezones;
+    use {database};
+    """
+    queries = create_db_template.format(database=database)
+    queries = [q for q in queries.split(';') if q.strip()]
+    query = "select timezone, utctime, localtime,\
+        from_utc_timestamp(utctime,timezone) as\
+        impalaresult from alltimezones where\
+        localtime != from_utc_timestamp(utctime,timezone)"
+    queries.append(query)
+    for query in queries:
+      if query.strip() == '': continue
+      result = self.execute_query_expect_success(self.client, query)
+      assert result is not None
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(cluster_size=1)
+  def test_codegen_cache_timezone_crash(self, vector):
+    # Tests whether using the broken builtin function from_utc_timestamp from the
+    # codegen cache would crash.
+    database = "test_codegen_cache_timezone_crash"
+    # Run multiple times, recreating the database each time. Every run except the
+    # first should hit the cache.
+    # Expect no crash.
+    for i in range(5):
+      # Make the database different
+      self._test_codegen_cache_timezone_crash_helper(database + str(i))
+      # The table creation involves one fragment and the query under test involves
+      # two, so three fragments in total, all of which should be cached.
+      assert self.get_metric('impala.codegen-cache.entries-in-use') == 3
+      assert self.get_metric('impala.codegen-cache.entries-in-use-bytes') > 0
+      assert self.get_metric('impala.codegen-cache.hits') == i * 3
+      assert self.get_metric('impala.codegen-cache.misses') == 3
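The metric assertions in `_test_codegen_cache` and `test_codegen_cache_timezone_crash` follow simple arithmetic. Each fragment that can use the cache misses once on the first run, stores one entry, and hits on every later run. A fragment using a native UDF or UDA skips the cache entirely, so, judging by these assertions, it counts as neither a hit nor a miss. That is why `expect_num_cache_hit` serves as the expected entry count, miss count, and second-run hit count. The hypothetical helper below encodes this model. It assumes an initially empty cache, no evictions, and identical bitcode for each cacheable fragment on every run.

```python
def expected_cache_metrics(cacheable_fragments, runs):
  """Expected codegen cache metrics after running the same statements 'runs' times.

  Hypothetical model: one miss and one stored entry per cacheable fragment on
  the first run, then one hit per cacheable fragment on each later run.
  """
  if runs <= 0:
    return {'entries-in-use': 0, 'misses': 0, 'hits': 0}
  return {
    'entries-in-use': cacheable_fragments,
    'misses': cacheable_fragments,
    'hits': cacheable_fragments * (runs - 1),
  }


# _test_codegen_cache with a native UDF: two fragments, one skips the cache.
print(expected_cache_metrics(2 - 1, 2))  # prints {'entries-in-use': 1, 'misses': 1, 'hits': 1}
```

For the timezone test, after iteration `i` (0-based) the three fragments have run `i + 1` times. The model then gives `hits == i * 3` and `misses == 3`, which matches the loop's assertions.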