This is an automated email from the ASF dual-hosted git repository.
jrmccluskey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 0bf63a295cc Fix Dataflow cost benchmark after Dataflow client migration (#38788)
0bf63a295cc is described below
commit 0bf63a295cc8a8e1ac2b90383a91c110b2d377f8
Author: Abdelrahman Ibrahim <[email protected]>
AuthorDate: Tue Jun 9 00:37:31 2026 +0200
Fix Dataflow cost benchmark after Dataflow client migration (#38788)
* Fix Dataflow cost benchmark after Dataflow client migration
* Fix formatter
---
.../testing/load_tests/dataflow_cost_benchmark.py | 20 +++++++++-----------
1 file changed, 9 insertions(+), 11 deletions(-)
diff --git a/sdks/python/apache_beam/testing/load_tests/dataflow_cost_benchmark.py b/sdks/python/apache_beam/testing/load_tests/dataflow_cost_benchmark.py
index 3300831b8ea..4d412eb228e 100644
--- a/sdks/python/apache_beam/testing/load_tests/dataflow_cost_benchmark.py
+++ b/sdks/python/apache_beam/testing/load_tests/dataflow_cost_benchmark.py
@@ -141,7 +141,7 @@ class DataflowCostBenchmark(LoadTest):
return system_metrics
def _get_worker_time_interval(
- self, job_id: str) -> tuple[Optional[str], Optional[str]]:
+ self, job_id: str) -> tuple[Optional[datetime], Optional[datetime]]:
"""Extracts worker start and stop times from job messages."""
start_time, end_time = None, None
page_token = None
@@ -155,7 +155,7 @@ class DataflowCostBenchmark(LoadTest):
page_token=page_token,
minimum_importance='JOB_MESSAGE_DEBUG')
for message in messages:
- text = message.messageText
+ text = message.message_text
if getattr(message, 'time', None):
last_message_time = message.time
if text:
@@ -186,8 +186,8 @@ class DataflowCostBenchmark(LoadTest):
self,
project: str,
job_id: str,
- start_time: str,
- end_time: str,
+ start_time: datetime,
+ end_time: datetime,
pcollection_name: Optional[str] = None,
) -> dict[str, float]:
"""Query Cloud Monitoring for per-PCollection throughput."""
@@ -256,7 +256,8 @@ class DataflowCostBenchmark(LoadTest):
return metrics
def _get_streaming_throughput_metrics(
- self, project: str, start_time: str, end_time: str) -> dict[str, float]:
+ self, project: str, start_time: datetime,
+ end_time: datetime) -> dict[str, float]:
if not self.subscription:
return {'AvgThroughputBytes': 0.0, 'AvgThroughputElements': 0.0}
@@ -297,17 +298,14 @@ class DataflowCostBenchmark(LoadTest):
metrics[f"AvgThroughput{key}"] = avg_rate
return metrics
- def _get_job_runtime(self, start_time: str, end_time: str) -> float:
+ def _get_job_runtime(self, start_time: datetime, end_time: datetime) -> float:
"""Calculates the job runtime duration in seconds."""
- start_dt = datetime.fromisoformat(start_time[:-1])
- end_dt = datetime.fromisoformat(end_time[:-1])
- return (end_dt - start_dt).total_seconds()
+ return (end_time - start_time).total_seconds()
def _get_additional_metrics(self,
result: DataflowPipelineResult) -> dict[str, Any]:
job_id = result.job_id()
- job = self.dataflow_client.get_job(job_id)
- project = job.projectId
+ project = self.project_id
start_time, end_time = self._get_worker_time_interval(job_id)
if not start_time or not end_time:
logging.warning('Could not find valid worker start/end times.')