This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 32aeeecc0 IMPALA-12008: Fix incorrect end time in DML profiles
32aeeecc0 is described below

commit 32aeeecc078015263a1282f631acde4df6bd789c
Author: stiga-huang <[email protected]>
AuthorDate: Wed Mar 22 18:43:53 2023 +0800

    IMPALA-12008: Fix incorrect end time in DML profiles
    
    The end time in DML profiles is incorrect: it is actually the time
    when admission control resources are released. This is correct for
    normal queries. But for DMLs, the coordinator still needs to invoke the
    updateCatalog RPC of catalogd to finalize the HMS update. The end time
    should be set after that request has finished.
    
    This patch fixes the DML end time by not setting it after the admission
    control resources are released. Instead, it's set after
    ClientRequestState::WaitInternal() finishes, which makes sure the
    updateCatalog RPC has finished.
    
    Also adds a "Duration" field to the summary profile, set together with
    the end time.
    
    For testing, this patch also adds a new debug action in catalogd
    (catalogd_insert_finish_delay) to inject delays in updateCatalog.
    
    Tests
     - Added e2e test to verify the end time of a DML profile
    
    Change-Id: I9c5dc92c2f8576ceed374d447c0ac05022a2dee6
    Reviewed-on: http://gerrit.cloudera.org:8080/19644
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/runtime/coordinator.cc                      |  7 ++--
 be/src/service/client-request-state.cc             |  8 +++++
 common/thrift/CatalogService.thrift                |  3 ++
 .../apache/impala/service/CatalogOpExecutor.java   |  5 +++
 .../java/org/apache/impala/util/DebugUtils.java    |  4 +++
 tests/query_test/test_observability.py             | 40 ++++++++++++++++++++--
 tests/util/parse_util.py                           |  8 +++++
 7 files changed, 71 insertions(+), 4 deletions(-)

diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index aa9e00106..df9257c98 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -813,7 +813,10 @@ void Coordinator::HandleExecStateTransition(
   }
   ReleaseQueryAdmissionControlResources();
   // Once the query has released its admission control resources, update its 
end time.
-  parent_request_state_->UpdateEndTime();
+  // However, for non Query statement like DML statement, we still need to 
update HMS
+  // after the query finishes. So the end time of non Query statement is not 
set here.
+  // Instead, we set it in ClientRequestState::Wait().
+  if (stmt_type_ == TStmtType::QUERY) parent_request_state_->UpdateEndTime();
   // Can compute summary only after we stop accepting reports from the 
backends. Both
   // WaitForBackends() and CancelBackends() ensures that.
   // TODO: should move this off of the query execution path?
@@ -947,7 +950,7 @@ Status Coordinator::Wait() {
     RETURN_IF_ERROR(UpdateExecState(FinalizeResultSink(), nullptr, 
FLAGS_hostname));
   }
 
-  // DML requests are finished at this point.
+  // DML queries are finished at this point.
   RETURN_IF_ERROR(SetNonErrorTerminalState(ExecState::RETURNED_RESULTS));
   query_profile_->AddInfoString(
       "DML Stats", dml_exec_state_.OutputPartitionStats("\n"));
