This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 7fabd27096dc8af9ee2e58f39673defb51927b05
Author:     Riza Suminto <riza.sumi...@cloudera.com>
AuthorDate: Mon Sep 8 14:24:02 2025 -0700

    IMPALA-14411: enable_workload_mgmt should work with V2 profile

    Impalad crashes (hitting a DCHECK) when both enable_workload_mgmt and
    gen_experimental_profile are enabled. This is because the lambda function
    process_exec_profile expects an "Averaged Fragment" node to exist in the
    query profile, but that node does not exist in the V2 query profile.

    This patch fixes the issue by gathering the ScratchBytesWritten,
    ScannerIoWaitTime, and DataCacheHitBytes counters differently in the
    V2 profile.

    Testing:
    - Add TestQueryLogTableHS2::test_with_experimental_profile.
    - Manually start a minicluster with both the enable_workload_mgmt and
      gen_experimental_profile flags enabled. Run a few queries and confirm
      no crash happens. Also verify that the columns of sys.impala_query_log
      that summarize the scan node counters are correct.

    Change-Id: Iccb4ad9279b0d66479b1e7816ffc732028e71734
    Reviewed-on: http://gerrit.cloudera.org:8080/23396
    Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 be/src/service/query-state-record.cc   | 67 ++++++++++++++++++++++++----------
 tests/custom_cluster/test_query_log.py | 36 ++++++++++++++++++
 2 files changed, 84 insertions(+), 19 deletions(-)

diff --git a/be/src/service/query-state-record.cc b/be/src/service/query-state-record.cc
index 1b91ed41d..626128485 100644
--- a/be/src/service/query-state-record.cc
+++ b/be/src/service/query-state-record.cc
@@ -41,6 +41,8 @@
 
 using namespace std;
 
+DECLARE_bool(gen_experimental_profile);
+
 namespace impala {
 
 QueryStateRecord::QueryStateRecord(
@@ -281,9 +283,10 @@ QueryStateExpanded::QueryStateExpanded(
   // Query Profile is initialized when query execution starts. Thus, it will always be
   // non-null at this point since this code runs after the query completes.
   DCHECK(coord->query_profile() != nullptr);
-  map<string, int64_t> host_scratch_bytes;
-  map<string, int64_t> scanner_io_wait;
-  map<string, int64_t> bytes_read_cache;
+  // following maps are <instance_name, pair<counter_value, num_input_profile>>.
+  map<string, pair<int64_t, int32_t>> host_scratch_bytes;
+  map<string, pair<int64_t, int32_t>> scanner_io_wait;
+  map<string, pair<int64_t, int32_t>> bytes_read_cache;
   vector<RuntimeProfileBase*> prof_stack;
 
   // Lambda function to recursively walk through a profile.
@@ -291,38 +294,62 @@ QueryStateExpanded::QueryStateExpanded(
       &process_exec_profile, &host_scratch_bytes, &scanner_io_wait, &prof_stack,
       &bytes_read_cache, this](RuntimeProfileBase* profile) {
     prof_stack.push_back(profile);
+    bool is_aggregated = FLAGS_gen_experimental_profile;
+
+    // 'num_input_profile' is 1 for non-aggregated profiles.
+    // Note that in V1 profile, "Averaged Fragment" node is an aggregated profile.
+    int num_input_profile = profile->GetNumInputProfiles();
+
     if (const auto& cntr = profile->GetCounter("ScratchBytesWritten");
         cntr != nullptr) {
-      host_scratch_bytes.emplace(profile->name(), cntr->value());
+      host_scratch_bytes.emplace(
+          profile->name(), make_pair(cntr->value(), num_input_profile));
     }
 
     // Metrics from HDFS_SCAN_NODE entries.
     if (const string& scan = prof_stack.back()->name();
         boost::algorithm::istarts_with(scan, "HDFS_SCAN_NODE")) {
-      // Find a parent instance. If none found, assume in Averaged Fragment.
-      if (auto it = find_if(prof_stack.begin()+1, prof_stack.end()-1, find_instance);
-          it != prof_stack.end()-1) {
-        DCHECK(find_if(prof_stack.begin(), prof_stack.end(), find_averaged)
-            == prof_stack.end());
-        const string& inst = (*it)->name();
+      if (is_aggregated) {
+        // profile is a V2 AggregatedRuntimeProfile.
         if (const auto& cntr = profile->GetCounter("ScannerIoWaitTime");
             cntr != nullptr) {
-          scanner_io_wait.emplace(StrCat(inst, "::", scan), cntr->value());
+          scanner_io_wait.emplace(scan, make_pair(cntr->value(), num_input_profile));
         }
 
         if (const auto& cntr = profile->GetCounter("DataCacheHitBytes");
             cntr != nullptr) {
-          bytes_read_cache.emplace(StrCat(inst, "::", scan), cntr->value());
+          bytes_read_cache.emplace(scan, make_pair(cntr->value(), num_input_profile));
         }
       } else {
-        DCHECK(find_if(prof_stack.begin(), prof_stack.end(), find_averaged)
-            != prof_stack.end());
+        // profile is a V1 RuntimeProfile.
+        // Find a parent instance name. If none found, assume in Averaged Fragment.
+        if (auto it =
+                find_if(prof_stack.begin() + 1, prof_stack.end() - 1, find_instance);
+            it != prof_stack.end() - 1) {
+          DCHECK(find_if(prof_stack.begin(), prof_stack.end(), find_averaged)
+              == prof_stack.end());
+          const string& inst = (*it)->name();
+          if (const auto& cntr = profile->GetCounter("ScannerIoWaitTime");
+              cntr != nullptr) {
+            scanner_io_wait.emplace(
+                StrCat(inst, "::", scan), make_pair(cntr->value(), num_input_profile));
+          }
+
+          if (const auto& cntr = profile->GetCounter("DataCacheHitBytes");
+              cntr != nullptr) {
+            bytes_read_cache.emplace(
+                StrCat(inst, "::", scan), make_pair(cntr->value(), num_input_profile));
+          }
+        } else {
+          DCHECK(find_if(prof_stack.begin(), prof_stack.end(), find_averaged)
+              != prof_stack.end());
+        }
       }
     }
 
     // Total Bytes Read
     if (const auto& cntr = profile->GetCounter("TotalBytesRead"); cntr != nullptr) {
-      bytes_read_total = cntr->value();
+      bytes_read_total = cntr->value() * num_input_profile;
     }
 
     // Recursively walk down through all child nodes.
@@ -338,21 +365,23 @@ QueryStateExpanded::QueryStateExpanded(
 
   // Compressed Bytes Spilled
   for (const auto& hsb : host_scratch_bytes) {
-    compressed_bytes_spilled += hsb.second;
+    compressed_bytes_spilled += (hsb.second.first * hsb.second.second);
   }
 
   // Read IO Wait Time Total and Average
   if (scanner_io_wait.size() > 0) {
+    int32_t total_profiles = 0;
     for (const auto& item : scanner_io_wait) {
-      read_io_wait_time_total += item.second;
+      read_io_wait_time_total += (item.second.first * item.second.second);
+      total_profiles += item.second.second;
     }
-    read_io_wait_time_mean = read_io_wait_time_total / scanner_io_wait.size();
+    read_io_wait_time_mean = read_io_wait_time_total / total_profiles;
   }
 
   // Bytes Read from Data Cache
   for (const auto& b : bytes_read_cache) {
-    bytes_read_cache_total += b.second;
+    bytes_read_cache_total += (b.second.first * b.second.second);
   }
 
   // Per-Node Peak Memory Usage
diff --git a/tests/custom_cluster/test_query_log.py b/tests/custom_cluster/test_query_log.py
index 44a3454db..2ca66af87 100644
--- a/tests/custom_cluster/test_query_log.py
+++ b/tests/custom_cluster/test_query_log.py
@@ -605,6 +605,42 @@ class TestQueryLogTableHS2(WorkloadManagementTestSuite):
         "limit of 1s000ms".format(self.insert_query_id), expected_count=2)
 
+  @CustomClusterTestSuite.with_args(
+      cluster_size=1, log_symlinks=True,
+      impalad_args="--gen_experimental_profile",
+      workload_mgmt=True,
+      disable_log_buffering=True)
+  def test_with_experimental_profile(self):
+    """Test that impalad does not crash when gen_experimental_profile flag is True."""
+    impalad = self.cluster.get_first_impalad().service
+
+    # Run a query which should successfully be written to the query log table.
+    client = self.hs2_client
+    query = "select count(*) from functional.alltypes"
+    handle = client.execute_async(query)
+    query_id = client.handle_id(handle)
+    client.fetch(query, handle, discard_results=True)
+    client.close_query(handle)
+
+    # Wait for the query to be written to the sys.impala_query_log table.
+    impalad.wait_for_metric_value("impala-server.completed-queries.written", 1, 15)
+
+    # Validate that following columns are non-zero.
+    # BYTES_READ_CACHE_TOTAL can be 0 if the data was not cached.
+    # V2 profile counters only print stats (mean, min, max), so direct validation
+    # against query profile is not feasible yet.
+    res = client.execute(
+        "select READ_IO_WAIT_TOTAL_MS, READ_IO_WAIT_MEAN_MS, "
+        " BYTES_READ_TOTAL, BYTES_READ_CACHE_TOTAL "
+        "from sys.impala_query_log where query_id = '{}'".format(query_id))
+    assert res.success
+    rows = res.tuples()
+    assert len(rows) == 1
+    assert float(rows[0][0]) > 0, "Expected READ_IO_WAIT_TOTAL_MS > 0"
+    assert float(rows[0][1]) > 0, "Expected READ_IO_WAIT_MEAN_MS > 0"
+    assert float(rows[0][2]) > 0, "Expected BYTES_READ_TOTAL > 0"
+    assert float(rows[0][3]) >= 0, "Expected BYTES_READ_CACHE_TOTAL >= 0"
+
 
 class TestQueryLogTableAll(WorkloadManagementTestSuite):
   """Tests to assert the query log table is correctly populated when using all the
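The following is a minimal, standalone sketch (not Impala code) of the counter arithmetic the patch adds in query-state-record.cc. It assumes, as the patch treats it, that a V2 aggregated profile stores a counter value averaged over its input fragment instance profiles (and that the count is 1 for V1 instance profiles), so a per-query total is recovered as value * num_input_profiles and the mean divides by the summed instance count rather than by the number of map entries. All node names and values in the sketch are made up for illustration.

// Standalone sketch of the (counter_value, num_input_profiles) arithmetic.
#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <utility>

int main() {
  // <node_name, pair<counter_value, num_input_profiles>>, mirroring the maps
  // kept by QueryStateExpanded. Values are illustrative only.
  std::map<std::string, std::pair<int64_t, int32_t>> scanner_io_wait = {
      {"HDFS_SCAN_NODE (id=0)", {40, 3}},  // V2-style: average over 3 instances
      {"HDFS_SCAN_NODE (id=1)", {10, 1}},  // V1-style: a single instance
  };

  int64_t read_io_wait_time_total = 0;
  int32_t total_profiles = 0;
  for (const auto& item : scanner_io_wait) {
    // Undo the averaging: total contribution = mean value * instance count.
    read_io_wait_time_total += item.second.first * item.second.second;
    total_profiles += item.second.second;
  }
  int64_t read_io_wait_time_mean =
      total_profiles > 0 ? read_io_wait_time_total / total_profiles : 0;

  std::cout << "total=" << read_io_wait_time_total  // 40 * 3 + 10 * 1 = 130
            << " mean=" << read_io_wait_time_mean   // 130 / 4 = 32
            << std::endl;
  return 0;
}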