This is an automated email from the ASF dual-hosted git repository.
boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 6cec0bfe1 IMPALA-14791: Fix crash in PlanToJson when sink was not
executed
6cec0bfe1 is described below
commit 6cec0bfe179162a0943108cc624ac74ce4b5fbaa
Author: Zoltan Borok-Nagy <[email protected]>
AuthorDate: Fri Feb 27 15:29:24 2026 +0100
IMPALA-14791: Fix crash in PlanToJson when sink was not executed
impala-http-handler.cc::SinkToJsonHelper assumed that a
TPlanNodeExecSummary always exists for the sink, but that is not true.
In some cases there are no plan node exec summaries, e.g.:
* CTAS query failed to create the target table, so execution did not
start
* Admission control rejected query execution
This patch set fixes SinkToJsonHelper so that it skips the sink's exec
stats when its summary is missing.
Testing:
* added an e2e test with a failing CTAS
* added custom cluster tests with a CTAS and an INSERT that are rejected
by admission control
Change-Id: I4fe6ed48d365a34380991f544e1ff628e95fa89e
Reviewed-on: http://gerrit.cloudera.org:8080/24050
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/service/impala-http-handler.cc | 8 ++-
tests/common/impala_test_suite.py | 20 +++++++
tests/custom_cluster/test_admission_controller.py | 30 +++++++++++
tests/webserver/test_web_pages.py | 66 +++++++++++------------
4 files changed, 89 insertions(+), 35 deletions(-)
diff --git a/be/src/service/impala-http-handler.cc
b/be/src/service/impala-http-handler.cc
index 849a9c716..0f86585fc 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -1185,8 +1185,12 @@ void SinkToJsonHelper(const TDataSink& sink,
map<TPlanNodeId, TPlanNodeExecSummary>::const_iterator summary_it =
summaries.find(SINK_ID);
- DCHECK(summary_it != summaries.end());
- ExecStatsToJsonHelper(summary_it->second, document, value);
+ if (summary_it != summaries.end()) {
+ // Sometimes we don't have a summary for the sink, e.g.:
+ // - Query was rejected by admission control
+ // - CTAS query failed to create the target table, so the sink was never
executed
+ ExecStatsToJsonHelper(summary_it->second, document, value);
+ }
Value children(kArrayType);
diff --git a/tests/common/impala_test_suite.py
b/tests/common/impala_test_suite.py
index ab710a387..c7b1d4808 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -615,6 +615,26 @@ class ImpalaTestSuite(BaseTestSuite):
assert response.status_code == requests.codes.ok
return json.loads(response.text)
+ def run_query_and_get_debug_page(self, query, page_url, query_options=None,
+ expected_state=None, timeout=100):
+ """Runs a query to obtain the content of the debug page pointed to by
page_url, then
+ cancels the query. Optionally takes in an expected_state parameter, if
specified the
+ method waits for the query to reach the expected state before getting its
debug
+ information."""
+ with self.create_impala_client(protocol=HS2) as client:
+ if query_options:
+ client.set_configuration(query_options)
+ query_handle = client.execute_async(query)
+ if expected_state:
+ client.wait_for_impala_state(query_handle, expected_state, timeout)
+ query_id = client.handle_id(query_handle)
+ response_json = self.get_debug_page(
+ "{0}?query_id={1}&json".format(page_url, query_id).
+ format(25000))
+ client.cancel(query_handle)
+ client.close_query(query_handle)
+ return (query_id, response_json)
+
def get_var_current_val(self, var):
"""Returns the current value of a given Impalad flag variable."""
# Parse the /varz endpoint to get the flag information.
diff --git a/tests/custom_cluster/test_admission_controller.py
b/tests/custom_cluster/test_admission_controller.py
index 24c931957..387d93eb4 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -542,6 +542,36 @@ class TestAdmissionController(TestAdmissionControllerBase):
assert re.search("Rejected query from pool default-pool: request memory
needed "
".* is greater than pool max mem resources 10.00 MB",
str(ex))
+ @pytest.mark.execute_serially
+ @CustomClusterTestSuite.with_args(
+ impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=1,
+ pool_max_mem=10 * 1024 * 1024, proc_mem_limit=1024 * 1024 * 1024),
+ statestored_args=_STATESTORED_ARGS)
+ def test_rejected_statements_and_web_ui(self, unique_database):
+ def verify_json_plan(query, error_msg):
+ (_, response_json) = self.run_query_and_get_debug_page(
+ query, "http://localhost:{0}/query_plan", expected_state='ERROR')
+ # Verify that we have a non-empty plan_json.
+ assert 'plan_json' in response_json
+ assert 'plan_nodes' in response_json['plan_json']
+ assert 'label' in response_json['plan_json']['plan_nodes'][0]
+ # Summary is empty as execution could not start.
+ assert 'summary' in response_json
+ assert response_json['summary'] == ''
+ # Verify error message
+ assert error_msg in response_json['status']
+
+ target_tbl = unique_database + ".ctas_target_fail"
+ verify_json_plan("""create table {0} stored as iceberg as select id
+ from functional_parquet.alltypes""".format(target_tbl),
+ "Rejected query from pool")
+ insert_fail = unique_database + ".insert_fail"
+ self.execute_query_expect_success(self.client,
+ """create table {0} as select 100000""".format(insert_fail))
+ verify_json_plan("""insert into {0} select id from
functional_parquet.alltypes"""
+ .format(insert_fail),
+ "Rejected query from pool")
+
@SkipIfFS.hdfs_block_size
@SkipIfEC.parquet_file_size
@pytest.mark.execute_serially
diff --git a/tests/webserver/test_web_pages.py
b/tests/webserver/test_web_pages.py
index 48558c3fa..88ede8994 100644
--- a/tests/webserver/test_web_pages.py
+++ b/tests/webserver/test_web_pages.py
@@ -62,6 +62,7 @@ class TestWebPage(ImpalaTestSuite):
BACKENDS_URL = "http://localhost:{0}/backends"
PROMETHEUS_METRICS_URL = "http://localhost:{0}/metrics_prometheus"
QUERIES_URL = "http://localhost:{0}/queries"
+ QUERY_PLAN = "http://localhost:{0}/query_plan"
HEALTHZ_URL = "http://localhost:{0}/healthz"
EVENT_PROCESSOR_URL = "http://localhost:{0}/events"
HADOOP_VARZ_URL = "http://localhost:{0}/hadoop-varz"
@@ -172,7 +173,7 @@ class TestWebPage(ImpalaTestSuite):
query_handle = self.client.execute_async(query)
try:
self.client.wait_for_impala_state(query_handle, RUNNING, 1000)
- memz_breakdown = self.get_debug_page(self.MEMZ_URL)['detailed']
+ memz_breakdown = self._get_debug_page(self.MEMZ_URL)['detailed']
finstance_re = re.compile("Fragment [0-9a-f]{16}:[0-9a-f]{16}")
assert finstance_re.search(memz_breakdown), memz_breakdown
finally:
@@ -319,7 +320,7 @@ class TestWebPage(ImpalaTestSuite):
assert 'Content-Security-Policy' in response.headers, "CSP header
missing"
return responses
- def get_debug_page(self, page_url, port=25000):
+ def _get_debug_page(self, page_url, port=25000):
"""Returns the content of the debug page 'page_url' as json."""
responses = self.get_and_check_status(page_url + "?json",
ports_to_test=[port])
assert len(responses) == 1
@@ -723,7 +724,7 @@ class TestWebPage(ImpalaTestSuite):
# chars + "..."
expected_result = "select \"{0}...".format("x " * 121)
check_if_contains = False
- (_, response_json) = self.__run_query_and_get_debug_page(
+ (_, response_json) = self.run_query_and_get_debug_page(
query, self.QUERIES_URL, expected_state=FINISHED)
# Search the json for the expected value.
# The query can be in in_flight_queries even though it is in FINISHED
state.
@@ -736,27 +737,26 @@ class TestWebPage(ImpalaTestSuite):
assert check_if_contains, "No matching statement found in the jsons at {}:
{}".format(
datetime.now(), json.dumps(response_json, sort_keys=True, indent=4))
- def __run_query_and_get_debug_page(self, query, page_url, query_options=None,
- expected_state=None):
- """Runs a query to obtain the content of the debug page pointed to by
page_url, then
- cancels the query. Optionally takes in an expected_state parameter, if
specified the
- method waits for the query to reach the expected state before getting its
debug
- information."""
- with self.create_impala_client(protocol=HS2) as client:
- if query_options:
- client.set_configuration(query_options)
- query_handle = client.execute_async(query)
- if expected_state:
- client.wait_for_impala_state(query_handle, expected_state, 100)
- query_id = client.handle_id(query_handle)
- responses = self.get_and_check_status(
- "{0}?query_id={1}&json".format(page_url, query_id),
- ports_to_test=self.IMPALAD_TEST_PORT)
- assert len(responses) == 1
- response_json = json.loads(responses[0].text)
- client.cancel(query_handle)
- client.close_query(query_handle)
- return (query_id, response_json)
+ def test_failing_ctas(self, unique_database):
+ """Regression test for IMPALA-14791: Verify that a failing CTAS query does
not
+ crash Impala when the plan is retrieved."""
+ target_tbl = unique_database + ".ctas_target_fail"
+ ctas_query = """create table {0} stored as iceberg as
+ select 100000""".format(target_tbl)
+ debug_action = {'debug_action': 'CATALOGD_ICEBERG_CREATE:EXCEPTION@'
+ 'IcebergAlreadyExistsException@Table was created concurrently'}
+ (_, response_json) = self.run_query_and_get_debug_page(
+ ctas_query, self.QUERY_PLAN, query_options=debug_action,
+ expected_state='ERROR')
+ # Verify that we have a non-empty plan_json.
+ assert 'plan_json' in response_json
+ assert 'plan_nodes' in response_json['plan_json']
+ assert 'label' in response_json['plan_json']['plan_nodes'][0]
+ # Summary is empty as execution could not start.
+ assert 'summary' in response_json
+ assert response_json['summary'] == ''
+ # Verify error message
+ assert "Table already exists" in response_json['status']
@pytest.mark.xfail(run=False, reason="IMPALA-8059")
def test_backend_states(self, unique_database):
@@ -771,7 +771,7 @@ class TestWebPage(ImpalaTestSuite):
'cpu_sys_s', 'done', 'bytes_read']
for query in [sleep_query, ctas_sleep_query]:
- (_, response_json) = self.__run_query_and_get_debug_page(
+ (_, response_json) = self.run_query_and_get_debug_page(
query, self.QUERY_BACKENDS_URL, expected_state=running_state)
assert 'backend_states' in response_json
@@ -784,7 +784,7 @@ class TestWebPage(ImpalaTestSuite):
assert backend_state['status'] == 'OK'
assert not backend_state['done']
- (_, response_json) = self.__run_query_and_get_debug_page(
+ (_, response_json) = self.run_query_and_get_debug_page(
"describe functional.alltypes", self.QUERY_BACKENDS_URL)
assert 'backend_states' not in response_json
@@ -800,7 +800,7 @@ class TestWebPage(ImpalaTestSuite):
'instance_id', 'done']
for query in [sleep_query, ctas_sleep_query]:
- (_, response_json) = self.__run_query_and_get_debug_page(
+ (_, response_json) = self.run_query_and_get_debug_page(
query, self.QUERY_FINSTANCES_URL, query_options=query_options,
expected_state=running_state)
@@ -819,7 +819,7 @@ class TestWebPage(ImpalaTestSuite):
assert instance_stats_property in instance_stats
assert not instance_stats['done']
- (_, response_json) = self.__run_query_and_get_debug_page(
+ (_, response_json) = self.run_query_and_get_debug_page(
"describe functional.alltypes", self.QUERY_BACKENDS_URL,
query_options=query_options)
assert 'backend_instances' not in response_json
@@ -852,7 +852,7 @@ class TestWebPage(ImpalaTestSuite):
def test_rpc_read_write_metrics(self):
"""Test that read/write metrics are exposed in /rpcz"""
- rpcz = self.get_debug_page(self.RPCZ_URL)
+ rpcz = self._get_debug_page(self.RPCZ_URL)
hist_time_regex = "[0-9][0-9numsh.]*"
rpc_histogram_regex = (
"Count: [0-9]+, sum: " + hist_time_regex
@@ -878,12 +878,12 @@ class TestWebPage(ImpalaTestSuite):
SVC_NAME = 'impala.DataStreamService'
def is_krpc_use_unix_domain_socket():
- rpcz = self.get_debug_page(self.RPCZ_URL)
+ rpcz = self._get_debug_page(self.RPCZ_URL)
return rpcz['rpc_use_unix_domain_socket']
def get_per_conn_metrics(inbound):
"""Get inbound or outbound per-connection metrics"""
- rpcz = self.get_debug_page(self.RPCZ_URL)
+ rpcz = self._get_debug_page(self.RPCZ_URL)
if inbound:
key = "inbound_per_conn_metrics"
else:
@@ -892,7 +892,7 @@ class TestWebPage(ImpalaTestSuite):
return conns
def get_svc_metrics(svc_name):
- rpcz = self.get_debug_page(self.RPCZ_URL)
+ rpcz = self._get_debug_page(self.RPCZ_URL)
assert len(rpcz['services']) > 0
for s in rpcz['services']:
if s['service_name'] == svc_name:
@@ -1246,7 +1246,7 @@ class TestWebPage(ImpalaTestSuite):
def test_query_progress(self):
"""Tests that /queries page shows query progress."""
query = "select count(*) from functional_parquet.alltypes where bool_col =
sleep(100)"
- (query_id, response_json) = self.__run_query_and_get_debug_page(
+ (query_id, response_json) = self.run_query_and_get_debug_page(
query, self.QUERIES_URL, expected_state=RUNNING)
found = False
for json_part in response_json['in_flight_queries']: