This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c5e77ce251e fix(providers/amazon): handle multi-node job log streams in BatchClientHook (#63476)
c5e77ce251e is described below

commit c5e77ce251e439c6a9e3746f719c34710685854c
Author: Antonio Mello <[email protected]>
AuthorDate: Sat Mar 14 11:01:01 2026 -0300

    fix(providers/amazon): handle multi-node job log streams in BatchClientHook (#63476)
    
    * fix(providers/amazon): handle multi-node job log streams in BatchClientHook
    
    Multi-node AWS Batch jobs return log stream names under
    `attempts[].taskProperties[].containers[].logStreamName` instead of
    the single-node path `attempts[].container.logStreamName`. The existing
    code only checked the single-node path, producing `[None]` for
    multi-node jobs which bypassed the emptiness guard and caused a
    TypeError in urllib.parse.quote_plus() downstream.
    
    Now extracts stream names from both schemas and filters out None values.
    
    Closes #54254
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
    
    * fix: add continue after single-node stream match
    
    Skip multi-node iteration when single-node stream is already found,
    as suggested in review.
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
    
    ---------
    
    Co-authored-by: Claude Opus 4.6 <[email protected]>
---
 .../providers/amazon/aws/hooks/batch_client.py     | 16 ++++-
 .../unit/amazon/aws/hooks/test_batch_client.py     | 79 ++++++++++++++++++++++
 2 files changed, 93 insertions(+), 2 deletions(-)

diff --git a/providers/amazon/src/airflow/providers/amazon/aws/hooks/batch_client.py b/providers/amazon/src/airflow/providers/amazon/aws/hooks/batch_client.py
index 341930c9dab..7530af6e290 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/hooks/batch_client.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/hooks/batch_client.py
@@ -476,8 +476,20 @@ class BatchClientHook(AwsBaseHook):
                 p.get("container", {}).get("logConfiguration", {})
                 for p in job_node_properties.get("nodeRangeProperties", {})
             ]
-            # one stream name per attempt
-            stream_names = [a.get("container", {}).get("logStreamName") for a in job_desc.get("attempts", [])]
+            # one stream name per attempt — handles both single-node and multi-node schemas
+            stream_names = []
+            for attempt in job_desc.get("attempts", []):
+                # Single-node: attempts[].container.logStreamName
+                container_stream = attempt.get("container", {}).get("logStreamName")
+                if container_stream:
+                    stream_names.append(container_stream)
+                    continue
+                # Multi-node: attempts[].taskProperties[].containers[].logStreamName
+                for task_prop in attempt.get("taskProperties", []):
+                    for container in task_prop.get("containers", []):
+                        task_stream = container.get("logStreamName")
+                        if task_stream:
+                            stream_names.append(task_stream)
         elif job_container_desc:
             log_configs = [job_container_desc.get("logConfiguration", {})]
             stream_name = job_container_desc.get("logStreamName")
diff --git a/providers/amazon/tests/unit/amazon/aws/hooks/test_batch_client.py b/providers/amazon/tests/unit/amazon/aws/hooks/test_batch_client.py
index 34b721f54be..80659f07a72 100644
--- a/providers/amazon/tests/unit/amazon/aws/hooks/test_batch_client.py
+++ b/providers/amazon/tests/unit/amazon/aws/hooks/test_batch_client.py
@@ -400,6 +400,85 @@ class TestBatchClient:
         # all combinations listed above should have been seen
         assert all(combinations.values())
 
+    def test_job_awslogs_multinode_job_task_properties(self):
+        """Test multi-node jobs using taskProperties schema (issue #54254)."""
+        self.client_mock.describe_jobs.return_value = {
+            "jobs": [
+                {
+                    "jobId": JOB_ID,
+                    "attempts": [
+                        {
+                            "taskProperties": [
+                                {
+                                    "containers": [
+                                        {"exitCode": 0, "logStreamName": "test/stream/node0"},
+                                        {"exitCode": 0, "logStreamName": "test/stream/node1"},
+                                    ]
+                                }
+                            ]
+                        },
+                    ],
+                    "nodeProperties": {
+                        "mainNode": 0,
+                        "nodeRangeProperties": [
+                            {
+                                "targetNodes": "0:",
+                                "container": {
+                                    "logConfiguration": {
+                                        "logDriver": "awslogs",
+                                        "options": {
+                                            "awslogs-group": "/test/batch/job-a",
+                                            "awslogs-region": AWS_REGION,
+                                        },
+                                    }
+                                },
+                            },
+                        ],
+                    },
+                }
+            ]
+        }
+        awslogs = self.batch_client.get_job_all_awslogs_info(JOB_ID)
+        assert len(awslogs) == 2
+        assert all(log["awslogs_region"] == AWS_REGION for log in awslogs)
+        stream_names = [log["awslogs_stream_name"] for log in awslogs]
+        assert "test/stream/node0" in stream_names
+        assert "test/stream/node1" in stream_names
+
+    def test_job_awslogs_multinode_no_container_log_stream(self, caplog):
+        """Test multi-node job where attempts[].container has no logStreamName (issue #54254)."""
+        self.client_mock.describe_jobs.return_value = {
+            "jobs": [
+                {
+                    "jobId": JOB_ID,
+                    "attempts": [
+                        {"container": {"exitCode": 0}},
+                    ],
+                    "nodeProperties": {
+                        "mainNode": 0,
+                        "nodeRangeProperties": [
+                            {
+                                "targetNodes": "0:",
+                                "container": {
+                                    "logConfiguration": {
+                                        "logDriver": "awslogs",
+                                        "options": {
+                                            "awslogs-group": "/test/batch/job-a",
+                                            "awslogs-region": AWS_REGION,
+                                        },
+                                    }
+                                },
+                            },
+                        ],
+                    },
+                }
+            ]
+        }
+        with caplog.at_level(level=logging.WARNING):
+            awslogs = self.batch_client.get_job_all_awslogs_info(JOB_ID)
+            assert awslogs == []
+            assert "doesn't have any AWS CloudWatch Stream" in caplog.messages[0]
+
 
 class TestBatchClientDelays:
     @mock.patch.dict("os.environ", AWS_DEFAULT_REGION=AWS_REGION)

Reply via email to