This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit c0c6cc9df4131f87737ff975e82d3e4832a2181b
Author: Riza Suminto <[email protected]>
AuthorDate: Thu May 1 12:12:18 2025 -0700

    IMPALA-12201: Stabilize TestFetch
    
    This patch attempt to stabilize TestFetch by using HS2 as test protocol.
    test_rows_sent_counters is modified to use the default hs2_client.
    test_client_fetch_time_stats and test_client_fetch_time_stats_incomplete
    is modified to use MinimalHS2Connection that has more simpler mechanism
    in terms of fetching (ImpylaHS2Connection always fetch 10240 rows at a
    time).
    
    Implemented minimal functions needed to wait for finished state and pull
    runtime profile at MinimalHS2Connection.
    
    Testing:
    Loop the test 50 times and pass them all.
    
    Change-Id: I52651df37a318357711d26d2414e025cce4185c3
    Reviewed-on: http://gerrit.cloudera.org:8080/22847
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 tests/common/impala_connection.py | 21 +++++++++++--
 tests/query_test/test_fetch.py    | 65 ++++++++++++++++++++++-----------------
 2 files changed, 55 insertions(+), 31 deletions(-)

diff --git a/tests/common/impala_connection.py 
b/tests/common/impala_connection.py
index caca64e35..6d6f6d338 100644
--- a/tests/common/impala_connection.py
+++ b/tests/common/impala_connection.py
@@ -1142,12 +1142,27 @@ class MinimalHS2Connection(ImpalaConnection):
   def get_impala_exec_state(self, operation_handle):  # noqa: U100
     raise NotImplementedError()
 
-  def get_runtime_profile(self, operation_handle,  # noqa: U100
-                          profile_format=TRuntimeProfileFormat.STRING):  # 
noqa: U100
-    raise NotImplementedError()
+  def get_runtime_profile(self, operation_handle,
+                          profile_format=TRuntimeProfileFormat.STRING):
+    return self.__get_operation(operation_handle).get_profile(profile_format)
 
   def wait_for_admission_control(self, operation_handle, timeout_s=60):  # 
noqa: U100
     raise NotImplementedError()
 
   def get_exec_summary(self, operation_handle):  # noqa: U100
     raise NotImplementedError()
+
+  def wait_for_finished_timeout(self, operation_handle, timeout):
+    start_time = time.time()
+    while time.time() - start_time < timeout:
+      start_rpc_time = time.time()
+      hs2_state = self.get_state(operation_handle)
+      rpc_time = time.time() - start_rpc_time
+      # if the rpc succeeded, the output is the query state
+      if hs2_state == "FINISHED_STATE":
+        return True
+      elif hs2_state == "ERROR_STATE":
+        break
+      if rpc_time < DEFAULT_SLEEP_INTERVAL:
+        time.sleep(DEFAULT_SLEEP_INTERVAL - rpc_time)
+    return False
diff --git a/tests/query_test/test_fetch.py b/tests/query_test/test_fetch.py
index 390f0aa4f..512e10fd5 100644
--- a/tests/query_test/test_fetch.py
+++ b/tests/query_test/test_fetch.py
@@ -19,9 +19,12 @@ from __future__ import absolute_import, division, 
print_function
 import re
 
 from time import sleep
-from tests.common.impala_connection import FINISHED
-from tests.common.impala_test_suite import ImpalaTestSuite
-from tests.common.test_dimensions import extend_exec_option_dimension
+from tests.common.impala_connection import FINISHED, MinimalHS2Connection
+from tests.common.impala_test_suite import IMPALAD_HS2_HOST_PORT, 
ImpalaTestSuite
+from tests.common.test_dimensions import (
+    create_uncompressed_text_dimension,
+    extend_exec_option_dimension,
+)
 from tests.util.parse_util import parse_duration_string_ms, \
     parse_duration_string_ns, get_time_summary_stats_counter
 
@@ -33,26 +36,27 @@ class TestFetch(ImpalaTestSuite):
   def add_test_dimensions(cls):
     super(TestFetch, cls).add_test_dimensions()
     # Result fetching should be independent of file format, so only test 
against
-    # Parquet files.
-    cls.ImpalaTestMatrix.add_constraint(lambda v:
-        v.get_value('table_format').file_format == 'parquet')
+    # text files.
+    cls.ImpalaTestMatrix.add_dimension(
+      create_uncompressed_text_dimension(cls.get_workload()))
 
-  def test_rows_sent_counters(self, vector):
+  def test_rows_sent_counters(self):
     """Validate that ClientFetchWaitTimer, NumRowsFetched, 
RowMaterializationRate,
     and RowMaterializationTimer are set to valid values in the ImpalaServer 
section
     of the runtime profile."""
     num_rows = 25
     query = "select sleep(100) from functional.alltypes limit 
{0}".format(num_rows)
-    handle = self.execute_query_async(query, vector.get_value('exec_option'))
+    client = self.hs2_client
+    handle = client.execute_async(query)
     try:
       # Wait until the query is 'FINISHED' and results are available for 
