This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 6cec0bfe1 IMPALA-14791: Fix crash in PlanToJson when sink was not executed
6cec0bfe1 is described below

commit 6cec0bfe179162a0943108cc624ac74ce4b5fbaa
Author: Zoltan Borok-Nagy <[email protected]>
AuthorDate: Fri Feb 27 15:29:24 2026 +0100

    IMPALA-14791: Fix crash in PlanToJson when sink was not executed
    
    impala-http-handler.cc::SinkToJsonHelper had the assumption that we
    always have a TPlanNodeExecSummary for the sink, but it is not true.
    In some cases we don't have plan node exec summaries, e.g.:
    * CTAS query failed to create the target table, so execution did not
      start
    * Admission control rejected query execution
    
    This patch set fixes SinkToJsonHelper to work well in case of missing
    summaries.
    
    Testing:
     * e2e added with failing CTAS
     * custom tests added with failing CTAS and INSERT due to admission
       control
    
    Change-Id: I4fe6ed48d365a34380991f544e1ff628e95fa89e
    Reviewed-on: http://gerrit.cloudera.org:8080/24050
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/service/impala-http-handler.cc             |  8 ++-
 tests/common/impala_test_suite.py                 | 20 +++++++
 tests/custom_cluster/test_admission_controller.py | 30 +++++++++++
 tests/webserver/test_web_pages.py                 | 66 +++++++++++------------
 4 files changed, 89 insertions(+), 35 deletions(-)

diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc
index 849a9c716..0f86585fc 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -1185,8 +1185,12 @@ void SinkToJsonHelper(const TDataSink& sink,
 
   map<TPlanNodeId, TPlanNodeExecSummary>::const_iterator summary_it =
       summaries.find(SINK_ID);
-  DCHECK(summary_it != summaries.end());
-  ExecStatsToJsonHelper(summary_it->second, document, value);
+  if (summary_it != summaries.end()) {
+    // Sometimes we don't have a summary for the sink, e.g.:
+    // - Query was rejected by admission control
+    // - CTAS query failed to create the target table, so the sink was never executed
+    ExecStatsToJsonHelper(summary_it->second, document, value);
+  }
 
   Value children(kArrayType);
 
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index ab710a387..c7b1d4808 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -615,6 +615,26 @@ class ImpalaTestSuite(BaseTestSuite):
     assert response.status_code == requests.codes.ok
     return json.loads(response.text)
 
+  def run_query_and_get_debug_page(self, query, page_url, query_options=None,
+                                     expected_state=None, timeout=100):
+    """Runs a query to obtain the content of the debug page pointed to by page_url, then
+    cancels the query. Optionally takes in an expected_state parameter, if specified the
+    method waits for the query to reach the expected state before getting its debug
+    information."""
+    with self.create_impala_client(protocol=HS2) as client:
+      if query_options:
+        client.set_configuration(query_options)
+      query_handle = client.execute_async(query)
+      if expected_state:
+        client.wait_for_impala_state(query_handle, expected_state, timeout)
+      query_id = client.handle_id(query_handle)
+      response_json = self.get_debug_page(
+          "{0}?query_id={1}&json".format(page_url, query_id).
+          format(25000))
+      client.cancel(query_handle)
+      client.close_query(query_handle)
+      return (query_id, response_json)
+
   def get_var_current_val(self, var):
     """Returns the current value of a given Impalad flag variable."""
     # Parse the /varz endpoint to get the flag information.
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index 24c931957..387d93eb4 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -542,6 +542,36 @@ class TestAdmissionController(TestAdmissionControllerBase):
       assert re.search("Rejected query from pool default-pool: request memory needed "
                        ".* is greater than pool max mem resources 10.00 MB", str(ex))
 
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=1,
+          pool_max_mem=10 * 1024 * 1024, proc_mem_limit=1024 * 1024 * 1024),
+      statestored_args=_STATESTORED_ARGS)
+  def test_rejected_statements_and_web_ui(self, unique_database):
+    def verify_json_plan(query, error_msg):
+      (_, response_json) = self.run_query_and_get_debug_page(
+          query, "http://localhost:{0}/query_plan", expected_state='ERROR')
+      # Verify that we have a non-empty plan_json.
+      assert 'plan_json' in response_json
+      assert 'plan_nodes' in response_json['plan_json']
+      assert 'label' in response_json['plan_json']['plan_nodes'][0]
+      # Summary is empty as execution could not start.
+      assert 'summary' in response_json
+      assert response_json['summary'] == ''
+      # Verify error message
+      assert error_msg in response_json['status']
+
+    target_tbl = unique_database + ".ctas_target_fail"
+    verify_json_plan("""create table {0} stored as iceberg as select id
+                      from functional_parquet.alltypes""".format(target_tbl),
+                      "Rejected query from pool")
+    insert_fail = unique_database + ".insert_fail"
+    self.execute_query_expect_success(self.client,
+        """create table {0} as select 100000""".format(insert_fail))
+    verify_json_plan("""insert into {0} select id from functional_parquet.alltypes"""
+        .format(insert_fail),
+        "Rejected query from pool")
+
   @SkipIfFS.hdfs_block_size
   @SkipIfEC.parquet_file_size
   @pytest.mark.execute_serially
