This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit d95c06cd6c0502b13f9333ba601b7cfa0bbe6775
Author: Yida Wu <[email protected]>
AuthorDate: Tue Apr 29 12:34:50 2025 -0700

    IMPALA-14001: Start EXEC_TIME_LIMIT_S timer after backend execution begins
    
    This patch fixes an issue where EXEC_TIME_LIMIT_S was inaccurately
    enforced by including the planning time in its countdown. The timer
    for EXEC_TIME_LIMIT_S is now started only after the coordinator
    reaches the "Ready to start on the backends" state, ensuring that
    this time limit applies strictly to the execution phase.
    
    This patch also adds a DebugAction, PLAN_CREATE, in the planning phase
    for testing purposes.
    
    Tests:
    Passed core tests.
    Added an end-to-end (ee) test case, query_test/test_exec_time_limit.py.
    
    Change-Id: I825e867f1c9a39a9097d1c97ee8215281a009d7d
    Reviewed-on: http://gerrit.cloudera.org:8080/22837
    Reviewed-by: Riza Suminto <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/runtime/coordinator.cc                      |  1 +
 be/src/runtime/query-driver.cc                     |  4 ++
 be/src/runtime/query-driver.h                      |  3 +
 be/src/service/impala-server.cc                    | 23 +++++---
 be/src/service/impala-server.h                     |  4 ++
 .../java/org/apache/impala/service/Frontend.java   |  3 +
 .../java/org/apache/impala/util/DebugUtils.java    |  3 +
 tests/query_test/test_exec_time_limit.py           | 67 ++++++++++++++++++++++
 8 files changed, 99 insertions(+), 9 deletions(-)

diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 4014ea2bf..ee403164b 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -531,6 +531,7 @@ Status Coordinator::StartBackendExec() {
   VLOG_QUERY << "starting execution on " << num_backends << " backends for query_id="
              << PrintId(query_id());
   query_events_->MarkEvent(Substitute("Ready to start on $0 backends", num_backends));
+  parent_query_driver_->SetExecTimeLimit(parent_request_state_);
 
   // Serialize the TQueryCtx once and pass it to each backend. The serialized buffer must
   // stay valid until WaitOnExecRpcs() has returned.
diff --git a/be/src/runtime/query-driver.cc b/be/src/runtime/query-driver.cc
index 93199cedd..27880fecc 100644
--- a/be/src/runtime/query-driver.cc
+++ b/be/src/runtime/query-driver.cc
@@ -264,6 +264,10 @@ void QueryDriver::TryQueryRetry(
   }
 }
 