fetching.
-      self.client.wait_for_impala_state(handle, FINISHED, 30)
+      client.wait_for_impala_state(handle, FINISHED, 30)
       # Sleep for 2.5 seconds so that the ClientFetchWaitTimer is >= 1s.
       sleep(2.5)
       # Fetch the results so that the fetch related counters are updated.
-      assert self.client.fetch(query, handle).success
+      assert client.fetch(query, handle).success
 
-      runtime_profile = self.client.get_runtime_profile(handle)
+      runtime_profile = client.get_runtime_profile(handle)
       fetch_timer = re.search("ClientFetchWaitTimer: (.*)", runtime_profile)
       assert fetch_timer and len(fetch_timer.groups()) == 1 and \
           parse_duration_string_ms(fetch_timer.group(1)) > 1000
@@ -82,25 +86,26 @@ class TestFetch(ImpalaTestSuite):
       assert 2400 < create_result_ms and create_result_ms < 2600
 
     finally:
-      self.client.close_query(handle)
+      client.close_query(handle)
 
-  def test_client_fetch_time_stats(self, vector):
+  def test_client_fetch_time_stats(self):
     num_rows = 27
+    client = MinimalHS2Connection(IMPALAD_HS2_HOST_PORT)
     query = "select sleep(10) from functional.alltypes limit 
{0}".format(num_rows)
-    handle = self.execute_query_async(query, vector.get_value('exec_option'))
+    handle = client.execute_async(query)
     try:
       # Wait until the query is 'FINISHED' and results are available for 
fetching.
-      self.client.wait_for_impala_state(handle, FINISHED, 30)
+      client.wait_for_finished_timeout(handle, 30)
 
       # This loop will do 6 fetches that contain data and a final fetch with
       # no data. The last fetch is after eos has been set, so it does not 
count.
       rows_fetched = 0
       while True:
-        result = self.client.fetch(query, handle, max_rows=5)
-        assert result.success
-        rows_fetched += len(result.data)
+        result = client.fetch(query, handle, max_rows=5)
+        assert result is not None
+        rows_fetched += len(result)
         # If no rows are returned, we are done.
-        if len(result.data) == 0:
+        if len(result) == 0:
           break
         sleep(0.1)
 
@@ -108,9 +113,10 @@ class TestFetch(ImpalaTestSuite):
       # count as client wait time, because the query is already done.
       sleep(2.5)
     finally:
-      self.client.close_query(handle)
+      client.close_query(handle)
+
+    runtime_profile = client.get_runtime_profile(handle)
 
-    runtime_profile = self.client.get_runtime_profile(handle)
     summary_stats = get_time_summary_stats_counter("ClientFetchWaitTimeStats",
                                                    runtime_profile)
     assert len(summary_stats) == 1
@@ -118,28 +124,30 @@ class TestFetch(ImpalaTestSuite):
     # The 2.5 second sleep should not count, so the max must be less than 2.5 
seconds.
     assert summary_stats[0].max_value < 2500000000
     assert summary_stats[0].min_value > 0
+    client.close()
 
-  def test_client_fetch_time_stats_incomplete(self, vector):
+  def test_client_fetch_time_stats_incomplete(self):
     num_rows = 27
+    client = MinimalHS2Connection(IMPALAD_HS2_HOST_PORT)
     query = "select sleep(10) from functional.alltypes limit 
{0}".format(num_rows)
-    handle = self.execute_query_async(query, vector.get_value('exec_option'))
+    handle = client.execute_async(query)
     try:
       # Wait until the query is 'FINISHED' and results are available for 
fetching.
-      self.client.wait_for_impala_state(handle, FINISHED, 30)
+      client.wait_for_finished_timeout(handle, 30)
 
       # This loop will do 5 fetches for a total of 25 rows. This is incomplete.
       for i in range(5):
-        result = self.client.fetch(query, handle, max_rows=5)
-        assert result.success
+        result = client.fetch(query, handle, max_rows=5)
+        assert result is not None
         sleep(0.1)
 
       # Sleep before closing the query. For an incomplete fetch, this still 
counts
       # towards the query time, so this does show up in the counters.
       sleep(2.5)
     finally:
-      self.client.close_query(handle)
+      client.close_query(handle)
 
-    runtime_profile = self.client.get_runtime_profile(handle)
+    runtime_profile = client.get_runtime_profile(handle)
 
     summary_stats = get_time_summary_stats_counter("ClientFetchWaitTimeStats",
                                                    runtime_profile)
@@ -149,6 +157,7 @@ class TestFetch(ImpalaTestSuite):
     # The 2.5 second sleep does count for an incomplete fetch, verify the max 
is higher.
     assert summary_stats[0].max_value >= 2500000000
     assert summary_stats[0].min_value > 0
+    client.close()
 
 
 class TestFetchAndSpooling(ImpalaTestSuite):

Reply via email to