This is an automated email from the ASF dual-hosted git repository. tarmstrong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 5e69ae1d7dc113bbcc8d7d75e3b1b5244e76f76a Author: Riza Suminto <[email protected]> AuthorDate: Mon Apr 6 22:24:05 2020 -0700 IMPALA-9612: Fix race condition in RuntimeFilter::WaitForArrival In function RuntimeFilter::WaitForArrival, there is a race condition where the condition variable arrival_cv_ may be signaled right after the waiting thread enters the loop but before it calls arrival_cv_.WaitFor(). This can cause the runtime filter to wait for the entire RUNTIME_FILTER_WAIT_TIME_MS even though the filter has arrived or been canceled earlier than that. This commit avoids the race condition by making RuntimeFilter::SetFilter and RuntimeFilter::Cancel acquire arrival_mutex_ before checking the value of arrival_time_, and release arrival_mutex_ before signaling arrival_cv_. Testing: - Add a new backend test, runtime-filter-test.cc - Pass core tests. Change-Id: I7dffa626103ef0af06ad1e89231b0d2ee54bb94a Reviewed-on: http://gerrit.cloudera.org:8080/15673 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/runtime/CMakeLists.txt | 2 + be/src/runtime/runtime-filter-test.cc | 103 ++++++++++++++++++++++++++++++++++ be/src/runtime/runtime-filter.cc | 33 +++++++---- be/src/runtime/runtime-filter.h | 7 +++ 4 files changed, 133 insertions(+), 12 deletions(-) diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index 6990421..12e279e 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -98,6 +98,7 @@ add_library(RuntimeTests STATIC multi-precision-test.cc raw-value-test.cc row-batch-serialize-test.cc + runtime-filter-test.cc string-buffer-test.cc string-compare-test.cc string-search-test.cc @@ -131,6 +132,7 @@ ADD_UNIFIED_BE_LSAN_TEST(hdfs-fs-cache-test "HdfsFsCacheTest.*") ADD_UNIFIED_BE_LSAN_TEST(tmp-file-mgr-test "TmpFileMgrTest.*") ADD_UNIFIED_BE_LSAN_TEST(row-batch-serialize-test "RowBatchSerializeTest.*") # Exception to unified be tests: Custom main function with global Frontend 
object +ADD_UNIFIED_BE_LSAN_TEST(runtime-filter-test "RuntimeFilterTest.*") ADD_BE_LSAN_TEST(row-batch-test) # Exception to unified be tests: Custom main function with global Frontend object ADD_BE_LSAN_TEST(collection-value-builder-test) diff --git a/be/src/runtime/runtime-filter-test.cc b/be/src/runtime/runtime-filter-test.cc new file mode 100644 index 0000000..76680a5 --- /dev/null +++ b/be/src/runtime/runtime-filter-test.cc @@ -0,0 +1,103 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <boost/thread/thread.hpp> + +#include "common/init.h" +#include "common/object-pool.h" +#include "runtime/runtime-filter.h" +#include "runtime/runtime-filter.inline.h" +#include "testutil/gtest-util.h" +#include "util/stopwatch.h" + +#include "common/names.h" + +using namespace impala; + +namespace impala { + +class RuntimeFilterTest : public testing::Test { + protected: + ObjectPool pool_; + MemTracker tracker_; + + virtual void SetUp() {} + + virtual void TearDown() { pool_.Clear(); } + + void SetDelay(RuntimeFilter* rf, int64_t delay) { rf->injection_delay_ = delay; } +}; + +struct TestConfig { + RuntimeFilter* runtime_filter; + int64_t injection_delay; + int64_t wait_for_ms; + MinMaxFilter* min_max_filter; +}; + +// Test that RuntimeFilter stop waiting after it is canceled. +// See IMPALA-9612. +TEST_F(RuntimeFilterTest, Canceled) { + TRuntimeFilterDesc desc; + desc.__set_type(TRuntimeFilterType::MIN_MAX); + RuntimeFilter* rf = pool_.Add(new RuntimeFilter(desc, desc.filter_size_bytes)); + TestConfig tc = {rf, 500, 1000, nullptr}; + + SetDelay(rf, tc.injection_delay); + MonotonicStopWatch sw; + thread_group workers; + + sw.Start(); + workers.add_thread( + new thread([&tc] { tc.runtime_filter->WaitForArrival(tc.wait_for_ms); })); + SleepForMs(100); // give waiting thread a head start + workers.add_thread(new thread([&tc] { tc.runtime_filter->Cancel(); })); + workers.join_all(); + sw.Stop(); + + ASSERT_GE(tc.runtime_filter->arrival_delay_ms(), tc.injection_delay); + ASSERT_LT(sw.ElapsedTime(), (tc.injection_delay + tc.wait_for_ms) * 1000000); +} + +// Test that RuntimeFilter stop waiting after the filter arrived. +// See IMPALA-9612. 
+TEST_F(RuntimeFilterTest, Arrived) { + TRuntimeFilterDesc desc; + desc.__set_type(TRuntimeFilterType::MIN_MAX); + RuntimeFilter* rf = pool_.Add(new RuntimeFilter(desc, desc.filter_size_bytes)); + MinMaxFilter* mmf = + MinMaxFilter::Create(ColumnType(PrimitiveType::TYPE_BOOLEAN), &pool_, &tracker_); + TestConfig tc = {rf, 500, 1000, mmf}; + + SetDelay(rf, tc.injection_delay); + MonotonicStopWatch sw; + thread_group workers; + + sw.Start(); + workers.add_thread( + new thread([&tc] { tc.runtime_filter->WaitForArrival(tc.wait_for_ms); })); + SleepForMs(100); // give waiting thread a head start + workers.add_thread( + new thread([&tc] { tc.runtime_filter->SetFilter(nullptr, tc.min_max_filter); })); + workers.join_all(); + sw.Stop(); + + ASSERT_GE(tc.runtime_filter->arrival_delay_ms(), tc.injection_delay); + ASSERT_LT(sw.ElapsedTime(), (tc.injection_delay + tc.wait_for_ms) * 1000000); +} + +} // namespace impala diff --git a/be/src/runtime/runtime-filter.cc b/be/src/runtime/runtime-filter.cc index 4b013a6..1ff0a13 100644 --- a/be/src/runtime/runtime-filter.cc +++ b/be/src/runtime/runtime-filter.cc @@ -26,17 +26,20 @@ using namespace impala; const char* RuntimeFilter::LLVM_CLASS_NAME = "class.impala::RuntimeFilter"; void RuntimeFilter::SetFilter(BloomFilter* bloom_filter, MinMaxFilter* min_max_filter) { - DCHECK(!HasFilter()) << "SetFilter() should not be called multiple times."; - DCHECK(bloom_filter_.Load() == nullptr && min_max_filter_.Load() == nullptr); - if (arrival_time_.Load() != 0) return; // The filter may already have been cancelled. 
- if (is_bloom_filter()) { - bloom_filter_.Store(bloom_filter); - } else { - DCHECK(is_min_max_filter()); - min_max_filter_.Store(min_max_filter); + { + unique_lock<mutex> l(arrival_mutex_); + DCHECK(!HasFilter()) << "SetFilter() should not be called multiple times."; + DCHECK(bloom_filter_.Load() == nullptr && min_max_filter_.Load() == nullptr); + if (arrival_time_.Load() != 0) return; // The filter may already have been cancelled. + if (is_bloom_filter()) { + bloom_filter_.Store(bloom_filter); + } else { + DCHECK(is_min_max_filter()); + min_max_filter_.Store(min_max_filter); + } + arrival_time_.Store(MonotonicMillis()); + has_filter_.Store(true); } - arrival_time_.Store(MonotonicMillis()); - has_filter_.Store(true); arrival_cv_.NotifyAll(); } @@ -64,8 +67,11 @@ void RuntimeFilter::Or(RuntimeFilter* other) { } void RuntimeFilter::Cancel() { - if (arrival_time_.Load() != 0) return; - arrival_time_.Store(MonotonicMillis()); + { + unique_lock<mutex> l(arrival_mutex_); + if (arrival_time_.Load() != 0) return; + arrival_time_.Store(MonotonicMillis()); + } arrival_cv_.NotifyAll(); } @@ -75,6 +81,9 @@ bool RuntimeFilter::WaitForArrival(int32_t timeout_ms) const { int64_t ms_since_registration = MonotonicMillis() - registration_time_; int64_t ms_remaining = timeout_ms - ms_since_registration; if (ms_remaining <= 0) break; +#ifndef NDEBUG + if (injection_delay_ > 0) SleepForMs(injection_delay_); +#endif arrival_cv_.WaitFor(l, ms_remaining * MICROS_PER_MILLI); } return arrival_time_.Load() != 0; diff --git a/be/src/runtime/runtime-filter.h b/be/src/runtime/runtime-filter.h index 5108f83..e2a4a67 100644 --- a/be/src/runtime/runtime-filter.h +++ b/be/src/runtime/runtime-filter.h @@ -28,6 +28,7 @@ namespace impala { class BloomFilter; +class RuntimeFilterTest; /// RuntimeFilters represent set-membership predicates that are computed during query /// execution (rather than during planning). 
They can then be sent to other operators to @@ -117,6 +118,8 @@ class RuntimeFilter { static const char* LLVM_CLASS_NAME; private: + friend class RuntimeFilterTest; + /// Membership bloom_filter. May be NULL even after arrival_time_ is set, meaning that /// it does not filter any rows, either because it was not created /// (filter_desc_.bloom_filter is false), there was not enough memory, or the false @@ -148,5 +151,9 @@ class RuntimeFilter { /// Signalled when a filter arrives or the filter is cancelled. Paired with /// 'arrival_mutex_' mutable ConditionVariable arrival_cv_; + + /// Injection delay for WaitForArrival. Used in testing only. + /// See IMPALA-9612. + int64_t injection_delay_ = 0; }; }