diff --git a/tests/webserver/test_web_pages.py b/tests/webserver/test_web_pages.py
index 48558c3fa..88ede8994 100644
--- a/tests/webserver/test_web_pages.py
+++ b/tests/webserver/test_web_pages.py
@@ -62,6 +62,7 @@ class TestWebPage(ImpalaTestSuite):
   BACKENDS_URL = "http://localhost:{0}/backends"
   PROMETHEUS_METRICS_URL = "http://localhost:{0}/metrics_prometheus"
   QUERIES_URL = "http://localhost:{0}/queries"
+  QUERY_PLAN = "http://localhost:{0}/query_plan"
   HEALTHZ_URL = "http://localhost:{0}/healthz"
   EVENT_PROCESSOR_URL = "http://localhost:{0}/events"
   HADOOP_VARZ_URL = "http://localhost:{0}/hadoop-varz"
@@ -172,7 +173,7 @@ class TestWebPage(ImpalaTestSuite):
     query_handle = self.client.execute_async(query)
     try:
       self.client.wait_for_impala_state(query_handle, RUNNING, 1000)
-      memz_breakdown = self.get_debug_page(self.MEMZ_URL)['detailed']
+      memz_breakdown = self._get_debug_page(self.MEMZ_URL)['detailed']
       finstance_re = re.compile("Fragment [0-9a-f]{16}:[0-9a-f]{16}")
       assert finstance_re.search(memz_breakdown), memz_breakdown
     finally:
@@ -319,7 +320,7 @@ class TestWebPage(ImpalaTestSuite):
       assert 'Content-Security-Policy' in response.headers, "CSP header missing"
     return responses
 
-  def get_debug_page(self, page_url, port=25000):
+  def _get_debug_page(self, page_url, port=25000):
     """Returns the content of the debug page 'page_url' as json."""
     responses = self.get_and_check_status(page_url + "?json", ports_to_test=[port])
     assert len(responses) == 1
@@ -723,7 +724,7 @@ class TestWebPage(ImpalaTestSuite):
     # chars + "..."
     expected_result = "select \"{0}...".format("x " * 121)
     check_if_contains = False
-    (_, response_json) = self.__run_query_and_get_debug_page(
+    (_, response_json) = self.run_query_and_get_debug_page(
       query, self.QUERIES_URL, expected_state=FINISHED)
     # Search the json for the expected value.
     # The query can be in in_flight_queries even though it is in FINISHED state.
@@ -736,27 +737,26 @@ class TestWebPage(ImpalaTestSuite):
     assert check_if_contains, "No matching statement found in the jsons at {}: {}".format(
         datetime.now(), json.dumps(response_json, sort_keys=True, indent=4))
 
-  def __run_query_and_get_debug_page(self, query, page_url, query_options=None,
-                                     expected_state=None):
-    """Runs a query to obtain the content of the debug page pointed to by page_url, then
-    cancels the query. Optionally takes in an expected_state parameter, if specified the
-    method waits for the query to reach the expected state before getting its debug
-    information."""
-    with self.create_impala_client(protocol=HS2) as client:
-      if query_options:
-        client.set_configuration(query_options)
-      query_handle = client.execute_async(query)
-      if expected_state:
-        client.wait_for_impala_state(query_handle, expected_state, 100)
-      query_id = client.handle_id(query_handle)
-      responses = self.get_and_check_status(
-        "{0}?query_id={1}&json".format(page_url, query_id),
-        ports_to_test=self.IMPALAD_TEST_PORT)
-      assert len(responses) == 1
-      response_json = json.loads(responses[0].text)
-      client.cancel(query_handle)
-      client.close_query(query_handle)
-      return (query_id, response_json)
+  def test_failing_ctas(self, unique_database):
+    """Regression test for IMPALA-14791: Verify that a failing CTAS query does not
+    crash Impala when the plan is retrieved."""
+    target_tbl = unique_database + ".ctas_target_fail"
+    ctas_query = """create table {0} stored as iceberg as
+                  select 100000""".format(target_tbl)
+    debug_action = {'debug_action': 'CATALOGD_ICEBERG_CREATE:EXCEPTION@'
+        'IcebergAlreadyExistsException@Table was created concurrently'}
+    (_, response_json) = self.run_query_and_get_debug_page(
+        ctas_query, self.QUERY_PLAN, query_options=debug_action,
+        expected_state='ERROR')
+    # Verify that we have a non-empty plan_json.
+    assert 'plan_json' in response_json
+    assert 'plan_nodes' in response_json['plan_json']
+    assert 'label' in response_json['plan_json']['plan_nodes'][0]
+    # Summary is empty as execution could not start.
+    assert 'summary' in response_json
+    assert response_json['summary'] == ''
+    # Verify error message
+    assert "Table already exists" in response_json['status']
 
   @pytest.mark.xfail(run=False, reason="IMPALA-8059")
   def test_backend_states(self, unique_database):
@@ -771,7 +771,7 @@ class TestWebPage(ImpalaTestSuite):
                                 'cpu_sys_s', 'done', 'bytes_read']
 
     for query in [sleep_query, ctas_sleep_query]:
