This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 32aeeecc0 IMPALA-12008: Fix incorrect end time in DML profiles
32aeeecc0 is described below
commit 32aeeecc078015263a1282f631acde4df6bd789c
Author: stiga-huang <[email protected]>
AuthorDate: Wed Mar 22 18:43:53 2023 +0800
IMPALA-12008: Fix incorrect end time in DML profiles
The end time in DML profiles is incorrect: it is actually the time when
admission control resources are released. That is correct for normal
queries, but for DMLs the coordinator still needs to invoke the
updateCatalog RPC of catalogd to finalize the HMS update. The end time
should be set after that RPC has finished.
This patch fixes the DML end time by no longer setting it when the
admission control resources are released. Instead, it is set after
ClientRequestState::WaitInternal() finishes, which makes sure the
updateCatalog RPC has finished.
Also adds a "Duration" field to the profile.
For testing, this patch also adds a new debug action in catalogd
(catalogd_insert_finish_delay) to inject delays in updateCatalog.
Tests
- Added e2e test to verify the end time of a DML profile
Change-Id: I9c5dc92c2f8576ceed374d447c0ac05022a2dee6
Reviewed-on: http://gerrit.cloudera.org:8080/19644
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/runtime/coordinator.cc | 7 ++--
be/src/service/client-request-state.cc | 8 +++++
common/thrift/CatalogService.thrift | 3 ++
.../apache/impala/service/CatalogOpExecutor.java | 5 +++
.../java/org/apache/impala/util/DebugUtils.java | 4 +++
tests/query_test/test_observability.py | 40 ++++++++++++++++++++--
tests/util/parse_util.py | 8 +++++
7 files changed, 71 insertions(+), 4 deletions(-)
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index aa9e00106..df9257c98 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -813,7 +813,10 @@ void Coordinator::HandleExecStateTransition(
}
ReleaseQueryAdmissionControlResources();
// Once the query has released its admission control resources, update its
end time.
- parent_request_state_->UpdateEndTime();
+ // However, for non Query statement like DML statement, we still need to
update HMS
+ // after the query finishes. So the end time of non Query statement is not
set here.
+ // Instead, we set it in ClientRequestState::Wait().
+ if (stmt_type_ == TStmtType::QUERY) parent_request_state_->UpdateEndTime();
// Can compute summary only after we stop accepting reports from the
backends. Both
// WaitForBackends() and CancelBackends() ensures that.
// TODO: should move this off of the query execution path?
@@ -947,7 +950,7 @@ Status Coordinator::Wait() {
RETURN_IF_ERROR(UpdateExecState(FinalizeResultSink(), nullptr,
FLAGS_hostname));
}
- // DML requests are finished at this point.
+ // DML queries are finished at this point.
RETURN_IF_ERROR(SetNonErrorTerminalState(ExecState::RETURNED_RESULTS));
query_profile_->AddInfoString(
"DML Stats", dml_exec_state_.OutputPartitionStats("\n"));
diff --git a/be/src/service/client-request-state.cc
b/be/src/service/client-request-state.cc
index f863d86a7..77d0788ee 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -154,6 +154,7 @@ ClientRequestState::ClientRequestState(const TQueryCtx&
query_ctx, Frontend* fro
summary_profile_->AddInfoString("Start Time",
ToStringFromUnixMicros(start_time_us(),
TimePrecision::Nanosecond));
summary_profile_->AddInfoString("End Time", "");
+ summary_profile_->AddInfoString("Duration", "");
summary_profile_->AddInfoString("Query Type", "N/A");
summary_profile_->AddInfoString("Query State",
PrintValue(BeeswaxQueryState()));
summary_profile_->AddInfoString(
@@ -1134,6 +1135,7 @@ void ClientRequestState::Wait() {
query_events()->MarkEvent("Rows available");
} else {
query_events()->MarkEvent("Request finished");
+ UpdateEndTime();
}
discard_result(UpdateQueryStatus(status));
}
@@ -1513,6 +1515,9 @@ Status ClientRequestState::UpdateCatalog() {
TUpdateCatalogRequest catalog_update;
catalog_update.__set_sync_ddl(exec_request_->query_options.sync_ddl);
catalog_update.__set_header(GetCatalogServiceRequestHeader());
+ if (exec_request_->query_options.__isset.debug_action) {
+
catalog_update.__set_debug_action(exec_request_->query_options.debug_action);
+ }
DmlExecState* dml_exec_state = GetCoordinator()->dml_exec_state();
if (!dml_exec_state->PrepareCatalogUpdate(&catalog_update)) {
VLOG_QUERY << "No partitions altered, not updating metastore (query id: "
@@ -1840,6 +1845,9 @@ void ClientRequestState::UpdateEndTime() {
// of nanosecond precision, so we explicitly specify the precision here.
summary_profile_->AddInfoString(
"End Time", ToStringFromUnixMicros(end_time_us(),
TimePrecision::Nanosecond));
+ int64_t duration = end_time_us() - start_time_us();
+ summary_profile_->AddInfoString("Duration", Substitute("$0 ($1 us)",
+ PrettyPrinter::Print(duration, TUnit::TIME_US), duration));
}
}
diff --git a/common/thrift/CatalogService.thrift
b/common/thrift/CatalogService.thrift
index b8ff63c88..6606564f6 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -258,6 +258,9 @@ struct TUpdateCatalogRequest {
// Descriptor object about the Iceberg operation.
10: optional TIcebergOperationParam iceberg_operation
+
+ // Passes the debug actions to catalogd if the query option is set.
+ 11: optional string debug_action
}
// Response from a TUpdateCatalogRequest
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index e2030b251..c62646e71 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -6832,6 +6832,11 @@ public class CatalogOpExecutor {
response.getResult().setVersion(
catalog_.waitForSyncDdlVersion(response.getResult()));
}
+
+ if (update.isSetDebug_action()) {
+ DebugUtils.executeDebugAction(update.getDebug_action(),
+ DebugUtils.INSERT_FINISH_DELAY);
+ }
return response;
}
diff --git a/fe/src/main/java/org/apache/impala/util/DebugUtils.java
b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
index f1587f7a4..5d2b353da 100644
--- a/fe/src/main/java/org/apache/impala/util/DebugUtils.java
+++ b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
@@ -50,6 +50,10 @@ public class DebugUtils {
// debug action label for introducing delay in update stats command.
public static final String UPDATE_STATS_DELAY =
"catalogd_update_stats_delay";
+ // debug action label for introducing delay when the catalog operation of
INSERT, i.e.
+ // CatalogOpExecutor#updateCatalog() finishes.
+ public static final String INSERT_FINISH_DELAY =
"catalogd_insert_finish_delay";
+
/**
* Given list of debug actions, execute the debug action pertaining to the
given label.
* The debugActions string is of the format specified for the
query_option/configuration
diff --git a/tests/query_test/test_observability.py
b/tests/query_test/test_observability.py
index a7f42058f..7d29cd428 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -22,7 +22,8 @@ from tests.beeswax.impala_beeswax import
ImpalaBeeswaxException
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.skip import SkipIfFS, SkipIfLocal, SkipIfNotHdfsMinicluster
from tests.util.filesystem_utils import IS_EC, WAREHOUSE
-from time import sleep
+from tests.util.parse_util import get_duration_us_from_str
+from time import sleep, time
from RuntimeProfile.ttypes import TRuntimeProfileFormat
import pytest
import re
@@ -608,7 +609,7 @@ class TestObservability(ImpalaTestSuite):
assert len(start_time_sub_sec_str) == 9, start_time
@pytest.mark.execute_serially
- def test_end_time(self):
+ def test_query_end_time(self):
""" Test that verifies that the end time of a query with a coordinator is
set once
the coordinator releases its admission control resources. This ensures
that the
duration of the query will be determined by the time taken to do real work
rather
@@ -643,6 +644,41 @@ class TestObservability(ImpalaTestSuite):
end_time = tree.nodes[1].info_strings["End Time"]
assert end_time is not None
+ def test_dml_end_time(self, unique_database):
+ """Same as the above test but verifies the end time of DML is set only
after the work
+ in catalogd is done."""
+ stmt = "create table %s.alltypestiny like functional.alltypestiny" %
unique_database
+ self.execute_query(stmt)
+ # Warm up the table
+ self.execute_query("describe %s.alltypestiny" % unique_database)
+ stmt = "insert overwrite %s.alltypestiny partition(year, month)" \
+ " select * from functional.alltypestiny" % unique_database
+ # Use debug_action to inject a delay in catalogd. The INSERT usually
finishes in
+ # 300ms without the delay.
+ delay_s = 5
+ self.hs2_client.set_configuration_option(
+ "debug_action", "catalogd_insert_finish_delay:SLEEP@%d" % (delay_s *
1000))
+ start_ts = time()
+ handle = self.hs2_client.execute_async(stmt)
+ self.hs2_client.clear_configuration()
+ end_time_str = ""
+ duration_str = ""
+ while len(end_time_str) == 0:
+ sleep(1)
+ tree = self.hs2_client.get_runtime_profile(handle,
TRuntimeProfileFormat.THRIFT)
+ end_time_str = tree.nodes[1].info_strings["End Time"]
+ duration_str = tree.nodes[1].info_strings["Duration"]
+ # End time should not show up earlier than the delay.
+ if time() - start_ts < delay_s:
+ assert len(end_time_str) == 0, "End time show up too early: {end_str}.
" \
+ "{delay_s} second delay expected since
" \
+ "{start_str} ({start_ts:.6f})".format(
+ end_str=end_time_str, delay_s=delay_s, start_ts=start_ts,
+ start_str=datetime.utcfromtimestamp(start_ts).strftime('%Y-%m-%d
%H:%M:%S.%f'))
+ self.hs2_client.close_query(handle)
+ duration_us = get_duration_us_from_str(duration_str)
+ assert duration_us > delay_s * 1000000
+
def test_query_profile_contains_number_of_fragment_instance(self):
"""Test that the expected section for number of fragment instance in
a query profile."""
diff --git a/tests/util/parse_util.py b/tests/util/parse_util.py
index 46a995a96..16c6fb220 100644
--- a/tests/util/parse_util.py
+++ b/tests/util/parse_util.py
@@ -102,6 +102,14 @@ def parse_duration_string_ms(duration):
return (times['h'] * 60 * 60 + times['m'] * 60 + times['s']) * 1000 +
times['ms']
+def get_duration_us_from_str(duration_str):
+ """Parses the duration string got in profile and returns the duration in
us"""
+ match_res = re.search(r"\((\d+) us\)", duration_str)
+ if match_res:
+ return int(match_res.group(1))
+ raise Exception("Illegal duration string: " + duration_str)
+
+
def match_memory_estimate(explain_lines):
"""
Given a list of strings from EXPLAIN output, find the estimated memory
needed. This is