This is an automated email from the ASF dual-hosted git repository.

laszlog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 5ae0d8f94843de4a0835de6cf82651d74c4901a0
Author: jasonmfehr <[email protected]>
AuthorDate: Wed Dec 31 15:47:57 2025 -0800

    IMPALA-13679: Fix ASAN Failure in Workload Management Tests
    
    The TickerSecondsBool class defined in ticker.h is used to
    process queued completed queries at specified intervals.
    This class references variables defined outside of it. The
    workload management code never properly shuts down this
    ticker, so during shutdown the ticker thread sometimes
    accesses variables that have already been released.
    
    Removes the Ticker class and instead relies on the built-in
    primitives of the std::condition_variable type (wait_until()
    with a predicate).
    
    The Ticker class was originally developed to avoid spurious
    wakeups of a std::condition_variable, but the functionality
    it provided can be replaced by the built-in functionality of
    std::condition_variable.
    
    In the workload-management-worker.cc file, adds the static
    keyword to variables that are intended to be used only in
    that file.
    
    Testing
    * workload management custom cluster tests passed locally
      and in a Jenkins build
    * TSAN/ASAN builds passed with workload management enabled
      on the EE test impalad
    
    Change-Id: I9a29f4f788cf7259c263a17272e86acdda5caa77
    Reviewed-on: http://gerrit.cloudera.org:8080/23818
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/service/impala-server.h               |   1 -
 be/src/service/workload-management-worker.cc |  45 +++++----
 be/src/util/CMakeLists.txt                   |   2 -
 be/src/util/ticker-test.cc                   | 133 -------------------------
 be/src/util/ticker.h                         | 143 ---------------------------
 5 files changed, 22 insertions(+), 302 deletions(-)

diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 120f6cb65..ae6056ec6 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -52,7 +52,6 @@
 #include "util/sharded-query-map-util.h"
 #include "util/simple-logger.h"
 #include "util/thread-pool.h"