-      (_, response_json) = self.__run_query_and_get_debug_page(
+      (_, response_json) = self.run_query_and_get_debug_page(
           query, self.QUERY_BACKENDS_URL, expected_state=running_state)
 
       assert 'backend_states' in response_json
@@ -784,7 +784,7 @@ class TestWebPage(ImpalaTestSuite):
         assert backend_state['status'] == 'OK'
         assert not backend_state['done']
 
-    (_, response_json) = self.__run_query_and_get_debug_page(
+    (_, response_json) = self.run_query_and_get_debug_page(
         "describe functional.alltypes", self.QUERY_BACKENDS_URL)
     assert 'backend_states' not in response_json
 
@@ -800,7 +800,7 @@ class TestWebPage(ImpalaTestSuite):
                                  'instance_id', 'done']
 
     for query in [sleep_query, ctas_sleep_query]:
-      (_, response_json) = self.__run_query_and_get_debug_page(
+      (_, response_json) = self.run_query_and_get_debug_page(
           query, self.QUERY_FINSTANCES_URL, query_options=query_options,
           expected_state=running_state)
 
@@ -819,7 +819,7 @@ class TestWebPage(ImpalaTestSuite):
             assert instance_stats_property in instance_stats
         assert not instance_stats['done']
 
-    (_, response_json) = self.__run_query_and_get_debug_page(
+    (_, response_json) = self.run_query_and_get_debug_page(
         "describe functional.alltypes", self.QUERY_BACKENDS_URL,
         query_options=query_options)
     assert 'backend_instances' not in response_json
@@ -852,7 +852,7 @@ class TestWebPage(ImpalaTestSuite):
 
   def test_rpc_read_write_metrics(self):
     """Test that read/write metrics are exposed in /rpcz"""
-    rpcz = self.get_debug_page(self.RPCZ_URL)
+    rpcz = self._get_debug_page(self.RPCZ_URL)
     hist_time_regex = "[0-9][0-9numsh.]*"
     rpc_histogram_regex = (
       "Count: [0-9]+, sum: " + hist_time_regex
@@ -878,12 +878,12 @@ class TestWebPage(ImpalaTestSuite):
     SVC_NAME = 'impala.DataStreamService'
 
     def is_krpc_use_unix_domain_socket():
-      rpcz = self.get_debug_page(self.RPCZ_URL)
+      rpcz = self._get_debug_page(self.RPCZ_URL)
       return rpcz['rpc_use_unix_domain_socket']
 
     def get_per_conn_metrics(inbound):
       """Get inbound or outbound per-connection metrics"""
-      rpcz = self.get_debug_page(self.RPCZ_URL)
+      rpcz = self._get_debug_page(self.RPCZ_URL)
       if inbound:
         key = "inbound_per_conn_metrics"
       else:
@@ -892,7 +892,7 @@ class TestWebPage(ImpalaTestSuite):
       return conns
 
     def get_svc_metrics(svc_name):
-      rpcz = self.get_debug_page(self.RPCZ_URL)
+      rpcz = self._get_debug_page(self.RPCZ_URL)
       assert len(rpcz['services']) > 0
       for s in rpcz['services']:
         if s['service_name'] == svc_name:
@@ -1246,7 +1246,7 @@ class TestWebPage(ImpalaTestSuite):
   def test_query_progress(self):
     """Tests that /queries page shows query progress."""
     query = "select count(*) from functional_parquet.alltypes where bool_col = sleep(100)"
-    (query_id, response_json) = self.__run_query_and_get_debug_page(
+    (query_id, response_json) = self.run_query_and_get_debug_page(
         query, self.QUERIES_URL, expected_state=RUNNING)
     found = False
     for json_part in response_json['in_flight_queries']:

Reply via email to