o-nikolas commented on code in PR #64342:
URL: https://github.com/apache/airflow/pull/64342#discussion_r3048658617


##########
providers/amazon/tests/unit/amazon/aws/triggers/test_glue.py:
##########
@@ -111,6 +113,238 @@ def test_serialization(self):
             "waiter_delay": 10,
         }
 
+    def test_serialization_verbose(self):
+        trigger = GlueJobCompleteTrigger(
+            job_name="job_name",
+            run_id="JobRunId",
+            verbose=True,
+            aws_conn_id="aws_conn_id",
+            waiter_max_attempts=3,
+            waiter_delay=10,
+        )
+        classpath, kwargs = trigger.serialize()
+        assert kwargs["verbose"] is True
+
+    @pytest.mark.asyncio
+    @mock.patch.object(AwsLogsHook, "get_async_conn")
+    @mock.patch.object(GlueJobHook, "get_async_conn")
+    async def test_verbose_run_success(self, mock_glue_conn, mock_logs_conn):
+        """When verbose=True, the trigger polls job state and fetches 
CloudWatch logs."""
+        glue_client = AsyncMock()
+        glue_client.get_job_run = AsyncMock(
+            side_effect=[
+                # First call: metadata + initial state (RUNNING)
+                {"JobRun": {"JobRunState": "RUNNING", "LogGroupName": 
"/aws-glue/python-jobs"}},
+                # Second call: state update after sleep (SUCCEEDED)
+                {"JobRun": {"JobRunState": "SUCCEEDED", "LogGroupName": 
"/aws-glue/python-jobs"}},
+            ]
+        )
+        mock_glue_conn.return_value.__aenter__ = 
AsyncMock(return_value=glue_client)
+        mock_glue_conn.return_value.__aexit__ = AsyncMock(return_value=False)
+
+        logs_client = AsyncMock()
+        logs_client.get_log_events = AsyncMock(
+            return_value={
+                "events": [{"timestamp": 1234, "message": "Processing step 
1\n"}],
+                "nextForwardToken": "token_1",
+            }
+        )
+        mock_logs_conn.return_value.__aenter__ = 
AsyncMock(return_value=logs_client)
+        mock_logs_conn.return_value.__aexit__ = AsyncMock(return_value=False)
+
+        trigger = GlueJobCompleteTrigger(
+            job_name="job_name",
+            run_id="jr_123",
+            verbose=True,
+            aws_conn_id="aws_conn_id",
+            waiter_delay=0,
+            waiter_max_attempts=5,
+        )
+        generator = trigger.run()
+        event = await generator.asend(None)
+
+        assert event.payload["status"] == "success"
+        assert event.payload["run_id"] == "jr_123"
+        # Logs client was called for both output and error streams
+        assert logs_client.get_log_events.call_count >= 2
+
+    @pytest.mark.asyncio
+    @mock.patch.object(AwsLogsHook, "get_async_conn")
+    @mock.patch.object(GlueJobHook, "get_async_conn")
+    async def test_verbose_run_job_failed(self, mock_glue_conn, 
mock_logs_conn):
+        """When verbose=True and the job fails, the trigger yields an error 
event."""
+        glue_client = AsyncMock()
+        glue_client.get_job_run = AsyncMock(
+            return_value={"JobRun": {"JobRunState": "FAILED", "LogGroupName": 
"/aws-glue/python-jobs"}}
+        )
+        mock_glue_conn.return_value.__aenter__ = 
AsyncMock(return_value=glue_client)
+        mock_glue_conn.return_value.__aexit__ = AsyncMock(return_value=False)
+
+        logs_client = AsyncMock()
+        logs_client.get_log_events = AsyncMock(return_value={"events": [], 
"nextForwardToken": "token_1"})
+        mock_logs_conn.return_value.__aenter__ = 
AsyncMock(return_value=logs_client)
+        mock_logs_conn.return_value.__aexit__ = AsyncMock(return_value=False)
+
+        trigger = GlueJobCompleteTrigger(
+            job_name="job_name",
+            run_id="jr_123",
+            verbose=True,
+            aws_conn_id="aws_conn_id",
+            waiter_delay=0,
+            waiter_max_attempts=5,
+        )
+        generator = trigger.run()
+        event = await generator.asend(None)
+        assert event.payload["status"] == "error"
+        assert "FAILED" in event.payload["message"]
+        assert event.payload["run_id"] == "jr_123"
+
+    @pytest.mark.asyncio
+    @mock.patch.object(AwsLogsHook, "get_async_conn")
+    @mock.patch.object(GlueJobHook, "get_async_conn")
+    async def test_verbose_run_max_attempts(self, mock_glue_conn, 
mock_logs_conn):
+        """When verbose=True and the job stays RUNNING past max attempts, 
yields an error event."""
+        glue_client = AsyncMock()
+        glue_client.get_job_run = AsyncMock(
+            return_value={"JobRun": {"JobRunState": "RUNNING", "LogGroupName": 
"/aws-glue/python-jobs"}}
+        )
+        mock_glue_conn.return_value.__aenter__ = 
AsyncMock(return_value=glue_client)
+        mock_glue_conn.return_value.__aexit__ = AsyncMock(return_value=False)
+
+        logs_client = AsyncMock()
+        logs_client.get_log_events = AsyncMock(return_value={"events": [], 
"nextForwardToken": "token_1"})
+        mock_logs_conn.return_value.__aenter__ = 
AsyncMock(return_value=logs_client)
+        mock_logs_conn.return_value.__aexit__ = AsyncMock(return_value=False)
+
+        trigger = GlueJobCompleteTrigger(
+            job_name="job_name",
+            run_id="jr_123",
+            verbose=True,
+            aws_conn_id="aws_conn_id",
+            waiter_delay=0,
+            waiter_max_attempts=2,
+        )
+        generator = trigger.run()
+        event = await generator.asend(None)
+        assert event.payload["status"] == "error"
+        assert "max attempts" in event.payload["message"]
+        assert event.payload["run_id"] == "jr_123"
+
+    @pytest.mark.asyncio
+    @mock.patch.object(AwsLogsHook, "get_async_conn")
+    @mock.patch.object(GlueJobHook, "get_async_conn")
+    async def test_verbose_run_cloudwatch_client_error(self, mock_glue_conn, 
mock_logs_conn):
+        """When verbose=True and CloudWatch returns an unexpected ClientError, 
yields error event."""
+        glue_client = AsyncMock()
+        glue_client.get_job_run = AsyncMock(
+            return_value={"JobRun": {"JobRunState": "RUNNING", "LogGroupName": 
"/aws-glue/python-jobs"}}
+        )
+        mock_glue_conn.return_value.__aenter__ = 
AsyncMock(return_value=glue_client)
+        mock_glue_conn.return_value.__aexit__ = AsyncMock(return_value=False)
+
+        logs_client = AsyncMock()
+        logs_client.get_log_events = AsyncMock(
+            side_effect=ClientError(
+                {"Error": {"Code": "AccessDeniedException", "Message": "not 
authorized"}},
+                "GetLogEvents",
+            )
+        )
+        mock_logs_conn.return_value.__aenter__ = 
AsyncMock(return_value=logs_client)
+        mock_logs_conn.return_value.__aexit__ = AsyncMock(return_value=False)
+
+        trigger = GlueJobCompleteTrigger(
+            job_name="job_name",
+            run_id="jr_123",
+            verbose=True,
+            aws_conn_id="aws_conn_id",
+            waiter_delay=0,
+            waiter_max_attempts=5,
+        )
+        generator = trigger.run()
+        event = await generator.asend(None)
+        assert event.payload["status"] == "error"
+        assert "Failed to fetch logs" in event.payload["message"]
+        assert "AccessDeniedException" in event.payload["message"]
+        assert event.payload["run_id"] == "jr_123"
+
+    @pytest.mark.asyncio
+    async def test_forward_logs_resource_not_found(self):
+        """_forward_logs handles ResourceNotFoundException gracefully."""
+        logs_client = AsyncMock()
+        logs_client.get_log_events = AsyncMock(
+            side_effect=ClientError(
+                {"Error": {"Code": "ResourceNotFoundException", "Message": 
"not found"}},
+                "GetLogEvents",
+            )
+        )
+
+        trigger = GlueJobCompleteTrigger(
+            job_name="job_name",
+            run_id="jr_123",
+            verbose=True,
+            aws_conn_id="aws_conn_id",
+            region_name="us-east-1",
+            waiter_delay=0,
+            waiter_max_attempts=5,
+        )
+        result = await trigger._forward_logs(logs_client, 
"/aws-glue/python-jobs/output", "jr_123", None)
+        assert result is None
+
+    @pytest.mark.asyncio
+    async def test_forward_logs_pagination(self, caplog):
+        """_forward_logs follows nextForwardToken and formats logs like the 
sync path."""

Review Comment:
   Also, try to mock the logger itself wherever possible. Last I heard, we try
   to avoid the `caplog` fixture in the Airflow community.
    



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to