-#include "util/ticker.h"
 #include "util/time.h"
 
 namespace kudu {
diff --git a/be/src/service/workload-management-worker.cc 
b/be/src/service/workload-management-worker.cc
index 2c77c70c4..adaf64658 100644
--- a/be/src/service/workload-management-worker.cc
+++ b/be/src/service/workload-management-worker.cc
@@ -58,7 +58,6 @@
 #include "util/sql-util.h"
 #include "util/stopwatch.h"
 #include "util/string-util.h"
-#include "util/ticker.h"
 #include "workload_mgmt/workload-management.h"
 
 using namespace impala;
@@ -428,14 +427,14 @@ const std::array<FieldParser, NumQueryTableColumns> 
FIELD_PARSERS = {{
 } // namespace workloadmgmt
 
 /// Queue of completed queries and the mutex to synchronize access to it.
-list<CompletedQuery> _completed_queries;
-mutex _completed_queries_mu;
+static list<CompletedQuery> _completed_queries;
+static mutex _completed_queries_mu;
 
 /// Coordinate periodic execution of the completed queries queue processing 
thread.
-condition_variable _completed_queries_cv;
+static condition_variable _completed_queries_cv;
 
 /// Coordinate shutdown of the completed queries queue processing thread.
-condition_variable _completed_queries_shutdown_cv;
+static condition_variable _completed_queries_shutdown_cv;
 
 /// Determine if the maximum number of queued completed queries has been 
exceeded.
 ///
@@ -504,7 +503,14 @@ void ImpalaServer::ShutdownWorkloadManagement() {
   if (workload_mgmt_state_ == WorkloadManagementState::RUNNING) {
     workload_mgmt_state_ = WorkloadManagementState::SHUTTING_DOWN;
     LOG(INFO) << "Workload management is shutting down";
+
+    // Wake up the completed queries processing thread so it can drain the 
in-memory
+    // completed queries queue.
     _completed_queries_cv.notify_all();
+
+    // Wait for the completed queries processing thread to drain the in-memory 
queue. If
+    // the timeout expires before the queue is drained, the thread will be 
detached
+    // and shutdown will continue.
     _completed_queries_shutdown_cv.wait_for(l,
         chrono::seconds(FLAGS_query_log_shutdown_timeout_s),
         [this] { return workload_mgmt_state_ == 
WorkloadManagementState::SHUTDOWN; });
@@ -612,6 +618,8 @@ static string _dmlPrefix(const string& table_name, const 
Version& target_schema_
   return fields.str();
 } // function _dmlPrefix
 
+// Returns true if workload management is shutting down. If it is, this 
function updates
+// the workload management state to SHUTDOWN.
 bool ImpalaServer::IsWorkloadManagementShuttingDown() {
   lock_guard<mutex> l(workload_mgmt_state_mu_);
 
@@ -636,10 +644,6 @@ void ImpalaServer::WorkloadManagementWorker(const Version& 
target_schema_version
     workload_mgmt_state_ = WorkloadManagementState::STARTED;
   }
 
-  /// Ticker that wakes up at set intervals to process the queued completed 
queries. Uses
-  /// the _completed_queries_mu to synchonize access to the _completed_queries 
list.
-  unique_ptr<TickerSecondsBool> completed_queries_ticker;
-
   {
     lock_guard<mutex> l(workload_mgmt_state_mu_);
     // This condition will evaluate to false only if a clean shutdown was 
initiated while
@@ -651,11 +655,6 @@ void ImpalaServer::WorkloadManagementWorker(const Version& 
target_schema_version
                 << "coordinator shutdown was initiated.";
       return; // Note: early return
     }
-
-    completed_queries_ticker = make_unique<TickerSecondsBool>(
-        FLAGS_query_log_write_interval_s, _completed_queries_cv, 
_completed_queries_mu);
-    ABORT_IF_ERROR(
-        completed_queries_ticker->Start("impala-server", 
"completed-queries-ticker"));
   }
 
   // Non-values portion of the sql DML to insert records into the completed 
queries
@@ -691,22 +690,22 @@ void ImpalaServer::WorkloadManagementWorker(const 
Version& target_schema_version
       return; // Note: early return
     }
 
-    // Sleep this thread until it is time to process queued completed queries. 
During the
-    // wait, the _completed_queries_mu is only locked while calling the lambda 
function
-    // predicate. After waking up, the _completed_queries_mu will be locked.
+    // Sleep this thread until it is time to process queued completed queries 
or the
+    // _completed_queries_cv condition variable is notified either on shutdown 
(from the
+    // ShutdownWorkloadManagement() function) or when the max number of queued 
queries is
+    // exceeded (from the EnqueueCompletedQuery() function).
+    const chrono::seconds write_interval_s(FLAGS_query_log_write_interval_s);
+    chrono::time_point<std::chrono::steady_clock> next_wake_time;
     list<CompletedQuery> queries_to_insert;
     MonotonicStopWatch timer;
     {
+      next_wake_time = chrono::steady_clock::now() + write_interval_s;
       unique_lock<mutex> l(_completed_queries_mu);
-      _completed_queries_cv.wait(l, [this, &completed_queries_ticker] {
-        lock_guard<mutex> l2(workload_mgmt_state_mu_);
-        // To guard against spurious wakeups, this predicate ensures there are
-        // completed queries queued up before waking up the thread.
-        return (completed_queries_ticker->WakeupGuard()() && 
!_completed_queries.empty())
+      _completed_queries_cv.wait_until(l, next_wake_time, [&next_wake_time, 
this] {
+        return chrono::steady_clock::now() >= next_wake_time
             || _maxRecordsExceeded(_completed_queries.size())
             || UNLIKELY(workload_mgmt_state_ == 
WorkloadManagementState::SHUTTING_DOWN);
       });
-      completed_queries_ticker->ResetWakeupGuard();
 
       DebugActionNoFail(FLAGS_debug_actions, "WM_SHUTDOWN_DELAY");
 
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index 26cb475cf..41ff9699b 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -236,7 +236,6 @@ add_library(UtilTests STATIC
   system-state-info-test.cc
   tagged-ptr-test.cc
   thread-pool-test.cc
-  ticker-test.cc
   time-test.cc
   tuple-row-compare-test.cc
   uid-util-test.cc
@@ -302,7 +301,6 @@ ADD_UNIFIED_BE_LSAN_TEST(symbols-util-test "SymbolsUtil.*")
 ADD_UNIFIED_BE_LSAN_TEST(system-state-info-test "SystemStateInfoTest.*")
 ADD_UNIFIED_BE_LSAN_TEST(sys-info-test "CpuInfoTest.*:DiskInfoTest.*")
 ADD_UNIFIED_BE_LSAN_TEST(thread-pool-test "ThreadPoolTest.*")
-ADD_UNIFIED_BE_LSAN_TEST(ticker-test "TickerTest.*")
 ADD_UNIFIED_BE_LSAN_TEST(time-test "TimeTest.*")
 ADD_UNIFIED_BE_LSAN_TEST(tuple-row-compare-test "TupleRowCompareTest.*")
 ADD_UNIFIED_BE_LSAN_TEST(uid-util-test "UidUtil.*")
diff --git a/be/src/util/ticker-test.cc b/be/src/util/ticker-test.cc
deleted file mode 100644
index 469a19dc3..000000000
--- a/be/src/util/ticker-test.cc
+++ /dev/null
@@ -1,133 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <chrono>
-#include <condition_variable>
-#include <memory>
-#include <mutex>
-#include <string>
-
-#include "testutil/gtest-util.h"
-
-#include "common/status.h"
-#include "util/stopwatch.h"
-#include "util/ticker.h"
-
-using namespace std;
-
-namespace impala {
-
-static inline float NsToMs(int64_t nanos) {
-  return static_cast<float>(nanos / 1000000);
-}
-
-TEST(TickerTest, TickerSecondsBoolHappyPath) {
-  condition_variable cv;
-  mutex mu;
-  uint8_t cntr = 0;
-
-  TickerSecondsBool fixture(1, cv, mu);
-  MonotonicStopWatch sw;
-
-  sw.Start();
-  ABORT_IF_ERROR(fixture.Start("category", "tickersecondsbool-happy-path"));
-
-  while (cntr < 3) {
-    unique_lock<mutex> l(mu);
-    cv.wait(l, fixture.WakeupGuard());
-    fixture.ResetWakeupGuard();
-    cntr++;
-  }
-
-  sw.Stop();
-
-  fixture.RequestStop();
-  fixture.Join();
-
-  EXPECT_EQ(cntr, 3);
-  // Include a 30 millisecond (1%) margin of error to tolerate differences in 
the
-  // precision of time measurements.
-  EXPECT_NEAR(NsToMs(sw.ElapsedTime()), static_cast<float>(3000), 30);
-}
-
-TEST(TickerTest, GenericTickerHappyPath) {
-  condition_variable cv;
-  mutex mu;
-  shared_ptr<string> wakeup_guard = make_shared<string>();
-  uint8_t cntr = 0;
-  const string wakeup_val = "wakeup";
-
-  Ticker<chrono::milliseconds, string> fixture(chrono::milliseconds(5), cv, mu,
-      wakeup_guard, wakeup_val);
-  MonotonicStopWatch sw;
-
-  sw.Start();
-  ABORT_IF_ERROR(fixture.Start("category", "generic-ticker-happy-path"));
-
-  while (cntr < 10) {
-    unique_lock<mutex> l(mu);
-    cv.wait(l, fixture.WakeupGuard());
-    *wakeup_guard = "";
-    cntr++;
-  }
-
-  sw.Stop();
-
-  fixture.RequestStop();
-  fixture.Join();
-
-  EXPECT_EQ(cntr, 10);
-  // Include a 10 millisecond (2%) margin of error to tolerate differences in 
the
-  // precision of time measurements.
-  EXPECT_NEAR(NsToMs(sw.ElapsedTime()), static_cast<float>(50), 10);
-}
-
-// Tests the case where the wakeup guard is not reset by the consuming code.
-TEST(TickerTest, GenericTickerNoWakeupGuardReset) {
-  condition_variable cv;
-  mutex mu;
-  shared_ptr<string> wakeup_guard = make_shared<string>();
-  uint8_t cntr = 0;
-  const string wakeup_val = "wakeup";
-
-  Ticker<chrono::milliseconds, string> fixture(chrono::milliseconds(5), cv, mu,
-      wakeup_guard, wakeup_val);
-  MonotonicStopWatch sw;
-
-  sw.Start();
-  ABORT_IF_ERROR(fixture.Start("category", "generic-ticker-happy-path"));
-
-  while (cntr < 10) {
-    unique_lock<mutex> l(mu);
-    cv.wait(l, fixture.WakeupGuard());
-    // No wakeup guard reset here.
-    cntr++;
-  }
-
-  sw.Stop();
-
-  fixture.RequestStop();
-  fixture.Join();
-
-  EXPECT_EQ(cntr, 10);
-  // If the wakeup guard was set properly, elapsed time would be 50 
milliseconds. Since
-  // the wakeup guard does not get set, spurious wakeups of the condition 
variable happen
-  // much more frequently than they should.
-  EXPECT_NEAR(NsToMs(sw.ElapsedTime()), static_cast<float>(5), 5);
-}
-
-} // namespace impala
diff --git a/be/src/util/ticker.h b/be/src/util/ticker.h
deleted file mode 100644
index 116e9574a..000000000
--- a/be/src/util/ticker.h
+++ /dev/null
@@ -1,143 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <chrono>
-#include <condition_variable>
-#include <functional>
-#include <memory>
-#include <mutex>
-#include <thread>
-
-#include <boost/bind.hpp>
-
-#include "common/atomic.h"
-#include "common/status.h"
-#include "util/thread.h"
-
-namespace impala {
-
-// Manages a thread that periodically notifies a condition variable. This 
thread never
-// returns. An indicator variable must be specified to guard against spurious 
wakeups.
-//
-// Immediately before this class notfies the condition variable, it sets the 
indicator
-// variable to the `wakeup_value` specified in the constructor. It is the 
responsibility
-// of the thread consuming this class to reset the indicator variable to a 
value other
-// than `wakeup_value` before the consuming thread goes to sleep.
-//
-// If the periodic code takes longer to run than the specified duration, then 
the code
-// will immediately execute the next time around.
-//
-// Internally, this class uses std::this_thread:sleep_for which may sleep for 
longer than
-// the specified duration due to scheduling or resource contention delays.
-// For details, see https://en.cppreference.com/w/cpp/thread/sleep_for.
-//
-// Example usage:
-//
-//   #include <chrono>
-//   #include <condition_variable>
-//   #include <memory>
-//   #include <mutex>
-//
-//   #include "common/status.h"
-//
-//   std::condition_variable cv;
-//   std::mutex mu;
-//   std::shared_ptr<bool> wakeup_guard = make_shared<bool>();
-//   Ticker<std::chrono::seconds, bool> ticker(std::chrono::seconds(30), cv, 
mu,
-//       wakeup_guard, true);
-//
-//   ABORT_IF_ERROR(ticker.Start());
-//
-//   while(true) {
-//     unique_lock<mutex> l(mu);
-//     cv.wait(l, ticker.WakeupGuard());
-//     *wakeup_guard = false;
-//
-//     run_my_code();
-//   }
-
-template <typename DurationType, typename IndicatorType>
-class Ticker {
-  public:
-    Ticker(DurationType interval, std::condition_variable& cv,
-        std::mutex& lock, std::shared_ptr<IndicatorType> indicator,
-        IndicatorType wakeup_value) : interval_(interval), cv_(cv), 
lock_(lock),
-        indicator_(indicator), wakeup_value_(wakeup_value) {}
-
-    Status Start(const std::string& category, const std::string& name) {
-      return Thread::Create(category, name, &Ticker::run, this, &my_thread_);
-    }
-
-    // Specify that the next iteration of this ticker be the last. This 
function does not
-    // block nor does it cause the ticker to wake up earlier than scheduled.
-    void RequestStop() {
-      stop_requested_.Store(true);
-    }
-
-    // Wait for the ticker to exit after it's final iteration.
-    void Join() {
-      my_thread_->Join();
-    }
-
-    // Provides a default implementation for the condition variable predicate 
lambda.
-    std::function<bool()> WakeupGuard() {
-      return [this]{ return *indicator_ == wakeup_value_; };
-    }
-
-  protected:
-    const DurationType interval_;
-    std::condition_variable& cv_;
-    std::mutex& lock_;
-    std::shared_ptr<IndicatorType> indicator_;
-    const IndicatorType wakeup_value_;
-
-  private:
-    std::unique_ptr<Thread> my_thread_;
-    AtomicBool stop_requested_;
-
-    void run() {
-      while (!stop_requested_.Load()) {
-        std::this_thread::sleep_for(interval_);
-
-        {
-          std::lock_guard<std::mutex> l(lock_);
-          *indicator_ = wakeup_value_;
-        }
-
-        cv_.notify_all();
-      }
-    }
-}; // class Ticker
-
-// Specialization of the Ticker class that uses seconds for the duration and 
bool as the
-// wakeup indicator. The boolean shared_ptr indicator is internally managed. 
Use the
-// ResetWakeupGuard() function in your code immediately after the condition 
variable wait
-// to set the internally managed wakeup guard for the next iteration.
-class TickerSecondsBool : public Ticker<std::chrono::seconds, bool> {
-  public:
-    TickerSecondsBool(uint32_t interval, std::condition_variable& cv,
-      std::mutex& lock) :
-      Ticker(std::chrono::seconds(interval), cv, lock, 
std::make_shared<bool>(), true) {}
-
-    void ResetWakeupGuard() {
-      *indicator_ = false;
-    }
-}; // class TickerSecondsBool
-
-} // namespace impala

Reply via email to