This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c5e77ce251e fix(providers/amazon): handle multi-node job log streams in BatchClientHook (#63476)
c5e77ce251e is described below
commit c5e77ce251e439c6a9e3746f719c34710685854c
Author: Antonio Mello <[email protected]>
AuthorDate: Sat Mar 14 11:01:01 2026 -0300
fix(providers/amazon): handle multi-node job log streams in BatchClientHook (#63476)
* fix(providers/amazon): handle multi-node job log streams in BatchClientHook
Multi-node AWS Batch jobs return log stream names under
`attempts[].taskProperties[].containers[].logStreamName` instead of
the single-node path `attempts[].container.logStreamName`. The existing
code only checked the single-node path, producing `[None]` for
multi-node jobs which bypassed the emptiness guard and caused a
TypeError in urllib.parse.quote_plus() downstream.
Now extracts stream names from both schemas and filters out None values.
Closes #54254
Co-Authored-By: Claude Opus 4.6 <[email protected]>
* fix: add continue after single-node stream match
Skip multi-node iteration when single-node stream is already found,
as suggested in review.
Co-Authored-By: Claude Opus 4.6 <[email protected]>
---------
Co-authored-by: Claude Opus 4.6 <[email protected]>
---
.../providers/amazon/aws/hooks/batch_client.py | 16 ++++-
.../unit/amazon/aws/hooks/test_batch_client.py | 79 ++++++++++++++++++++++
2 files changed, 93 insertions(+), 2 deletions(-)
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/hooks/batch_client.py b/providers/amazon/src/airflow/providers/amazon/aws/hooks/batch_client.py
index 341930c9dab..7530af6e290 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/hooks/batch_client.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/hooks/batch_client.py
@@ -476,8 +476,20 @@ class BatchClientHook(AwsBaseHook):
p.get("container", {}).get("logConfiguration", {})
for p in job_node_properties.get("nodeRangeProperties", {})
]
- # one stream name per attempt
- stream_names = [a.get("container", {}).get("logStreamName") for a in job_desc.get("attempts", [])]
+ # one stream name per attempt — handles both single-node and multi-node schemas
+ stream_names = []
+ for attempt in job_desc.get("attempts", []):
+ # Single-node: attempts[].container.logStreamName
+ container_stream = attempt.get("container", {}).get("logStreamName")
+ if container_stream:
+ stream_names.append(container_stream)
+ continue
+ # Multi-node: attempts[].taskProperties[].containers[].logStreamName
+ for task_prop in attempt.get("taskProperties", []):
+ for container in task_prop.get("containers", []):
+ task_stream = container.get("logStreamName")
+ if task_stream:
+ stream_names.append(task_stream)
elif job_container_desc:
log_configs = [job_container_desc.get("logConfiguration", {})]
stream_name = job_container_desc.get("logStreamName")
diff --git a/providers/amazon/tests/unit/amazon/aws/hooks/test_batch_client.py b/providers/amazon/tests/unit/amazon/aws/hooks/test_batch_client.py
index 34b721f54be..80659f07a72 100644
--- a/providers/amazon/tests/unit/amazon/aws/hooks/test_batch_client.py
+++ b/providers/amazon/tests/unit/amazon/aws/hooks/test_batch_client.py
@@ -400,6 +400,85 @@ class TestBatchClient:
# all combinations listed above should have been seen
assert all(combinations.values())
+ def test_job_awslogs_multinode_job_task_properties(self):
+ """Test multi-node jobs using taskProperties schema (issue #54254)."""
+ self.client_mock.describe_jobs.return_value = {
+ "jobs": [
+ {
+ "jobId": JOB_ID,
+ "attempts": [
+ {
+ "taskProperties": [
+ {
+ "containers": [
+ {"exitCode": 0, "logStreamName":
"test/stream/node0"},
+ {"exitCode": 0, "logStreamName":
"test/stream/node1"},
+ ]
+ }
+ ]
+ },
+ ],
+ "nodeProperties": {
+ "mainNode": 0,
+ "nodeRangeProperties": [
+ {
+ "targetNodes": "0:",
+ "container": {
+ "logConfiguration": {
+ "logDriver": "awslogs",
+ "options": {
+ "awslogs-group":
"/test/batch/job-a",
+ "awslogs-region": AWS_REGION,
+ },
+ }
+ },
+ },
+ ],
+ },
+ }
+ ]
+ }
+ awslogs = self.batch_client.get_job_all_awslogs_info(JOB_ID)
+ assert len(awslogs) == 2
+ assert all(log["awslogs_region"] == AWS_REGION for log in awslogs)
+ stream_names = [log["awslogs_stream_name"] for log in awslogs]
+ assert "test/stream/node0" in stream_names
+ assert "test/stream/node1" in stream_names
+
+ def test_job_awslogs_multinode_no_container_log_stream(self, caplog):
+ """Test multi-node job where attempts[].container has no logStreamName
(issue #54254)."""
+ self.client_mock.describe_jobs.return_value = {
+ "jobs": [
+ {
+ "jobId": JOB_ID,
+ "attempts": [
+ {"container": {"exitCode": 0}},
+ ],
+ "nodeProperties": {
+ "mainNode": 0,
+ "nodeRangeProperties": [
+ {
+ "targetNodes": "0:",
+ "container": {
+ "logConfiguration": {
+ "logDriver": "awslogs",
+ "options": {
+ "awslogs-group":
"/test/batch/job-a",
+ "awslogs-region": AWS_REGION,
+ },
+ }
+ },
+ },
+ ],
+ },
+ }
+ ]
+ }
+ with caplog.at_level(level=logging.WARNING):
+ awslogs = self.batch_client.get_job_all_awslogs_info(JOB_ID)
+ assert awslogs == []
+ assert "doesn't have any AWS CloudWatch Stream" in
caplog.messages[0]
+
class TestBatchClientDelays:
@mock.patch.dict("os.environ", AWS_DEFAULT_REGION=AWS_REGION)