hkc-8010 opened a new issue, #64213:
URL: https://github.com/apache/airflow/issues/64213

   ## Apache Airflow Provider(s)
   amazon
   
   ## Versions of Apache Airflow Providers
   I observed this in a managed Airflow 3.1.8 deployment (Astronomer Runtime 
3.1-14).
   
   I was not able to reliably extract the exact provider package version from 
the running managed image via `kubectl exec`, but the code path involved 
matches the current `providers/amazon` source on `main` (currently 
`apache-airflow-providers-amazon 9.24.0`), and the runtime image definitely 
includes the Task SDK fallback added for triggerer connection access.
   
   ## Apache Airflow version
   3.1.8
   
   ## Operating System
   Debian GNU/Linux 12 (bookworm)
   
   ## Deployment
   Astronomer
   
   ## Deployment details
   Managed Astronomer deployment using triggerer + deferrable operators.
   
   The failing operator path was a deferrable `S3KeySensor` / `S3KeyTrigger` 
using an AWS Airflow connection with `extra` containing `role_arn` and 
`region_name`.
   
   The same connection works in non-deferrable/reschedule mode and also 
resolves correctly through the triggerer supervisor's in-process execution API 
client.
   
   ## What happened
   A deferrable `S3KeySensor` worked until it deferred. Once execution moved 
into the triggerer path, the AWS hook behaved as if the configured Airflow 
connection did not exist and logged:
   ```text
   Unable to find AWS Connection ID 'aws_quadrant', switching to empty.
   No connection ID provided. Fallback on boto3 credential strategy 
(region_name=None). If you have boto3 credentials configured, they will be used 
directly.
   ```
   
   and then failed against S3 with:
   
   ```text
   An error occurred (403) when calling the HeadObject operation: Forbidden
   ```
   
   Important detail: the Airflow connection itself was valid. Worker-side 
execution could use it, and the triggerer supervisor's in-process API client 
could also fetch it successfully.
   
   Tracing the triggerer-side code path suggests the failure happens during 
the synchronous connection lookup performed by the AWS hook.
   
   The relevant path is:
   
   - `S3KeyTrigger.run()` -> `await self.hook.get_async_conn()`
   - `AwsBaseHook.get_async_conn()` wraps `_get_async_conn()` in 
`sync_to_async(...)`
   - `_get_async_conn()` calls `self.get_client_type(...)`
   - `self.region_name` / `self.conn_config` call `self.get_connection(...)`
   - Task SDK's `ExecutionAPISecretsBackend.get_connection()` uses 
`SUPERVISOR_COMMS.send(GetConnection(...))`
   - In triggerer context, `SUPERVISOR_COMMS` is a `TriggerCommsDecoder`
   
   What I found is that `TriggerCommsDecoder.send()` can raise this runtime 
error when called from the worker thread created by `sync_to_async`:
   
   ```text
   RuntimeError: Task <Task pending name='Task-3' 
coro=<AsyncToSync.__call__.<locals>.new_loop_wrap() running at ...>> got Future 
<Future pending> attached to a different loop
   ```
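   For context, this is the standard asyncio cross-loop error, and (assuming 
the root cause is a Future bound to one loop being awaited by a task on 
another) it can be reproduced with the standard library alone, with no Airflow 
or asgiref involved:

```python
import asyncio

# Stdlib-only sketch of the suspected failure mode: a Future created on one
# event loop is awaited by a Task running on a different loop, which is what
# a sync_to_async worker thread can provoke when it touches objects bound to
# the triggerer's loop.
loop_a = asyncio.new_event_loop()
fut = loop_a.create_future()  # Future bound to loop_a

async def await_foreign_future():
    await fut  # running on loop_b, but fut belongs to loop_a

loop_b = asyncio.new_event_loop()
caught = None
try:
    loop_b.run_until_complete(await_foreign_future())
except RuntimeError as exc:
    caught = exc
    print(f"RuntimeError: {exc}")
finally:
    loop_a.close()
    loop_b.close()
```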
   
   `ExecutionAPISecretsBackend.get_connection()` currently special-cases only 
one `RuntimeError` message:
   
   ```python
   "You cannot use AsyncToSync in the same thread as an async event loop"
   ```
   
   When it gets the different-loop error above, it falls through to the generic 
exception handler and returns `None`. That makes `AwsBaseHook.conn_config` 
think the connection is missing, which triggers the warning and 
empty-credentials fallback.
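   A simplified, self-contained sketch of that fall-through (illustrative 
only, not the actual `ExecutionAPISecretsBackend` code; the handler shape is 
an assumption based on the behavior described above):

```python
# Illustrative sketch only; the real ExecutionAPISecretsBackend.get_connection()
# is more involved. HANDLED_MSG is the one message currently special-cased.
HANDLED_MSG = "You cannot use AsyncToSync in the same thread as an async event loop"

def get_connection_sketch(send):
    """send() stands in for SUPERVISOR_COMMS.send(GetConnection(...))."""
    try:
        return send()
    except Exception as exc:
        if isinstance(exc, RuntimeError) and HANDLED_MSG in str(exc):
            return "retried-in-fresh-thread"  # special-cased AsyncToSync path (elided)
        return None  # generic fallback: caller now believes the connection is missing

def raise_different_loop():
    raise RuntimeError(
        "Task <Task pending> got Future <Future pending> attached to a different loop"
    )

# The different-loop message is not matched, so the lookup silently degrades:
print(get_connection_sketch(raise_different_loop))  # → None
```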
   
   ## What you think should happen instead
   Deferrable AWS hooks in triggerer context should resolve the same Airflow 
connection that non-deferrable execution resolves.
   
   At minimum, the triggerer-side connection lookup should not silently degrade 
to `None` when `TriggerCommsDecoder.send()` fails with the different-loop 
`RuntimeError` shown above.
   
   Instead, one of these should happen:
   
   1. The sync triggerer connection path should work correctly from the 
`sync_to_async` worker thread.
   2. The AWS hook / Task SDK path should use the async connection getter in 
triggerer contexts.
   3. The Task SDK fallback should handle this different-loop runtime error as 
well, rather than returning `None` and causing silent credential fallback.
   
   ## How to reproduce
   I have two reproductions.
   
   ### End-user reproduction
   1. Create an AWS Airflow connection whose `extra` contains a `role_arn` and 
`region_name`.
   2. Use that connection in a deferrable `S3KeySensor` / `S3KeyTrigger`.
   3. Make sure the worker-side path can defer successfully.
   4. Once the triggerer resumes polling, the triggerer logs that it cannot 
find the AWS connection and falls back to empty boto3 credentials.
   5. S3 calls then fail with `403 Forbidden` because the role assumption 
information from the Airflow connection was lost.
   
   This did **not** happen when the same sensor was switched back to 
non-deferrable / reschedule mode.
   
   ### Minimal triggerer-side code-path reproduction
   In a triggerer-like context where:
   
   - `task_runner.SUPERVISOR_COMMS` is a `TriggerCommsDecoder`
   - the supervisor side can successfully answer `GetConnection(conn_id)` using 
the in-process execution API client
   - the AWS hook resolves its connection via the sync path from inside 
`sync_to_async`
   
   calling:
   
   ```python
   import asyncio

   from airflow.providers.amazon.aws.hooks.s3 import S3Hook

   hook = S3Hook(aws_conn_id="aws_quadrant")
   role, region = await asyncio.to_thread(
       lambda: (hook.conn_config.role_arn, hook.conn_config.region_name)
   )
   ```
   
   can reproduce the customer-facing warning:
   
   ```text
   Unable to find AWS Connection ID 'aws_quadrant', switching to empty.
   ```
   
   and yields:
   
   ```text
   role None
   region None
   ```
   
   while a lower-level repro of `TriggerCommsDecoder.send(GetConnection(...))` 
from the worker thread surfaces:
   
   ```text
   RuntimeError: ... got Future <Future pending> attached to a different loop
   ```
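   For completeness, the thread-safe pattern that avoids the cross-loop error 
(the direction option 1 points at) can be sketched with the standard library 
alone; `get_connection_stub` is a stand-in for the supervisor answering 
`GetConnection`, not an Airflow API:

```python
import asyncio

async def get_connection_stub(conn_id: str) -> str:
    # Stand-in for the supervisor-side GetConnection handler.
    return f"connection:{conn_id}"

async def main() -> str:
    loop = asyncio.get_running_loop()

    def sync_lookup(conn_id: str) -> str:
        # Runs in a worker thread (the sync_to_async-style context). Instead of
        # awaiting a Future bound to a foreign loop, hand the coroutine back to
        # the event loop thread-safely and block on the concurrent Future.
        cf = asyncio.run_coroutine_threadsafe(get_connection_stub(conn_id), loop)
        return cf.result(timeout=5)

    return await asyncio.to_thread(sync_lookup, "aws_quadrant")

result = asyncio.run(main())
print(result)  # → connection:aws_quadrant
```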
   
   ## Anything else
   A few notes that may help narrow this down:
   
   - The triggerer supervisor's in-process execution API client can fetch the 
connection successfully.
   - The Airflow connection record itself is valid and includes the expected 
`extra` payload.
   - This does not look like a serialization issue in `ConnectionResponse` / 
`ConnectionResult`.
   - This also does not look like simply "missing #57154"; the runtime image I 
checked already included the Task SDK fallback for the `AsyncToSync` event-loop 
error. The problem is that the actual runtime error here is a different one 
(`Future attached to a different loop`), so the current special-case logic does 
not catch it.
   
   If useful, I can work on a PR or help reduce this further into a standalone 
unit/integration test.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