diff --git a/be/src/service/client-request-state.cc 
b/be/src/service/client-request-state.cc
index f863d86a7..77d0788ee 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -154,6 +154,7 @@ ClientRequestState::ClientRequestState(const TQueryCtx& 
query_ctx, Frontend* fro
   summary_profile_->AddInfoString("Start Time", 
ToStringFromUnixMicros(start_time_us(),
       TimePrecision::Nanosecond));
   summary_profile_->AddInfoString("End Time", "");
+  summary_profile_->AddInfoString("Duration", "");
   summary_profile_->AddInfoString("Query Type", "N/A");
   summary_profile_->AddInfoString("Query State", 
PrintValue(BeeswaxQueryState()));
   summary_profile_->AddInfoString(
@@ -1134,6 +1135,7 @@ void ClientRequestState::Wait() {
       query_events()->MarkEvent("Rows available");
     } else {
       query_events()->MarkEvent("Request finished");
+      UpdateEndTime();
     }
     discard_result(UpdateQueryStatus(status));
   }
@@ -1513,6 +1515,9 @@ Status ClientRequestState::UpdateCatalog() {
     TUpdateCatalogRequest catalog_update;
     catalog_update.__set_sync_ddl(exec_request_->query_options.sync_ddl);
     catalog_update.__set_header(GetCatalogServiceRequestHeader());
+    if (exec_request_->query_options.__isset.debug_action) {
+      
catalog_update.__set_debug_action(exec_request_->query_options.debug_action);
+    }
     DmlExecState* dml_exec_state = GetCoordinator()->dml_exec_state();
     if (!dml_exec_state->PrepareCatalogUpdate(&catalog_update)) {
       VLOG_QUERY << "No partitions altered, not updating metastore (query id: "
@@ -1840,6 +1845,9 @@ void ClientRequestState::UpdateEndTime() {
     // of nanosecond precision, so we explicitly specify the precision here.
     summary_profile_->AddInfoString(
         "End Time", ToStringFromUnixMicros(end_time_us(), 
TimePrecision::Nanosecond));
+    int64_t duration = end_time_us() - start_time_us();
+    summary_profile_->AddInfoString("Duration", Substitute("$0 ($1 us)",
+        PrettyPrinter::Print(duration, TUnit::TIME_US), duration));
   }
 }
 
diff --git a/common/thrift/CatalogService.thrift 
b/common/thrift/CatalogService.thrift
index b8ff63c88..6606564f6 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -258,6 +258,9 @@ struct TUpdateCatalogRequest {
 
   // Descriptor object about the Iceberg operation.
   10: optional TIcebergOperationParam iceberg_operation
+
+  // Passes the debug actions to catalogd if the query option is set.
+  11: optional string debug_action
 }
 
 // Response from a TUpdateCatalogRequest
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java 
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index e2030b251..c62646e71 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -6832,6 +6832,11 @@ public class CatalogOpExecutor {
       response.getResult().setVersion(
           catalog_.waitForSyncDdlVersion(response.getResult()));
     }
+
+    if (update.isSetDebug_action()) {
+      DebugUtils.executeDebugAction(update.getDebug_action(),
+          DebugUtils.INSERT_FINISH_DELAY);
+    }
     return response;
   }
 
diff --git a/fe/src/main/java/org/apache/impala/util/DebugUtils.java 
b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
index f1587f7a4..5d2b353da 100644
--- a/fe/src/main/java/org/apache/impala/util/DebugUtils.java
+++ b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
@@ -50,6 +50,10 @@ public class DebugUtils {
   // debug action label for introducing delay in update stats command.
   public static final String UPDATE_STATS_DELAY = 
"catalogd_update_stats_delay";
 
+  // debug action label for introducing delay when the catalog operation of 
INSERT, i.e.
+  // CatalogOpExecutor#updateCatalog() finishes.
+  public static final String INSERT_FINISH_DELAY = 
"catalogd_insert_finish_delay";
+
   /**
    * Given list of debug actions, execute the debug action pertaining to the 
given label.
    * The debugActions string is of the format specified for the 
query_option/configuration
diff --git a/tests/query_test/test_observability.py 
b/tests/query_test/test_observability.py
index a7f42058f..7d29cd428 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -22,7 +22,8 @@ from tests.beeswax.impala_beeswax import 
ImpalaBeeswaxException
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.skip import SkipIfFS, SkipIfLocal, SkipIfNotHdfsMinicluster
 from tests.util.filesystem_utils import IS_EC, WAREHOUSE
-from time import sleep
+from tests.util.parse_util import get_duration_us_from_str
+from time import sleep, time
 from RuntimeProfile.ttypes import TRuntimeProfileFormat
 import pytest
 import re
@@ -608,7 +609,7 @@ class TestObservability(ImpalaTestSuite):
     assert len(start_time_sub_sec_str) == 9, start_time
 
   @pytest.mark.execute_serially
-  def test_end_time(self):
+  def test_query_end_time(self):
     """ Test that verifies that the end time of a query with a coordinator is 
set once
     the coordinator releases its admission control resources. This ensures 
that the
     duration of the query will be determined by the time taken to do real work 
rather
@@ -643,6 +644,41 @@ class TestObservability(ImpalaTestSuite):
     end_time = tree.nodes[1].info_strings["End Time"]
     assert end_time is not None
 
+  def test_dml_end_time(self, unique_database):
+    """Same as the above test but verifies the end time of DML is set only 
after the work
+    in catalogd is done."""
+    stmt = "create table %s.alltypestiny like functional.alltypestiny" % 
unique_database
+    self.execute_query(stmt)
+    # Warm up the table
+    self.execute_query("describe %s.alltypestiny" % unique_database)
+    stmt = "insert overwrite %s.alltypestiny partition(year, month)" \
+           " select * from functional.alltypestiny" % unique_database
+    # Use debug_action to inject a delay in catalogd. The INSERT usually 
finishes in
+    # 300ms without the delay.
+    delay_s = 5
+    self.hs2_client.set_configuration_option(
+        "debug_action", "catalogd_insert_finish_delay:SLEEP@%d" % (delay_s * 
1000))
+    start_ts = time()
+    handle = self.hs2_client.execute_async(stmt)
+    self.hs2_client.clear_configuration()
+    end_time_str = ""
+    duration_str = ""
+    while len(end_time_str) == 0:
+      sleep(1)
+      tree = self.hs2_client.get_runtime_profile(handle, 
TRuntimeProfileFormat.THRIFT)
+      end_time_str = tree.nodes[1].info_strings["End Time"]
+      duration_str = tree.nodes[1].info_strings["Duration"]
+      # End time should not show up earlier than the delay.
+      if time() - start_ts < delay_s:
+        assert len(end_time_str) == 0, "End time show up too early: {end_str}. 
" \
+                                       "{delay_s} second delay expected since 
" \
+                                       "{start_str} ({start_ts:.6f})".format(
+          end_str=end_time_str, delay_s=delay_s, start_ts=start_ts,
+          start_str=datetime.utcfromtimestamp(start_ts).strftime('%Y-%m-%d 
%H:%M:%S.%f'))
+    self.hs2_client.close_query(handle)
+    duration_us = get_duration_us_from_str(duration_str)
+    assert duration_us > delay_s * 1000000
+
   def test_query_profile_contains_number_of_fragment_instance(self):
     """Test that the expected section for number of fragment instance in
     a query profile."""
diff --git a/tests/util/parse_util.py b/tests/util/parse_util.py
index 46a995a96..16c6fb220 100644
--- a/tests/util/parse_util.py
+++ b/tests/util/parse_util.py
@@ -102,6 +102,14 @@ def parse_duration_string_ms(duration):
   return (times['h'] * 60 * 60 + times['m'] * 60 + times['s']) * 1000 + 
times['ms']
 
 
+def get_duration_us_from_str(duration_str):
+  """Parses the duration string got in profile and returns the duration in 
us"""
+  match_res = re.search(r"\((\d+) us\)", duration_str)
+  if match_res:
+    return int(match_res.group(1))
+  raise Exception("Illegal duration string: " + duration_str)
+
+
 def match_memory_estimate(explain_lines):
   """
   Given a list of strings from EXPLAIN output, find the estimated memory 
needed. This is

Reply via email to