Copilot commented on code in PR #64829:
URL: https://github.com/apache/airflow/pull/64829#discussion_r3066477999


##########
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py:
##########
@@ -153,6 +154,47 @@ def setup_method(self):
             callbacks=[MockKubernetesPodOperatorCallback],
         )
 
+    @pytest.mark.parametrize(
+        ("message", "expected_level"),
+        [
+            ("ERROR: something went wrong", logging.ERROR),
+            ("WARNING: low disk space", logging.WARNING),
+            ("WARN: deprecated usage", logging.WARNING),
+            ("DEBUG: entering function", logging.DEBUG),
+            ("CRITICAL: system failure", logging.CRITICAL),
+            ("FATAL: unrecoverable error", logging.CRITICAL),
+            ("INFO: starting up", logging.INFO),
+            ("[ERROR] bracketed prefix", logging.ERROR),
+            ("plain log line with no level", logging.INFO),
+            ("", logging.INFO),
+        ],
+    )
+    def test_parse_log_level(self, message, expected_level):
+        assert _parse_log_level(message) == expected_level
+
+    def test_log_message_uses_detected_log_level(self):
+        """_log_message should forward ERROR lines at ERROR level, not INFO."""
+        with mock.patch.object(self.pod_manager.log, "log") as mock_log:
+            self.pod_manager._log_message(
+                message="ERROR: something failed",
+                container_name="base",
+                container_name_log_prefix_enabled=True,
+                log_formatter=None,
+            )
+        mock_log.assert_called_once()
+        assert mock_log.call_args[0][0] == logging.ERROR
+
+    def test_log_message_defaults_to_info_for_plain_lines(self):
+        """_log_message should use INFO for lines without a known level prefix."""
+        with mock.patch.object(self.pod_manager.log, "log") as mock_log:
+            self.pod_manager._log_message(
+                message="Starting application",
+                container_name="base",
+                container_name_log_prefix_enabled=True,
+                log_formatter=None,
+            )
+        assert mock_log.call_args[0][0] == logging.INFO

Review Comment:
   `test_log_message_defaults_to_info_for_plain_lines` patches 
`self.pod_manager.log.log`, but `_log_message()` calls `self.log.info(...)` for 
INFO-level lines, so the patched method is never invoked, 
`mock_log.call_args` is `None`, and `mock_log.call_args[0][0]` raises 
`TypeError` instead of checking the level. Patch `self.pod_manager.log.info` 
instead, or change `_log_message()` to always call `self.log.log(level, ...)` 
(including INFO) so the test can consistently intercept the call.
   ```suggestion
           with mock.patch.object(self.pod_manager.log, "info") as mock_info:
               self.pod_manager._log_message(
                   message="Starting application",
                   container_name="base",
                   container_name_log_prefix_enabled=True,
                   log_formatter=None,
               )
           mock_info.assert_called_once()
   ```



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py:
##########
@@ -436,18 +465,21 @@ def _log_message(
         container_name_log_prefix_enabled: bool,
         log_formatter: Callable[[str, str], str] | None,
     ) -> None:
-        """Log a message with appropriate formatting."""
+        """Log a message at the level detected from its prefix, with appropriate formatting."""
         if is_log_group_marker(message):
             print(message)
         else:
+            level = _parse_log_level(message)
             if log_formatter:
                 formatted_message = log_formatter(container_name, message)
-                self.log.info("%s", formatted_message)
             else:
-                log_message = (
+                formatted_message = (
                     f"[{container_name}] {message}" if container_name_log_prefix_enabled else message
                 )
-                self.log.info("%s", log_message)
+            if level == logging.INFO:
+                self.log.info("%s", formatted_message)
+            else:
+                self.log.log(level, "%s", formatted_message)

Review Comment:
   `_log_message()` branches to `self.log.info(...)` for INFO but uses 
`self.log.log(level, ...)` for other levels. Since `Logger.log(logging.INFO, 
...)` is equivalent, consider always calling `self.log.log(level, ...)` to 
simplify the code and make it easier to test/intercept consistently (current 
unit tests patch `.log`).
   ```suggestion
               self.log.log(level, "%s", formatted_message)
   ```



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py:
##########
@@ -74,6 +76,33 @@
 :meta private:
 """
 
+_POD_LOG_LEVEL_PATTERN = re.compile(
+    r"^\s*(?:\[)?(DEBUG|INFO|WARNING|WARN|ERROR|CRITICAL|FATAL)(?:\])?\s*[:\-]?\s*",
+    re.IGNORECASE,
+)
+_POD_LOG_LEVEL_MAP: dict[str, int] = {
+    "DEBUG": logging.DEBUG,
+    "INFO": logging.INFO,
+    "WARNING": logging.WARNING,
+    "WARN": logging.WARNING,
+    "ERROR": logging.ERROR,
+    "CRITICAL": logging.CRITICAL,
+    "FATAL": logging.CRITICAL,
+}
+
+
+def _parse_log_level(message: str) -> int:
+    """
+    Detect the Python logging level from a pod log line's prefix.
+
+    Recognises common formats: ``ERROR:``, ``[ERROR]``, ``WARNING -``, etc.
+    Returns ``logging.INFO`` when no known prefix is found (backwards-compatible).
+    """
+    match = _POD_LOG_LEVEL_PATTERN.match(message)
+    if match:
+        return _POD_LOG_LEVEL_MAP.get(match.group(1).upper(), logging.INFO)
+    return logging.INFO

Review Comment:
   PR description says pod log levels are forwarded, but only 
`PodManager._log_message()` (sync path) is updated to detect levels. 
`AsyncPodManager.fetch_container_logs_before_current_sec()` still logs 
everything via `self.log.info(...)`, so deferrable/trigger-based log streaming 
will remain INFO-only unless it also uses the same level parsing logic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to