+void QueryDriver::SetExecTimeLimit(const ClientRequestState* request_state) {
+  parent_server_->SetExecTimeLimit(request_state);
+}
+
 void QueryDriver::RetryQueryFromThread(
     const Status& error, const shared_ptr<QueryDriver>& query_driver) {
   // This method does not require holding the ClientRequestState::lock_ for the original
diff --git a/be/src/runtime/query-driver.h b/be/src/runtime/query-driver.h
index 5f46d53e3..6219c885f 100644
--- a/be/src/runtime/query-driver.h
+++ b/be/src/runtime/query-driver.h
@@ -176,6 +176,9 @@ class QueryDriver {
   void TryQueryRetry(ClientRequestState* client_request_state, Status* error,
       bool* was_retried = nullptr);
 
+  /// Sets the execution time limit based on the query option.
+  void SetExecTimeLimit(const ClientRequestState* request_state);
+
   /// Finalize this QueryDriver. Must be called before Unregister(...) is called.
   /// This indicates that the query should no longer be considered registered from the
   /// client's point of view. Returns an INVALID_QUERY_HANDLE error if finalization
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 3364dc54c..4e2e46148 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1527,13 +1527,12 @@ Status ImpalaServer::SetQueryInflight(
 
   // If the query has a timeout or time limit, schedule checks.
   int32_t idle_timeout_s = GetIdleTimeout(query_handle->query_options());
-  int32_t exec_time_limit_s = query_handle->query_options().exec_time_limit_s;
   int64_t cpu_limit_s = query_handle->query_options().cpu_limit_s;
   int64_t scan_bytes_limit = query_handle->query_options().scan_bytes_limit;
   int64_t join_rows_produced_limit =
       query_handle->query_options().join_rows_produced_limit;
-  if (idle_timeout_s > 0 || exec_time_limit_s > 0 || cpu_limit_s > 0
-      || scan_bytes_limit > 0 || join_rows_produced_limit > 0) {
+  if (idle_timeout_s > 0 || cpu_limit_s > 0 || scan_bytes_limit > 0
+      || join_rows_produced_limit > 0) {
     DebugActionNoFail(query_handle->query_options(), "SET_QUERY_INFLIGHT_EXPIRATION");
     lock_guard<mutex> l2(query_expiration_lock_);
     int64_t now = UnixMillis();
@@ -1543,12 +1542,6 @@ Status ImpalaServer::SetQueryInflight(
       queries_by_timestamp_.emplace(ExpirationEvent{
           now + (1000L * idle_timeout_s), query_id, ExpirationKind::IDLE_TIMEOUT});
     }
-    if (exec_time_limit_s > 0) {
-      VLOG_QUERY << "Query " << PrintId(query_id) << " has execution time limit of "
-                 << PrettyPrinter::Print(exec_time_limit_s, TUnit::TIME_S);
-      queries_by_timestamp_.emplace(ExpirationEvent{
-          now + (1000L * exec_time_limit_s), query_id, ExpirationKind::EXEC_TIME_LIMIT});
-    }
     if (cpu_limit_s > 0 || scan_bytes_limit > 0 || join_rows_produced_limit > 0) {
       if (cpu_limit_s > 0) {
         VLOG_QUERY << "Query " << PrintId(query_id) << " has CPU limit of "
@@ -1569,6 +1562,18 @@ Status ImpalaServer::SetQueryInflight(
   return Status::OK();
 }
 
+void ImpalaServer::SetExecTimeLimit(const ClientRequestState* request_state) {
+  int32_t exec_time_limit_s = request_state->query_options().exec_time_limit_s;
+  if (exec_time_limit_s <= 0) return;
+  const TUniqueId& query_id = request_state->query_id();
+  int64_t now = UnixMillis();
+  VLOG_QUERY << "Query " << PrintId(query_id) << " starts execution with time limit of "
+             << PrettyPrinter::Print(exec_time_limit_s, TUnit::TIME_S);
+  lock_guard<mutex> l(query_expiration_lock_);
+  queries_by_timestamp_.emplace(ExpirationEvent{
+      now + (1000L * exec_time_limit_s), query_id, ExpirationKind::EXEC_TIME_LIMIT});
+}
+
 void ImpalaServer::UpdateExecSummary(const QueryHandle& query_handle) const {
   DCHECK(query_handle->GetCoordinator() != nullptr);
   TExecSummary t_exec_summary;
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 8ec45fbb0..b404d7cae 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -931,6 +931,10 @@ class ImpalaServer : public ImpalaServiceIf,
   /// Collect ExecSummary and update it to the profile in request_state
   void UpdateExecSummary(const QueryHandle& query_handle) const;
 
+  /// Sets the execution time limit based on the query option.
+  /// Note: This function may acquire the query_expiration_lock_.
+  void SetExecTimeLimit(const ClientRequestState* request_state);
+
   /// Initialize "default_configs_" to show the default values for ImpalaQueryOptions and
   /// "support_start_over/false" to indicate that Impala does not support start over
   /// in the fetch call.
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 55315f303..17ab88f38 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -2057,6 +2057,9 @@ public class Frontend {
       // can handle various planner fallback execution logic (e.g. allowing one
       // planner, if execution fails, to call a different planner)
       TExecRequest result = getTExecRequestWithFallback(planCtx, timeline);
+      DebugUtils.executeDebugAction(
+          planCtx.getQueryContext().client_request.query_options.getDebug_action(),
+          DebugUtils.PLAN_CREATE);
       timeline.markEvent("Planning finished");
       result.setTimeline(timeline.toThrift());
       result.setProfile(FrontendProfile.getCurrent().emitAsThrift());
diff --git a/fe/src/main/java/org/apache/impala/util/DebugUtils.java b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
index d18b3db17..17b82b9f9 100644
--- a/fe/src/main/java/org/apache/impala/util/DebugUtils.java
+++ b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
@@ -116,6 +116,9 @@ public class DebugUtils {
   public static final String COLLECT_CATALOG_RESULTS_DELAY =
       "collect_catalog_results_delay";
 
+  // debug action label for plan creation.
+  public static final String PLAN_CREATE = "plan_create";
+
   /**
    * Returns true if the label of action is set in the debugActions
    */
diff --git a/tests/query_test/test_exec_time_limit.py b/tests/query_test/test_exec_time_limit.py
new file mode 100644
index 000000000..1806db054
--- /dev/null
+++ b/tests/query_test/test_exec_time_limit.py
@@ -0,0 +1,67 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import
+from tests.common.impala_test_suite import ImpalaTestSuite
+
+
+class TestQueryExecTimeLimit(ImpalaTestSuite):
+  """Tests the exec_time_limit_s query option."""
+
+  @classmethod
+  def get_workload(cls):
+    return 'tpch'
+
+  def test_exec_time_limit_enforced(self):
+    """Test that queries exceeding exec_time_limit_s are properly cancelled."""
+    exec_options = dict()
+    exec_options['exec_time_limit_s'] = "1"
+    query = "SELECT COUNT(*) FROM tpch.lineitem L1, tpch.lineitem L2"
+    try:
+      self.execute_query(query, exec_options)
+    except Exception as e:
+      assert "expired due to execution time limit" in str(e), (
+        "Unexpected exception: {}".format(e))
+    else:
+      assert False, "Query was expected to time out but succeeded."
+
+  def test_exec_time_limit_long_plan(self):
+    """Test that queries with a long planning time completing within
+    exec_time_limit_s succeed."""
+    exec_options = dict()
+    exec_options['exec_time_limit_s'] = "2"
+    # Set debug action to wait in the plan phase for 10s.
+    exec_options['debug_action'] = "plan_create:SLEEP@10000"
+    query = "SELECT * FROM tpch.lineitem limit 1"
+    result = self.execute_query(query, exec_options)
+    assert result.success, "Query failed unexpectedly within exec_time_limit_s."
+
+  def test_exec_time_limit_not_exceeded(self):
+    """Test that queries completing within exec_time_limit_s succeed."""
+    exec_options = dict()
+    exec_options['exec_time_limit_s'] = "60"
+    query = "SELECT COUNT(*) FROM tpch.lineitem"
+    result = self.execute_query(query, exec_options)
+    assert result.success, "Query failed unexpectedly within exec_time_limit_s."
+
+  def test_exec_time_limit_zero(self):
+    """Test that setting exec_time_limit_s to 0 disables the limit."""
+    exec_options = dict()
+    exec_options['exec_time_limit_s'] = "0"
+    query = "SELECT COUNT(*) FROM tpch.lineitem"
+    result = self.execute_query(query, exec_options)
+    assert result.success, "Query with exec_time_limit_s=0 failed unexpectedly."

Reply via email to