IMPALA-2746: part 1: enable LSAN for many backend tests This turns on leak sanitizer for backend tests that required relatively small modifications to pass. We suppress a few leaks, mainly related to the embedded JVM.
Testing: Ran core tests under ASAN. Change-Id: Ibdda092a4eb4bc827c75a8c121e5428ec746b7f4 Reviewed-on: http://gerrit.cloudera.org:8080/10668 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/5c3f06c7 Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/5c3f06c7 Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/5c3f06c7 Branch: refs/heads/master Commit: 5c3f06c7232fac2d72c100e8f28582459760e320 Parents: cfe2504 Author: Tim Armstrong <[email protected]> Authored: Fri Jun 8 14:54:45 2018 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Tue Jun 19 00:23:49 2018 +0000 ---------------------------------------------------------------------- be/CMakeLists.txt | 11 +- be/src/catalog/CMakeLists.txt | 2 +- be/src/codegen/CMakeLists.txt | 4 +- be/src/codegen/instruction-counter-test.cc | 54 +++++----- be/src/common/CMakeLists.txt | 4 +- be/src/common/atomic-test.cc | 8 +- be/src/common/atomic.h | 11 ++ be/src/exec/CMakeLists.txt | 20 ++-- be/src/experiments/CMakeLists.txt | 2 +- be/src/exprs/CMakeLists.txt | 4 +- be/src/exprs/expr-test.cc | 93 +++++++++-------- be/src/rpc/CMakeLists.txt | 12 +-- be/src/runtime/CMakeLists.txt | 38 +++---- be/src/runtime/bufferpool/CMakeLists.txt | 10 +- be/src/runtime/bufferpool/buffer-pool-test.cc | 4 +- be/src/runtime/data-stream-test.cc | 111 ++++++++++----------- be/src/runtime/io/CMakeLists.txt | 2 +- be/src/runtime/io/disk-io-mgr-stress.cc | 4 +- be/src/runtime/io/disk-io-mgr-stress.h | 3 +- be/src/runtime/io/disk-io-mgr-test.cc | 6 +- be/src/runtime/mem-tracker.cc | 3 +- be/src/scheduling/CMakeLists.txt | 6 +- be/src/service/CMakeLists.txt | 8 +- be/src/statestore/CMakeLists.txt | 2 +- be/src/udf/udf.cc | 2 + be/src/util/CMakeLists.txt | 72 ++++++------- be/src/util/decompress-test.cc | 9 +- bin/lsan-suppressions.txt | 34 +++++++ 28 files changed, 297 insertions(+), 242 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/5c3f06c7/be/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt index 1eed615..d817009 100644 --- a/be/CMakeLists.txt +++ b/be/CMakeLists.txt @@ -510,12 +510,18 @@ FUNCTION(ADD_BE_TEST TEST_NAME) ADD_DEPENDENCIES(be-test ${TEST_NAME}) ENDFUNCTION() +FUNCTION(ENABLE_LSAN_FOR_TEST TEST_NAME) + SET_TESTS_PROPERTIES(${TEST_NAME} PROPERTIES ENVIRONMENT + "ASAN_OPTIONS=handle_segv=0 detect_leaks=1 allocator_may_return_null=1") + SET_TESTS_PROPERTIES(${TEST_NAME} PROPERTIES ENVIRONMENT + "LSAN_OPTIONS=suppressions=${CMAKE_SOURCE_DIR}/bin/lsan-suppressions.txt") +ENDFUNCTION() + # Same as ADD_BE_TEST, but also enable LeakSanitizer. # TODO: IMPALA-2746: we should make this the default. FUNCTION(ADD_BE_LSAN_TEST TEST_NAME) ADD_BE_TEST(${TEST_NAME}) - SET_TESTS_PROPERTIES(${TEST_NAME} PROPERTIES ENVIRONMENT - "ASAN_OPTIONS=handle_segv=0 detect_leaks=1 allocator_may_return_null=1") + ENABLE_LSAN_FOR_TEST(${TEST_NAME}) ENDFUNCTION() # Similar utility function for tests that use the UDF SDK @@ -532,6 +538,7 @@ FUNCTION(ADD_UDF_TEST TEST_NAME) ADD_TEST(${TEST_NAME} "${BUILD_OUTPUT_ROOT_DIRECTORY}/${DIR_NAME}/${TEST_NAME}" -log_dir=$ENV{IMPALA_BE_TEST_LOGS_DIR}) ADD_DEPENDENCIES(be-test ${TEST_NAME}) + ENABLE_LSAN_FOR_TEST(${TEST_NAME}) ENDFUNCTION() # Function to generate rule to cross compile a source file to an IR module. http://git-wip-us.apache.org/repos/asf/impala/blob/5c3f06c7/be/src/catalog/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/catalog/CMakeLists.txt b/be/src/catalog/CMakeLists.txt index 35cccea..ba8d97c 100644 --- a/be/src/catalog/CMakeLists.txt +++ b/be/src/catalog/CMakeLists.txt @@ -26,4 +26,4 @@ add_library(Catalog ) add_dependencies(Catalog gen-deps) -ADD_BE_TEST(catalog-util-test) +ADD_BE_LSAN_TEST(catalog-util-test) http://git-wip-us.apache.org/repos/asf/impala/blob/5c3f06c7/be/src/codegen/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/codegen/CMakeLists.txt b/be/src/codegen/CMakeLists.txt index 56228a2..2709fcb 100644 --- a/be/src/codegen/CMakeLists.txt +++ b/be/src/codegen/CMakeLists.txt @@ -107,6 +107,6 @@ add_custom_target(test-loop.bc SOURCES ${CMAKE_SOURCE_DIR}/testdata/llvm/test-loop.cc ) -ADD_BE_TEST(llvm-codegen-test) +ADD_BE_LSAN_TEST(llvm-codegen-test) add_dependencies(llvm-codegen-test test-loop.bc) -ADD_BE_TEST(instruction-counter-test) +ADD_BE_LSAN_TEST(instruction-counter-test) http://git-wip-us.apache.org/repos/asf/impala/blob/5c3f06c7/be/src/codegen/instruction-counter-test.cc ---------------------------------------------------------------------- diff --git a/be/src/codegen/instruction-counter-test.cc b/be/src/codegen/instruction-counter-test.cc index f404243..5dbc605 100644 --- a/be/src/codegen/instruction-counter-test.cc +++ b/be/src/codegen/instruction-counter-test.cc @@ -73,19 +73,19 @@ llvm::Module* CodegenMulAdd(llvm::LLVMContext* context) { TEST_F(InstructionCounterTest, Count) { llvm::Module* MulAddModule = CodegenMulAdd(&context_); - InstructionCounter* instruction_counter = new InstructionCounter(); - instruction_counter->visit(*MulAddModule); - instruction_counter->PrintCounters(); - EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::TOTAL_FUNCTIONS), 1); - EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::TOTAL_INSTS), 3); - EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::TERMINATOR_INSTS), 1); - EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::MEMORY_INSTS), 0); + InstructionCounter instruction_counter; + instruction_counter.visit(*MulAddModule); + instruction_counter.PrintCounters(); + EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::TOTAL_FUNCTIONS), 1); + EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::TOTAL_INSTS), 3); + EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::TERMINATOR_INSTS), 1); + EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::MEMORY_INSTS), 0); // Test Reset - instruction_counter->ResetCount(); - EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::TOTAL_FUNCTIONS), 0); - EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::TOTAL_INSTS), 0); - EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::MEMORY_INSTS), 0); + instruction_counter.ResetCount(); + EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::TOTAL_FUNCTIONS), 0); + EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::TOTAL_INSTS), 0); + EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::MEMORY_INSTS), 0); } // IR output from CodegenGcd @@ -152,25 +152,25 @@ llvm::Module* CodegenGcd(llvm::LLVMContext* context) { TEST_F(InstructionCounterTest, TestMemInstrCount) { llvm::Module* GcdModule = CodegenGcd(&context_); - InstructionCounter* instruction_counter = new InstructionCounter(); - instruction_counter->visit(*GcdModule); - std::cout << instruction_counter->PrintCounters(); - EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::TOTAL_FUNCTIONS), 1); - EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::TOTAL_BLOCKS), 5); - EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::TOTAL_INSTS), 11); + InstructionCounter instruction_counter; + instruction_counter.visit(*GcdModule); + std::cout << instruction_counter.PrintCounters(); + EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::TOTAL_FUNCTIONS), 1); + EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::TOTAL_BLOCKS), 5); + EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::TOTAL_INSTS), 11); // Test Category Totals - EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::TERMINATOR_INSTS), 5); - EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::MEMORY_INSTS), 0); - EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::OTHER_INSTS), 4); + EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::TERMINATOR_INSTS), 5); + EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::MEMORY_INSTS), 0); + EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::OTHER_INSTS), 4); // Test Reset - instruction_counter->ResetCount(); - EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::TOTAL_FUNCTIONS), 0); - EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::TOTAL_BLOCKS), 0); - EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::TOTAL_INSTS), 0); - EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::TERMINATOR_INSTS), 0); - EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::MEMORY_INSTS), 0); - EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::OTHER_INSTS), 0); + instruction_counter.ResetCount(); + EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::TOTAL_FUNCTIONS), 0); + EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::TOTAL_BLOCKS), 0); + EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::TOTAL_INSTS), 0); + EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::TERMINATOR_INSTS), 0); + EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::MEMORY_INSTS), 0); + EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::OTHER_INSTS), 0); } } // namespace impala http://git-wip-us.apache.org/repos/asf/impala/blob/5c3f06c7/be/src/common/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/common/CMakeLists.txt b/be/src/common/CMakeLists.txt index c01d71c..daa274d 100644 --- a/be/src/common/CMakeLists.txt +++ b/be/src/common/CMakeLists.txt @@ -49,8 +49,8 @@ add_library(GlobalFlags ) add_dependencies(GlobalFlags gen-deps) -ADD_BE_TEST(atomic-test) -ADD_BE_TEST(thread-debug-info-test) +ADD_BE_LSAN_TEST(atomic-test) +ADD_BE_LSAN_TEST(thread-debug-info-test) # Generate config.h from config.h.in, filling in variables from CMake CONFIGURE_FILE(${CMAKE_CURRENT_SOURCE_DIR}/config.h.in http://git-wip-us.apache.org/repos/asf/impala/blob/5c3f06c7/be/src/common/atomic-test.cc ---------------------------------------------------------------------- diff --git a/be/src/common/atomic-test.cc b/be/src/common/atomic-test.cc index ccbaa8e..dc35396 100644 --- a/be/src/common/atomic-test.cc +++ b/be/src/common/atomic-test.cc @@ -236,10 +236,10 @@ static void TestAcquireReleaseLoadStore() { const int ITERS = 1000000; AtomicInt<T> control(-1); T payload = -1; - thread* t_a = new thread(AcquireReleaseThreadA<T>, 0, 1, ITERS, &control, &payload); - thread* t_b = new thread(AcquireReleaseThreadB<T>, 1, 0, ITERS, &control, &payload); - t_a->join(); - t_b->join(); + thread t_a(AcquireReleaseThreadA<T>, 0, 1, ITERS, &control, &payload); + thread t_b(AcquireReleaseThreadB<T>, 1, 0, ITERS, &control, &payload); + t_a.join(); + t_b.join(); } TEST(AtomicTest, MultipleTreadsAcquireReleaseLoadStoreInt) { http://git-wip-us.apache.org/repos/asf/impala/blob/5c3f06c7/be/src/common/atomic.h ---------------------------------------------------------------------- diff --git a/be/src/common/atomic.h b/be/src/common/atomic.h index 0d3e556..4c72826 100644 --- a/be/src/common/atomic.h +++ b/be/src/common/atomic.h @@ -105,6 +105,12 @@ class AtomicInt { return base::subtle::Barrier_CompareAndSwap(&value_, old_val, new_val) == old_val; } + /// Store 'new_val' and return the previous value. Implies a Release memory barrier + /// (i.e. the same as Store()). + ALWAYS_INLINE T Swap(T new_val) { + return base::subtle::Release_AtomicExchange(&value_, new_val); + } + private: T value_; @@ -130,6 +136,11 @@ class AtomicPtr { /// Atomic store with "release" memory-ordering semantic. ALWAYS_INLINE void Store(T* val) { ptr_.Store(reinterpret_cast<intptr_t>(val)); } + /// Store 'new_val' and return the previous value. Implies a Release memory barrier + /// (i.e. the same as Store()). + ALWAYS_INLINE T* Swap(T* val) { + return reinterpret_cast<T*>(ptr_.Swap(reinterpret_cast<intptr_t>(val))); + } private: internal::AtomicInt<intptr_t> ptr_; }; http://git-wip-us.apache.org/repos/asf/impala/blob/5c3f06c7/be/src/exec/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index 2349df4..77c6e15 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -102,13 +102,13 @@ add_library(Exec add_dependencies(Exec gen-deps) -ADD_BE_TEST(zigzag-test) -ADD_BE_TEST(hash-table-test) -ADD_BE_TEST(delimited-text-parser-test) -ADD_BE_TEST(read-write-util-test) -ADD_BE_TEST(parquet-plain-test) -ADD_BE_TEST(parquet-version-test) -ADD_BE_TEST(row-batch-list-test) -ADD_BE_TEST(incr-stats-util-test) -ADD_BE_TEST(hdfs-avro-scanner-test) -ADD_BE_TEST(hdfs-parquet-scanner-test) +ADD_BE_LSAN_TEST(zigzag-test) +ADD_BE_LSAN_TEST(hash-table-test) +ADD_BE_LSAN_TEST(delimited-text-parser-test) +ADD_BE_LSAN_TEST(read-write-util-test) +ADD_BE_LSAN_TEST(parquet-plain-test) +ADD_BE_LSAN_TEST(parquet-version-test) +ADD_BE_LSAN_TEST(row-batch-list-test) +ADD_BE_LSAN_TEST(incr-stats-util-test) +ADD_BE_LSAN_TEST(hdfs-avro-scanner-test) +ADD_BE_LSAN_TEST(hdfs-parquet-scanner-test) http://git-wip-us.apache.org/repos/asf/impala/blob/5c3f06c7/be/src/experiments/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/experiments/CMakeLists.txt b/be/src/experiments/CMakeLists.txt index b4f0c49..c4b5593 100644 --- a/be/src/experiments/CMakeLists.txt +++ b/be/src/experiments/CMakeLists.txt @@ -38,4 +38,4 @@ target_link_libraries(tuple-splitter-test Experiments ${IMPALA_LINK_LIBS}) target_link_libraries(hash-partition-test ${IMPALA_LINK_LIBS}) target_link_libraries(compression-test ${IMPALA_LINK_LIBS}) -ADD_BE_TEST(string-search-sse-test) +ADD_BE_LSAN_TEST(string-search-sse-test) http://git-wip-us.apache.org/repos/asf/impala/blob/5c3f06c7/be/src/exprs/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/exprs/CMakeLists.txt b/be/src/exprs/CMakeLists.txt index 755c166..14c65f5 100644 --- a/be/src/exprs/CMakeLists.txt +++ b/be/src/exprs/CMakeLists.txt @@ -66,8 +66,8 @@ add_library(Exprs ) add_dependencies(Exprs gen-deps gen_ir_descriptions) -ADD_BE_TEST(expr-test) -ADD_BE_TEST(expr-codegen-test) +ADD_BE_LSAN_TEST(expr-test) +ADD_BE_LSAN_TEST(expr-codegen-test) # expr-codegen-test includes test IR functions COMPILE_TO_IR(expr-codegen-test.cc) http://git-wip-us.apache.org/repos/asf/impala/blob/5c3f06c7/be/src/exprs/expr-test.cc ---------------------------------------------------------------------- diff --git a/be/src/exprs/expr-test.cc b/be/src/exprs/expr-test.cc index 61cfceb..3d4daa7 100644 --- a/be/src/exprs/expr-test.cc +++ b/be/src/exprs/expr-test.cc @@ -168,6 +168,9 @@ class ScopedLocalUnixTimestampConversionOverride { class ExprTest : public testing::Test { protected: + // Pool for objects to be destroyed during test teardown. + ObjectPool pool_; + // Maps from enum value of primitive integer type to the minimum value that is // outside of the next smaller-resolution type. For example the value for type // TYPE_SMALLINT is numeric_limits<int8_t>::max()+1. There is a GREATEST test in @@ -238,6 +241,8 @@ class ExprTest : public testing::Test { default_type_strs_[TYPE_DECIMAL] = default_decimal_str_; } + virtual void TearDown() { pool_.Clear(); } + string GetValue(const string& expr, const ColumnType& expr_type, bool expect_error = false) { string stmt = "select " + expr; @@ -1023,7 +1028,7 @@ class ExprTest : public testing::Test { template<class T> bool ParseString(const string& str, T* val); - // Create a Literal expression out of 'str'. + // Create a Literal expression out of 'str'. Adds the returned literal to pool_. Literal* CreateLiteral(const ColumnType& type, const string& str); // Helper function for LiteralConstruction test. Creates a Literal expression @@ -1051,6 +1056,15 @@ class ExprTest : public testing::Test { TestIsNull("cast(" + stmt + " as timestamp)", TYPE_TIMESTAMP); } } + + // Wrapper around UdfTestHarness::CreateTestContext() that stores the context in + // 'pool_' to be automatically cleaned up. + FunctionContext* CreateUdfTestContext(const FunctionContext::TypeDesc& return_type, + const std::vector<FunctionContext::TypeDesc>& arg_types, + RuntimeState* state = nullptr, MemPool* pool = nullptr) { + return pool_.Add( + UdfTestHarness::CreateTestContext(return_type, arg_types, state, pool)); + } }; template<> @@ -1182,51 +1196,51 @@ Literal* ExprTest::CreateLiteral(const ColumnType& type, const string& str) { case TYPE_BOOLEAN: { bool v = false; EXPECT_TRUE(ParseString<bool>(str, &v)); - return new Literal(type, v); + return pool_.Add(new Literal(type, v)); } case TYPE_TINYINT: { int8_t v = 0; EXPECT_TRUE(ParseString<int8_t>(str, &v)); - return new Literal(type, v); + return pool_.Add(new Literal(type, v)); } case TYPE_SMALLINT: { int16_t v = 0; EXPECT_TRUE(ParseString<int16_t>(str, &v)); - return new Literal(type, v); + return pool_.Add(new Literal(type, v)); } case TYPE_INT: { int32_t v = 0; EXPECT_TRUE(ParseString<int32_t>(str, &v)); - return new Literal(type, v); + return pool_.Add(new Literal(type, v)); } case TYPE_BIGINT: { int64_t v = 0; EXPECT_TRUE(ParseString<int64_t>(str, &v)); - return new Literal(type, v); + return pool_.Add(new Literal(type, v)); } case TYPE_FLOAT: { float v = 0; EXPECT_TRUE(ParseString<float>(str, &v)); - return new Literal(type, v); + return pool_.Add(new Literal(type, v)); } case TYPE_DOUBLE: { double v = 0; EXPECT_TRUE(ParseString<double>(str, &v)); - return new Literal(type, v); + return pool_.Add(new Literal(type, v)); } case TYPE_STRING: case TYPE_VARCHAR: case TYPE_CHAR: - return new Literal(type, str); + return pool_.Add(new Literal(type, str)); case TYPE_TIMESTAMP: { TimestampValue v; EXPECT_TRUE(ParseString<TimestampValue>(str, &v)); - return new Literal(type, v); + return pool_.Add(new Literal(type, v)); } case TYPE_DECIMAL: { double v = 0; EXPECT_TRUE(ParseString<double>(str, &v)); - return new Literal(type, v); + return pool_.Add(new Literal(type, v)); } default: DCHECK(false) << "Invalid type: " << type.DebugString(); @@ -1238,6 +1252,7 @@ template <typename T> void ExprTest::TestSingleLiteralConstruction( const ColumnType& type, const T& value, const string& string_val) { ObjectPool pool; + RuntimeState state(TQueryCtx(), ExecEnv::GetInstance()); MemTracker tracker; MemPool mem_pool(&tracker); @@ -3826,7 +3841,7 @@ TEST_F(ExprTest, StringFunctions) { FunctionContext::TypeDesc str_desc; str_desc.type = FunctionContext::Type::TYPE_STRING; std::vector<FunctionContext::TypeDesc> v(3, str_desc); - auto context = UdfTestHarness::CreateTestContext(str_desc, v, nullptr, &pool); + FunctionContext* context = CreateUdfTestContext(str_desc, v, nullptr, &pool); StringVal giga(static_cast<uint8_t*>(giga_buf->data()), StringVal::MAX_LENGTH); StringVal a("A"); @@ -3863,7 +3878,7 @@ TEST_F(ExprTest, StringFunctions) { EXPECT_TRUE(r4.is_null); // Re-create context to clear the error from failed allocation. UdfTestHarness::CloseContext(context); - context = UdfTestHarness::CreateTestContext(str_desc, v, nullptr, &pool); + context = CreateUdfTestContext(str_desc, v, nullptr, &pool); // Similar test for second overflow. This tests overflowing on re-allocation. (*short_buf)[4095] = 'Z'; @@ -3872,7 +3887,7 @@ TEST_F(ExprTest, StringFunctions) { EXPECT_TRUE(r5.is_null); // Re-create context to clear the error from failed allocation. UdfTestHarness::CloseContext(context); - context = UdfTestHarness::CreateTestContext(str_desc, v, nullptr, &pool); + context = CreateUdfTestContext(str_desc, v, nullptr, &pool); // Finally, test expanding to exactly MAX_LENGTH // There are 4 Zs in giga4 (not including the trailing one, as we truncate that) @@ -7370,8 +7385,6 @@ void ValidateLayout(const vector<ScalarExpr*>& exprs, int expected_byte_size, } TEST_F(ExprTest, ResultsLayoutTest) { - ObjectPool pool; - vector<ScalarExpr*> exprs; map<int, set<int>> expected_offsets; @@ -7412,9 +7425,9 @@ TEST_F(ExprTest, ResultsLayoutTest) { // With one expr, all offsets should be 0. expected_offsets[t.GetByteSize()] = set<int>({0}); if (t.type != TYPE_TIMESTAMP) { - exprs.push_back(pool.Add(CreateLiteral(t, "0"))); + exprs.push_back(CreateLiteral(t, "0")); } else { - exprs.push_back(pool.Add(CreateLiteral(t, "2016-11-09"))); + exprs.push_back(CreateLiteral(t, "2016-11-09")); } if (t.IsVarLenStringType()) { ValidateLayout(exprs, 16, 0, expected_offsets); @@ -7430,28 +7443,27 @@ TEST_F(ExprTest, ResultsLayoutTest) { // Test layout adding a bunch of exprs. This is designed to trigger padding. // The expected result is computed along the way - exprs.push_back(pool.Add(CreateLiteral(TYPE_BOOLEAN, "0"))); - exprs.push_back(pool.Add(CreateLiteral(TYPE_TINYINT, "0"))); - exprs.push_back(pool.Add(CreateLiteral(ColumnType::CreateCharType(1), "0"))); + exprs.push_back(CreateLiteral(TYPE_BOOLEAN, "0")); + exprs.push_back(CreateLiteral(TYPE_TINYINT, "0")); + exprs.push_back(CreateLiteral(ColumnType::CreateCharType(1), "0")); expected_offsets[1].insert(expected_byte_size); expected_offsets[1].insert(expected_byte_size + 1); expected_offsets[1].insert(expected_byte_size + 2); expected_byte_size += 3 * 1 + 1; // 1 byte of padding - exprs.push_back(pool.Add(CreateLiteral(TYPE_SMALLINT, "0"))); + exprs.push_back(CreateLiteral(TYPE_SMALLINT, "0")); expected_offsets[2].insert(expected_byte_size); expected_byte_size += 2; // No padding before CHAR - exprs.push_back(pool.Add(CreateLiteral(ColumnType::CreateCharType(3), "0"))); + exprs.push_back(CreateLiteral(ColumnType::CreateCharType(3), "0")); expected_offsets[3].insert(expected_byte_size); expected_byte_size += 3 + 3; // 3 byte of padding ASSERT_EQ(expected_byte_size % 4, 0); - exprs.push_back(pool.Add(CreateLiteral(TYPE_INT, "0"))); - exprs.push_back(pool.Add(CreateLiteral(TYPE_FLOAT, "0"))); - exprs.push_back(pool.Add(CreateLiteral(TYPE_FLOAT, "0"))); - exprs.push_back(pool.Add( - CreateLiteral(ColumnType::CreateDecimalType(9, 0), "0"))); + exprs.push_back(CreateLiteral(TYPE_INT, "0")); + exprs.push_back(CreateLiteral(TYPE_FLOAT, "0")); + exprs.push_back(CreateLiteral(TYPE_FLOAT, "0")); + exprs.push_back(CreateLiteral(ColumnType::CreateDecimalType(9, 0), "0")); expected_offsets[4].insert(expected_byte_size); expected_offsets[4].insert(expected_byte_size + 4); expected_offsets[4].insert(expected_byte_size + 8); @@ -7459,12 +7471,11 @@ TEST_F(ExprTest, ResultsLayoutTest) { expected_byte_size += 4 * 4 + 4; // 4 bytes of padding ASSERT_EQ(expected_byte_size % 8, 0); - exprs.push_back(pool.Add(CreateLiteral(TYPE_BIGINT, "0"))); - exprs.push_back(pool.Add(CreateLiteral(TYPE_BIGINT, "0"))); - exprs.push_back(pool.Add(CreateLiteral(TYPE_BIGINT, "0"))); - exprs.push_back(pool.Add(CreateLiteral(TYPE_DOUBLE, "0"))); - exprs.push_back(pool.Add( - CreateLiteral(ColumnType::CreateDecimalType(18, 0), "0"))); + exprs.push_back(CreateLiteral(TYPE_BIGINT, "0")); + exprs.push_back(CreateLiteral(TYPE_BIGINT, "0")); + exprs.push_back(CreateLiteral(TYPE_BIGINT, "0")); + exprs.push_back(CreateLiteral(TYPE_DOUBLE, "0")); + exprs.push_back(CreateLiteral(ColumnType::CreateDecimalType(18, 0), "0")); expected_offsets[8].insert(expected_byte_size); expected_offsets[8].insert(expected_byte_size + 8); expected_offsets[8].insert(expected_byte_size + 16); @@ -7473,20 +7484,18 @@ TEST_F(ExprTest, ResultsLayoutTest) { expected_byte_size += 5 * 8; // No more padding ASSERT_EQ(expected_byte_size % 8, 0); - exprs.push_back(pool.Add(CreateLiteral(TYPE_TIMESTAMP, "2016-11-09"))); - exprs.push_back(pool.Add(CreateLiteral(TYPE_TIMESTAMP, "2016-11-09"))); - exprs.push_back(pool.Add( - CreateLiteral(ColumnType::CreateDecimalType(20, 0), "0"))); + exprs.push_back(CreateLiteral(TYPE_TIMESTAMP, "2016-11-09")); + exprs.push_back(CreateLiteral(TYPE_TIMESTAMP, "2016-11-09")); + exprs.push_back(CreateLiteral(ColumnType::CreateDecimalType(20, 0), "0")); expected_offsets[16].insert(expected_byte_size); expected_offsets[16].insert(expected_byte_size + 16); expected_offsets[16].insert(expected_byte_size + 32); expected_byte_size += 3 * 16; ASSERT_EQ(expected_byte_size % 8, 0); - exprs.push_back(pool.Add(CreateLiteral(TYPE_STRING, "0"))); - exprs.push_back(pool.Add(CreateLiteral(TYPE_STRING, "0"))); - exprs.push_back(pool.Add( - CreateLiteral(ColumnType::CreateVarcharType(1), "0"))); + exprs.push_back(CreateLiteral(TYPE_STRING, "0")); + exprs.push_back(CreateLiteral(TYPE_STRING, "0")); + exprs.push_back(CreateLiteral(ColumnType::CreateVarcharType(1), "0")); expected_offsets[0].insert(expected_byte_size); expected_offsets[0].insert(expected_byte_size + 16); expected_offsets[0].insert(expected_byte_size + 32); http://git-wip-us.apache.org/repos/asf/impala/blob/5c3f06c7/be/src/rpc/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/rpc/CMakeLists.txt b/be/src/rpc/CMakeLists.txt index 1c7adc6..56a4190 100644 --- a/be/src/rpc/CMakeLists.txt +++ b/be/src/rpc/CMakeLists.txt @@ -39,20 +39,20 @@ add_library(Rpc ) add_dependencies(Rpc gen-deps) -ADD_BE_TEST(thrift-util-test) -ADD_BE_TEST(thrift-server-test) -# The thrift-server-test uses some utilites from the Kudu security test code. +ADD_BE_LSAN_TEST(thrift-util-test) +ADD_BE_TEST(thrift-server-test) # TODO: this test leaks servers +# The thrift-server-test uses some utilities from the Kudu security test code. target_link_libraries(thrift-server-test security-test-for-impala) -ADD_BE_TEST(authentication-test) +ADD_BE_LSAN_TEST(authentication-test) -ADD_BE_TEST(rpc-mgr-test) +ADD_BE_TEST(rpc-mgr-test) # TODO: this test leaks various KRPC things add_dependencies(rpc-mgr-test rpc_test_proto) target_link_libraries(rpc-mgr-test rpc_test_proto) target_link_libraries(rpc-mgr-test security-test-for-impala) target_link_libraries(rpc-mgr-test ${KRB5_REALM_OVERRIDE}) -ADD_BE_TEST(rpc-mgr-kerberized-test) +ADD_BE_TEST(rpc-mgr-kerberized-test) # TODO: this test leaks various KRPC things add_dependencies(rpc-mgr-kerberized-test rpc_test_proto) target_link_libraries(rpc-mgr-kerberized-test rpc_test_proto) target_link_libraries(rpc-mgr-kerberized-test security-test-for-impala) http://git-wip-us.apache.org/repos/asf/impala/blob/5c3f06c7/be/src/runtime/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index 7dcf45c..a55a394 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -77,22 +77,22 @@ add_library(Runtime ) add_dependencies(Runtime gen-deps) -ADD_BE_TEST(mem-pool-test) -ADD_BE_TEST(free-pool-test) -ADD_BE_TEST(string-buffer-test) -ADD_BE_TEST(data-stream-test) -ADD_BE_TEST(timestamp-test) -ADD_BE_TEST(raw-value-test) -ADD_BE_TEST(string-compare-test) -ADD_BE_TEST(string-search-test) -ADD_BE_TEST(string-value-test) -ADD_BE_TEST(thread-resource-mgr-test) -ADD_BE_TEST(mem-tracker-test) -ADD_BE_TEST(multi-precision-test) -ADD_BE_TEST(decimal-test) -ADD_BE_TEST(buffered-tuple-stream-test) -ADD_BE_TEST(hdfs-fs-cache-test) -ADD_BE_TEST(tmp-file-mgr-test) -ADD_BE_TEST(row-batch-serialize-test) -ADD_BE_TEST(row-batch-test) -ADD_BE_TEST(collection-value-builder-test) +ADD_BE_LSAN_TEST(mem-pool-test) +ADD_BE_LSAN_TEST(free-pool-test) +ADD_BE_LSAN_TEST(string-buffer-test) +ADD_BE_TEST(data-stream-test) # TODO: this test leaks +ADD_BE_LSAN_TEST(timestamp-test) +ADD_BE_LSAN_TEST(raw-value-test) +ADD_BE_LSAN_TEST(string-compare-test) +ADD_BE_LSAN_TEST(string-search-test) +ADD_BE_LSAN_TEST(string-value-test) +ADD_BE_LSAN_TEST(thread-resource-mgr-test) +ADD_BE_LSAN_TEST(mem-tracker-test) +ADD_BE_LSAN_TEST(multi-precision-test) +ADD_BE_LSAN_TEST(decimal-test) +ADD_BE_LSAN_TEST(buffered-tuple-stream-test) +ADD_BE_LSAN_TEST(hdfs-fs-cache-test) +ADD_BE_LSAN_TEST(tmp-file-mgr-test) +ADD_BE_LSAN_TEST(row-batch-serialize-test) +ADD_BE_LSAN_TEST(row-batch-test) +ADD_BE_LSAN_TEST(collection-value-builder-test) http://git-wip-us.apache.org/repos/asf/impala/blob/5c3f06c7/be/src/runtime/bufferpool/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/CMakeLists.txt b/be/src/runtime/bufferpool/CMakeLists.txt index ce68b07..d5c95e9 100644 --- a/be/src/runtime/bufferpool/CMakeLists.txt +++ b/be/src/runtime/bufferpool/CMakeLists.txt @@ -31,8 +31,8 @@ add_library(BufferPool ) add_dependencies(BufferPool gen-deps) -ADD_BE_TEST(buffer-allocator-test) -ADD_BE_TEST(buffer-pool-test) -ADD_BE_TEST(free-list-test) -ADD_BE_TEST(reservation-tracker-test) -ADD_BE_TEST(suballocator-test) +ADD_BE_LSAN_TEST(buffer-allocator-test) +ADD_BE_LSAN_TEST(buffer-pool-test) +ADD_BE_LSAN_TEST(free-list-test) +ADD_BE_LSAN_TEST(reservation-tracker-test) +ADD_BE_LSAN_TEST(suballocator-test) http://git-wip-us.apache.org/repos/asf/impala/blob/5c3f06c7/be/src/runtime/bufferpool/buffer-pool-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc index 1cfc819..77b6a70 100644 --- a/be/src/runtime/bufferpool/buffer-pool-test.cc +++ b/be/src/runtime/bufferpool/buffer-pool-test.cc @@ -1960,7 +1960,7 @@ void BufferPoolTest::TestRandomInternalMulti( } AtomicInt32 stop_maintenance(0); - thread* maintenance_thread = new thread([&pool, &stop_maintenance]() { + thread maintenance_thread([&pool, &stop_maintenance]() { while (stop_maintenance.Load() == 0) { pool.Maintenance(); SleepForMs(50); @@ -1968,7 +1968,7 @@ void BufferPoolTest::TestRandomInternalMulti( }); workers.join_all(); stop_maintenance.Add(1); - maintenance_thread->join(); + maintenance_thread.join(); global_reservations_.Close(); } http://git-wip-us.apache.org/repos/asf/impala/blob/5c3f06c7/be/src/runtime/data-stream-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc index 0a0d81e..8851d1a 100644 --- a/be/src/runtime/data-stream-test.cc +++ b/be/src/runtime/data-stream-test.cc @@ -84,10 +84,6 @@ DECLARE_string(datastream_service_queue_mem_limit); DECLARE_bool(use_krpc); -// We reserve contiguous memory for senders in SetUp. If a test uses more -// senders, a DCHECK will fail and you should increase this value. -static const int MAX_SENDERS = 16; -static const int MAX_RECEIVERS = 16; static const PlanNodeId DEST_NODE_ID = 1; static const int BATCH_CAPACITY = 100; // rows static const int PER_ROW_DATA = 8; @@ -238,9 +234,6 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit hash_sink_.output_partition.__isset.partition_exprs = true; hash_sink_.output_partition.partition_exprs.push_back(expr); - // Ensure that individual sender info addresses don't change - sender_info_.reserve(MAX_SENDERS); - receiver_info_.reserve(MAX_RECEIVERS); if (GetParam() == USE_THRIFT) { StartThriftBackend(); } else { @@ -329,38 +322,36 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit vector<TPlanFragmentDestination> dest_; struct SenderInfo { - thread* thread_handle; + unique_ptr<thread> thread_handle; Status status; - int num_bytes_sent; - - SenderInfo(): thread_handle(nullptr), num_bytes_sent(0) {} + int num_bytes_sent = 0; }; - vector<SenderInfo> sender_info_; + // Allocate each SenderInfo separately so the address doesn't change. + vector<unique_ptr<SenderInfo>> sender_info_; struct ReceiverInfo { TPartitionType::type stream_type; int num_senders; int receiver_num; - thread* thread_handle; + unique_ptr<thread> thread_handle; shared_ptr<DataStreamRecvrBase> stream_recvr; Status status; - int num_rows_received; + int num_rows_received = 0; multiset<int64_t> data_values; ReceiverInfo(TPartitionType::type stream_type, int num_senders, int receiver_num) : stream_type(stream_type), num_senders(num_senders), - receiver_num(receiver_num), - thread_handle(nullptr), - num_rows_received(0) {} + receiver_num(receiver_num) {} ~ReceiverInfo() { - delete thread_handle; + thread_handle.reset(); stream_recvr.reset(); } }; - vector<ReceiverInfo> receiver_info_; + // Allocate each ReceiveInfo separately so the address doesn't change. + vector<unique_ptr<ReceiverInfo>> receiver_info_; // Create an instance id and add it to dest_ void GetNextInstanceId(TUniqueId* instance_id) { @@ -447,15 +438,16 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit RuntimeProfile* profile = RuntimeProfile::Create(&obj_pool_, "TestReceiver"); TUniqueId instance_id; GetNextInstanceId(&instance_id); - receiver_info_.push_back(ReceiverInfo(stream_type, num_senders, receiver_num)); - ReceiverInfo& info = receiver_info_.back(); - info.stream_recvr = stream_mgr_->CreateRecvr(row_desc_, instance_id, DEST_NODE_ID, + receiver_info_.emplace_back( + make_unique<ReceiverInfo>(stream_type, num_senders, receiver_num)); + ReceiverInfo* info = receiver_info_.back().get(); + info->stream_recvr = stream_mgr_->CreateRecvr(row_desc_, instance_id, DEST_NODE_ID, num_senders, buffer_size, is_merging, profile, &tracker_, &buffer_pool_client_); if (!is_merging) { - info.thread_handle = new thread(&DataStreamTest::ReadStream, this, &info); + info->thread_handle.reset(new thread(&DataStreamTest::ReadStream, this, info)); } else { - info.thread_handle = new thread(&DataStreamTest::ReadStreamMerging, this, &info, - profile); + info->thread_handle.reset(new thread(&DataStreamTest::ReadStreamMerging, this, info, + profile)); } if (out_id != nullptr) *out_id = instance_id; } @@ -463,8 +455,8 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit void JoinReceivers() { VLOG_QUERY << "join receiver\n"; for (int i = 0; i < receiver_info_.size(); ++i) { - receiver_info_[i].thread_handle->join(); - receiver_info_[i].stream_recvr->Close(); + receiver_info_[i]->thread_handle->join(); + receiver_info_[i]->stream_recvr->Close(); } } @@ -510,20 +502,20 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit int64_t total = 0; multiset<int64_t> all_data_values; for (int i = 0; i < receiver_info_.size(); ++i) { - ReceiverInfo& info = receiver_info_[i]; - EXPECT_OK(info.status); - total += info.data_values.size(); - ASSERT_EQ(info.stream_type, stream_type); - ASSERT_EQ(info.num_senders, num_senders); + ReceiverInfo* info = receiver_info_[i].get(); + EXPECT_OK(info->status); + total += info->data_values.size(); + ASSERT_EQ(info->stream_type, stream_type); + ASSERT_EQ(info->num_senders, num_senders); if (stream_type == TPartitionType::UNPARTITIONED) { EXPECT_EQ( - NUM_BATCHES * BATCH_CAPACITY * num_senders, info.data_values.size()); + NUM_BATCHES * BATCH_CAPACITY * num_senders, info->data_values.size()); } - all_data_values.insert(info.data_values.begin(), info.data_values.end()); + all_data_values.insert(info->data_values.begin(), info->data_values.end()); int k = 0; - for (multiset<int64_t>::iterator j = info.data_values.begin(); - j != info.data_values.end(); ++j, ++k) { + for (multiset<int64_t>::iterator j = info->data_values.begin(); + j != info->data_values.end(); ++j, ++k) { if (stream_type == TPartitionType::UNPARTITIONED) { // unpartitioned streams contain all values as many times as there are // senders @@ -533,7 +525,7 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit int64_t value = *j; uint64_t hash_val = RawValue::GetHashValueFastHash(&value, TYPE_BIGINT, DataStreamSender::EXCHANGE_HASH_SEED); - EXPECT_EQ(hash_val % receiver_info_.size(), info.receiver_num); + EXPECT_EQ(hash_val % receiver_info_.size(), info->receiver_num); } } } @@ -553,8 +545,8 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit void CheckSenders() { for (int i = 0; i < sender_info_.size(); ++i) { - EXPECT_OK(sender_info_[i].status); - EXPECT_GT(sender_info_[i].num_bytes_sent, 0); + EXPECT_OK(sender_info_[i]->status); + EXPECT_GT(sender_info_[i]->num_bytes_sent, 0); } } @@ -595,18 +587,16 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit int channel_buffer_size = 1024) { VLOG_QUERY << "start sender"; int num_senders = sender_info_.size(); - ASSERT_LT(num_senders, MAX_SENDERS); - sender_info_.push_back(SenderInfo()); - SenderInfo& info = sender_info_.back(); - info.thread_handle = + sender_info_.emplace_back(make_unique<SenderInfo>()); + sender_info_.back()->thread_handle.reset( new thread(&DataStreamTest::Sender, this, num_senders, channel_buffer_size, - partition_type, GetParam() == USE_THRIFT); + partition_type, GetParam() == USE_THRIFT)); } void JoinSenders() { VLOG_QUERY << "join senders\n"; for (int i = 0; i < sender_info_.size(); ++i) { - sender_info_[i].thread_handle->join(); + sender_info_[i]->thread_handle->join(); } } @@ -640,22 +630,22 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit EXPECT_OK(sender->Prepare(&state, &tracker_)); EXPECT_OK(sender->Open(&state)); scoped_ptr<RowBatch> batch(CreateRowBatch()); - SenderInfo& info = sender_info_[sender_num]; + SenderInfo* info = sender_info_[sender_num].get(); int next_val = 0; for (int i = 0; i < NUM_BATCHES; ++i) { GetNextBatch(batch.get(), &next_val); VLOG_QUERY << "sender " << sender_num << ": #rows=" << batch->num_rows(); - info.status = sender->Send(&state, batch.get()); - if (!info.status.ok()) break; + info->status = sender->Send(&state, batch.get()); + if (!info->status.ok()) break; } VLOG_QUERY << "closing sender" << sender_num; - info.status.MergeStatus(sender->FlushFinal(&state)); + info->status.MergeStatus(sender->FlushFinal(&state)); sender->Close(&state); if (is_thrift) { - info.num_bytes_sent = static_cast<DataStreamSender*>( + info->num_bytes_sent = static_cast<DataStreamSender*>( sender.get())->GetNumDataBytesSent(); } else { - info.num_bytes_sent = static_cast<KrpcDataStreamSender*>( + info->num_bytes_sent = static_cast<KrpcDataStreamSender*>( sender.get())->GetNumDataBytesSent(); } @@ -745,7 +735,7 @@ TEST_P(DataStreamTest, UnknownSenderSmallResult) { GetNextInstanceId(&dummy_id); StartSender(TPartitionType::UNPARTITIONED, TOTAL_DATA_SIZE + 1024); JoinSenders(); - EXPECT_EQ(sender_info_[0].status.code(), TErrorCode::DATASTREAM_SENDER_TIMEOUT); + EXPECT_EQ(sender_info_[0]->status.code(), TErrorCode::DATASTREAM_SENDER_TIMEOUT); } TEST_P(DataStreamTest, UnknownSenderLargeResult) { @@ -754,7 +744,7 @@ TEST_P(DataStreamTest, UnknownSenderLargeResult) { GetNextInstanceId(&dummy_id); StartSender(); JoinSenders(); - EXPECT_EQ(sender_info_[0].status.code(), TErrorCode::DATASTREAM_SENDER_TIMEOUT); + EXPECT_EQ(sender_info_[0]->status.code(), TErrorCode::DATASTREAM_SENDER_TIMEOUT); } TEST_P(DataStreamTest, Cancel) { @@ -764,8 +754,8 @@ TEST_P(DataStreamTest, Cancel) { StartReceiver(TPartitionType::UNPARTITIONED, 1, 1, 1024, true, &instance_id); stream_mgr_->Cancel(instance_id); JoinReceivers(); - EXPECT_TRUE(receiver_info_[0].status.IsCancelled()); - EXPECT_TRUE(receiver_info_[1].status.IsCancelled()); + EXPECT_TRUE(receiver_info_[0]->status.IsCancelled()); + EXPECT_TRUE(receiver_info_[1]->status.IsCancelled()); } TEST_P(DataStreamTest, BasicTest) { @@ -867,12 +857,13 @@ TEST_P(DataStreamTestShortDeserQueue, TestNoDeadlock) { // Setup the receiver. RuntimeProfile* profile = RuntimeProfile::Create(&obj_pool_, "TestReceiver"); - receiver_info_.push_back(ReceiverInfo(TPartitionType::UNPARTITIONED, 4, 1)); - ReceiverInfo& info = receiver_info_.back(); - info.stream_recvr = stream_mgr_->CreateRecvr(row_desc_, instance_id, DEST_NODE_ID, + receiver_info_.emplace_back( + make_unique<ReceiverInfo>(TPartitionType::UNPARTITIONED, 4, 1)); + ReceiverInfo* info = receiver_info_.back().get(); + info->stream_recvr = stream_mgr_->CreateRecvr(row_desc_, instance_id, DEST_NODE_ID, 4, 1024 * 1024, false, profile, &tracker_, &buffer_pool_client_); - info.thread_handle = new thread( - &DataStreamTestShortDeserQueue_TestNoDeadlock_Test::ReadStream, this, &info); + info->thread_handle.reset(new thread( + &DataStreamTestShortDeserQueue_TestNoDeadlock_Test::ReadStream, this, info)); JoinSenders(); CheckSenders(); http://git-wip-us.apache.org/repos/asf/impala/blob/5c3f06c7/be/src/runtime/io/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/CMakeLists.txt b/be/src/runtime/io/CMakeLists.txt index 226fdc6..4c8199f 100644 --- a/be/src/runtime/io/CMakeLists.txt +++ b/be/src/runtime/io/CMakeLists.txt @@ -36,4 +36,4 @@ add_dependencies(Io gen-deps) add_executable(disk-io-mgr-stress-test disk-io-mgr-stress-test.cc) target_link_libraries(disk-io-mgr-stress-test ${IMPALA_TEST_LINK_LIBS}) -ADD_BE_TEST(disk-io-mgr-test) +ADD_BE_LSAN_TEST(disk-io-mgr-test) http://git-wip-us.apache.org/repos/asf/impala/blob/5c3f06c7/be/src/runtime/io/disk-io-mgr-stress.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/disk-io-mgr-stress.cc b/be/src/runtime/io/disk-io-mgr-stress.cc index c9b6870..16cfbf2 100644 --- a/be/src/runtime/io/disk-io-mgr-stress.cc +++ b/be/src/runtime/io/disk-io-mgr-stress.cc @@ -96,7 +96,7 @@ DiskIoMgrStress::DiskIoMgrStress(int num_disks, int num_threads_per_disk, CreateTempFile(files_[i].filename.c_str(), files_[i].data.c_str()); } - clients_ = new Client[num_clients_]; + clients_.reset(new Client[num_clients_]); client_mem_trackers_.resize(num_clients_); buffer_pool_clients_.reset(new BufferPool::ClientHandle[num_clients_]); for (int i = 0; i < num_clients_; ++i) { @@ -104,6 +104,8 @@ DiskIoMgrStress::DiskIoMgrStress(int num_disks, int num_threads_per_disk, } } +DiskIoMgrStress::~DiskIoMgrStress() { } + void DiskIoMgrStress::ClientThread(int client_id) { Client* client = &clients_[client_id]; Status status; http://git-wip-us.apache.org/repos/asf/impala/blob/5c3f06c7/be/src/runtime/io/disk-io-mgr-stress.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/disk-io-mgr-stress.h b/be/src/runtime/io/disk-io-mgr-stress.h index 574b58c..1baef24 100644 --- a/be/src/runtime/io/disk-io-mgr-stress.h +++ b/be/src/runtime/io/disk-io-mgr-stress.h @@ -42,6 +42,7 @@ class DiskIoMgrStress { public: DiskIoMgrStress(int num_disks, int num_threads_per_disk, int num_clients, bool includes_cancellation); + ~DiskIoMgrStress(); /// Run the test for 'sec'. If 0, run forever void Run(int sec); @@ -84,7 +85,7 @@ class DiskIoMgrStress { /// Array of clients int num_clients_; - Client* clients_; + std::unique_ptr<Client[]> clients_; /// Client MemTrackers, one per client. std::vector<std::unique_ptr<MemTracker>> client_mem_trackers_; http://git-wip-us.apache.org/repos/asf/impala/blob/5c3f06c7/be/src/runtime/io/disk-io-mgr-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/disk-io-mgr-test.cc b/be/src/runtime/io/disk-io-mgr-test.cc index 9239658..3d89d04 100644 --- a/be/src/runtime/io/disk-io-mgr-test.cc +++ b/be/src/runtime/io/disk-io-mgr-test.cc @@ -1041,7 +1041,7 @@ TEST_F(DiskIoMgrTest, MultipleReader) { unique_ptr<BufferPool::ClientHandle[]> clients( new BufferPool::ClientHandle[NUM_READERS]); vector<unique_ptr<RequestContext>> readers(NUM_READERS); - vector<char*> results(NUM_READERS); + vector<unique_ptr<char[]>> results(NUM_READERS); // Initialize data for each reader. The data will be // 'abcd...' for reader one, 'bcde...' for reader two (wrapping around at 'z') @@ -1063,8 +1063,8 @@ TEST_F(DiskIoMgrTest, MultipleReader) { stat(file_names[i].c_str(), &stat_val); mtimes[i] = stat_val.st_mtime; - results[i] = new char[DATA_LEN + 1]; - memset(results[i], 0, DATA_LEN + 1); + results[i].reset(new char[DATA_LEN + 1]); + memset(results[i].get(), 0, DATA_LEN + 1); } // This exercises concurrency, run the test multiple times http://git-wip-us.apache.org/repos/asf/impala/blob/5c3f06c7/be/src/runtime/mem-tracker.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/mem-tracker.cc b/be/src/runtime/mem-tracker.cc index 4762928..237484f 100644 --- a/be/src/runtime/mem-tracker.cc +++ b/be/src/runtime/mem-tracker.cc @@ -130,8 +130,7 @@ void MemTracker::CloseAndUnregisterFromParent() { } void MemTracker::EnableReservationReporting(const ReservationTrackerCounters& counters) { - ReservationTrackerCounters* new_counters = new ReservationTrackerCounters(counters); - reservation_counters_.Store(new_counters); + delete reservation_counters_.Swap(new ReservationTrackerCounters(counters)); } int64_t MemTracker::GetPoolMemReserved() { http://git-wip-us.apache.org/repos/asf/impala/blob/5c3f06c7/be/src/scheduling/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/scheduling/CMakeLists.txt b/be/src/scheduling/CMakeLists.txt index 024e6e1..47fd438 100644 --- a/be/src/scheduling/CMakeLists.txt +++ b/be/src/scheduling/CMakeLists.txt @@ -33,7 +33,5 @@ add_library(Scheduling STATIC ) add_dependencies(Scheduling gen-deps) -ADD_BE_TEST(scheduler-test) -ADD_BE_TEST(backend-config-test) -# TODO: Add BE test -# ADD_BE_TEST(admission-controller-test) +ADD_BE_LSAN_TEST(scheduler-test) +ADD_BE_LSAN_TEST(backend-config-test) http://git-wip-us.apache.org/repos/asf/impala/blob/5c3f06c7/be/src/service/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/service/CMakeLists.txt b/be/src/service/CMakeLists.txt index 66cf55d..71704c8 100644 --- a/be/src/service/CMakeLists.txt +++ b/be/src/service/CMakeLists.txt @@ -74,7 +74,7 @@ target_link_libraries(impalad ${IMPALA_LINK_LIBS} ) -ADD_BE_TEST(session-expiry-test session-expiry-test.cc) -ADD_BE_TEST(hs2-util-test hs2-util-test.cc) -ADD_BE_TEST(query-options-test query-options-test.cc) -ADD_BE_TEST(impala-server-test impala-server-test.cc) +ADD_BE_TEST(session-expiry-test session-expiry-test.cc) # TODO: this leaks thrift server +ADD_BE_LSAN_TEST(hs2-util-test hs2-util-test.cc) +ADD_BE_LSAN_TEST(query-options-test query-options-test.cc) +ADD_BE_LSAN_TEST(impala-server-test impala-server-test.cc) http://git-wip-us.apache.org/repos/asf/impala/blob/5c3f06c7/be/src/statestore/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/statestore/CMakeLists.txt b/be/src/statestore/CMakeLists.txt index 313b2ec..a2bc4c0 100644 --- a/be/src/statestore/CMakeLists.txt +++ b/be/src/statestore/CMakeLists.txt @@ -30,4 +30,4 @@ add_library(Statestore ) add_dependencies(Statestore gen-deps) -ADD_BE_TEST(statestore-test) +ADD_BE_LSAN_TEST(statestore-test) http://git-wip-us.apache.org/repos/asf/impala/blob/5c3f06c7/be/src/udf/udf.cc ---------------------------------------------------------------------- diff --git a/be/src/udf/udf.cc b/be/src/udf/udf.cc index 89eb8d1..032f036 100644 --- a/be/src/udf/udf.cc +++ b/be/src/udf/udf.cc @@ -79,6 +79,8 @@ class FreePool { class MemPool { public: uint8_t* Allocate(int byte_size) { + // TODO: this function is called with this == nullptr from UdaTestHarness. This works + // for now because MemPool has no members or virtual functions. return reinterpret_cast<uint8_t*>(malloc(byte_size)); } }; http://git-wip-us.apache.org/repos/asf/impala/blob/5c3f06c7/be/src/util/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt index 095193d..99c8e6b 100644 --- a/be/src/util/CMakeLists.txt +++ b/be/src/util/CMakeLists.txt @@ -107,40 +107,40 @@ target_link_libraries(parquet-reader ${IMPALA_LINK_LIBS}) target_link_libraries(loggingsupport ${IMPALA_LINK_LIBS_DYNAMIC_TARGETS}) -ADD_BE_TEST(benchmark-test) -ADD_BE_TEST(bitmap-test) -ADD_BE_TEST(bit-packing-test) -ADD_BE_TEST(bit-util-test) -ADD_BE_TEST(blocking-queue-test) -ADD_BE_TEST(bloom-filter-test) -ADD_BE_TEST(coding-util-test) -ADD_BE_TEST(debug-util-test) -ADD_BE_TEST(decompress-test) -ADD_BE_TEST(dict-test) -ADD_BE_TEST(error-util-test) -ADD_BE_TEST(filesystem-util-test) -ADD_BE_TEST(fixed-size-hash-table-test) -ADD_BE_TEST(hdfs-util-test) -ADD_BE_TEST(internal-queue-test) -ADD_BE_TEST(logging-support-test) -ADD_BE_TEST(lru-cache-test) -ADD_BE_TEST(metrics-test) -ADD_BE_TEST(min-max-filter-test) +ADD_BE_LSAN_TEST(benchmark-test) +ADD_BE_LSAN_TEST(bitmap-test) +ADD_BE_LSAN_TEST(bit-packing-test) +ADD_BE_LSAN_TEST(bit-util-test) +ADD_BE_LSAN_TEST(blocking-queue-test) +ADD_BE_LSAN_TEST(bloom-filter-test) +ADD_BE_LSAN_TEST(coding-util-test) +ADD_BE_LSAN_TEST(debug-util-test) +ADD_BE_LSAN_TEST(decompress-test) +ADD_BE_LSAN_TEST(dict-test) +ADD_BE_LSAN_TEST(error-util-test) +ADD_BE_LSAN_TEST(filesystem-util-test) +ADD_BE_LSAN_TEST(fixed-size-hash-table-test) +ADD_BE_LSAN_TEST(hdfs-util-test) +ADD_BE_LSAN_TEST(internal-queue-test) +ADD_BE_LSAN_TEST(logging-support-test) +ADD_BE_LSAN_TEST(lru-cache-test) +ADD_BE_LSAN_TEST(metrics-test) +ADD_BE_LSAN_TEST(min-max-filter-test) ADD_BE_LSAN_TEST(openssl-util-test) -ADD_BE_TEST(parse-util-test) -ADD_BE_TEST(pretty-printer-test) -ADD_BE_TEST(proc-info-test) -ADD_BE_TEST(promise-test) -ADD_BE_TEST(redactor-config-parser-test) -ADD_BE_TEST(redactor-test) -ADD_BE_TEST(redactor-unconfigured-test) -ADD_BE_TEST(rle-test) -ADD_BE_TEST(runtime-profile-test) -ADD_BE_TEST(string-parser-test) -ADD_BE_TEST(string-util-test) -ADD_BE_TEST(symbols-util-test) -ADD_BE_TEST(sys-info-test) -ADD_BE_TEST(thread-pool-test) -ADD_BE_TEST(time-test) -ADD_BE_TEST(uid-util-test) -ADD_BE_TEST(webserver-test) +ADD_BE_LSAN_TEST(parse-util-test) +ADD_BE_LSAN_TEST(pretty-printer-test) +ADD_BE_LSAN_TEST(proc-info-test) +ADD_BE_LSAN_TEST(promise-test) +ADD_BE_LSAN_TEST(redactor-config-parser-test) +ADD_BE_LSAN_TEST(redactor-test) +ADD_BE_LSAN_TEST(redactor-unconfigured-test) +ADD_BE_LSAN_TEST(rle-test) +ADD_BE_LSAN_TEST(runtime-profile-test) +ADD_BE_LSAN_TEST(string-parser-test) +ADD_BE_LSAN_TEST(string-util-test) +ADD_BE_LSAN_TEST(symbols-util-test) +ADD_BE_LSAN_TEST(sys-info-test) +ADD_BE_LSAN_TEST(thread-pool-test) +ADD_BE_LSAN_TEST(time-test) +ADD_BE_LSAN_TEST(uid-util-test) +ADD_BE_LSAN_TEST(webserver-test) http://git-wip-us.apache.org/repos/asf/impala/blob/5c3f06c7/be/src/util/decompress-test.cc ---------------------------------------------------------------------- diff --git a/be/src/util/decompress-test.cc b/be/src/util/decompress-test.cc index 2a2299c..4d5f8ef 100644 --- a/be/src/util/decompress-test.cc +++ b/be/src/util/decompress-test.cc @@ -449,7 +449,7 @@ TEST_F(DecompressorTest, LZ4Huge) { // Generate a big random payload. int payload_len = numeric_limits<int>::max(); - uint8_t* payload = new uint8_t[payload_len]; + unique_ptr<uint8_t[]> payload(new uint8_t[payload_len]); for (int i = 0 ; i < payload_len; ++i) payload[i] = rand(); scoped_ptr<Codec> compressor; @@ -462,9 +462,10 @@ TEST_F(DecompressorTest, LZ4Huge) { // Trying to compress it should give an error int64_t compressed_len = max_size; - uint8_t* compressed = new uint8_t[max_size]; - EXPECT_ERROR(compressor->ProcessBlock(true, payload_len, payload, - &compressed_len, &compressed), TErrorCode::LZ4_COMPRESSION_INPUT_TOO_LARGE); + unique_ptr<uint8_t[]> compressed(new uint8_t[max_size]); + uint8_t* compressed_ptr = compressed.get(); + EXPECT_ERROR(compressor->ProcessBlock(true, payload_len, payload.get(), + &compressed_len, &compressed_ptr), TErrorCode::LZ4_COMPRESSION_INPUT_TOO_LARGE); } } http://git-wip-us.apache.org/repos/asf/impala/blob/5c3f06c7/bin/lsan-suppressions.txt ---------------------------------------------------------------------- diff --git a/bin/lsan-suppressions.txt b/bin/lsan-suppressions.txt new file mode 100644 index 0000000..f59592b --- /dev/null +++ b/bin/lsan-suppressions.txt @@ -0,0 +1,34 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# This file suppresses Leak Sanitizer errors, following +# https://clang.llvm.org/docs/LeakSanitizer.html + +# The JVM leaks things directly from libjvm.so and indirectly via inflate(). +leak:libjvm.so +leak:inflate + +# Some test utilities have deliberate leaks +leak:InProcessImpalaServer::StartWithEphemeralPorts +leak:InProcessStatestore::StartWithEphemeralPorts + +# MemTestPrepare is deliberately used to simulate a UDF leaking memory. +leak:MemTestPrepare + +# TODO: IMPALA-2746: fix these unnecessary leaks. +# The UDF and UDA test harnesses are sloppy and leak result allocations. +leak:impala::FunctionContextImpl::AllocateForResults
