This is an automated email from the ASF dual-hosted git repository.

jrmccluskey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0bf63a295cc Fix Dataflow cost benchmark after Dataflow client migration (#38788)
0bf63a295cc is described below

commit 0bf63a295cc8a8e1ac2b90383a91c110b2d377f8
Author: Abdelrahman Ibrahim <[email protected]>
AuthorDate: Tue Jun 9 00:37:31 2026 +0200

    Fix Dataflow cost benchmark after Dataflow client migration (#38788)
    
    * Fix Dataflow cost benchmark after Dataflow client migration
    
    * Fix formatter
---
 .../testing/load_tests/dataflow_cost_benchmark.py    | 20 +++++++++-----------
 1 file changed, 9 insertions(+), 11 deletions(-)

diff --git a/sdks/python/apache_beam/testing/load_tests/dataflow_cost_benchmark.py b/sdks/python/apache_beam/testing/load_tests/dataflow_cost_benchmark.py
index 3300831b8ea..4d412eb228e 100644
--- a/sdks/python/apache_beam/testing/load_tests/dataflow_cost_benchmark.py
+++ b/sdks/python/apache_beam/testing/load_tests/dataflow_cost_benchmark.py
@@ -141,7 +141,7 @@ class DataflowCostBenchmark(LoadTest):
     return system_metrics
 
   def _get_worker_time_interval(
-      self, job_id: str) -> tuple[Optional[str], Optional[str]]:
+      self, job_id: str) -> tuple[Optional[datetime], Optional[datetime]]:
     """Extracts worker start and stop times from job messages."""
     start_time, end_time = None, None
     page_token = None
@@ -155,7 +155,7 @@ class DataflowCostBenchmark(LoadTest):
         page_token=page_token,
         minimum_importance='JOB_MESSAGE_DEBUG')
       for message in messages:
-        text = message.messageText
+        text = message.message_text
         if getattr(message, 'time', None):
           last_message_time = message.time
         if text:
@@ -186,8 +186,8 @@ class DataflowCostBenchmark(LoadTest):
       self,
       project: str,
       job_id: str,
-      start_time: str,
-      end_time: str,
+      start_time: datetime,
+      end_time: datetime,
       pcollection_name: Optional[str] = None,
   ) -> dict[str, float]:
     """Query Cloud Monitoring for per-PCollection throughput."""
@@ -256,7 +256,8 @@ class DataflowCostBenchmark(LoadTest):
     return metrics
 
   def _get_streaming_throughput_metrics(
-      self, project: str, start_time: str, end_time: str) -> dict[str, float]:
+      self, project: str, start_time: datetime,
+      end_time: datetime) -> dict[str, float]:
     if not self.subscription:
       return {'AvgThroughputBytes': 0.0, 'AvgThroughputElements': 0.0}
 
@@ -297,17 +298,14 @@ class DataflowCostBenchmark(LoadTest):
       metrics[f"AvgThroughput{key}"] = avg_rate
     return metrics
 
-  def _get_job_runtime(self, start_time: str, end_time: str) -> float:
+  def _get_job_runtime(self, start_time: datetime, end_time: datetime) -> float:
     """Calculates the job runtime duration in seconds."""
-    start_dt = datetime.fromisoformat(start_time[:-1])
-    end_dt = datetime.fromisoformat(end_time[:-1])
-    return (end_dt - start_dt).total_seconds()
+    return (end_time - start_time).total_seconds()
 
   def _get_additional_metrics(self,
                               result: DataflowPipelineResult) -> dict[str, Any]:
     job_id = result.job_id()
-    job = self.dataflow_client.get_job(job_id)
-    project = job.projectId
+    project = self.project_id
     start_time, end_time = self._get_worker_time_interval(job_id)
     if not start_time or not end_time:
       logging.warning('Could not find valid worker start/end times.')

Reply via email to