juditnovak opened a new pull request, #59314:
URL: https://github.com/apache/airflow/pull/59314
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<!--
Thank you for contributing! Please make sure that your code changes
are covered with tests. And in case of new features or big changes
remember to adjust the documentation.
Feel free to ping committers for the review!
In case of an existing issue, reference it using one of the following:
closes: #ISSUE
related: #ISSUE
How to write a good git commit message:
http://chris.beams.io/posts/git-commit/
-->
<!-- Please keep an empty line above the dashes. -->
---
closes: https://github.com/apache/airflow/issues/59074
The issue has been demonstrated on a separate branch (including helper code
that is not meant to be merged): https://github.com/juditnovak/airflow/pull/2
The fix has been demonstrated to resolve the issue (including helper
code that is not meant to be merged): https://github.com/juditnovak/airflow/pull/1
## Root cause
The `dag.test()` function uses a single database session throughout its lifetime.
Since data is cached on that session, outdated results are retrieved.
1. LocalExecutor executes workloads via the
[supervisor](https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/execution_time/supervisor.py)
2. At the end of task execution, the supervisor sends a call to
the Airflow API `/execution/task-instances/` endpoint. This call is responsible
for updating the database with the final state.
3. The supervisor then returns the results to the Executor, which registers
them in its Event Buffer.
4. NOTE: steps 1-3 are part of the Executor's `heartbeat()`/`sync()` calls.
5. In the particularly intense `dag.test()` polling workflow,
`SchedulerJobRunner.process_executor_events()` is invoked right after,
rapidly processing the contents of the Executor's Event Buffer.
6. However, at this point the DB updates from the API call in step 2 are not yet
visible on the `dag.test()` session. Instead, a **cached** record from an
earlier update is returned (a minimal sketch of this behavior follows below).
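To make the caching behavior concrete, here is a minimal, self-contained SQLAlchemy sketch. This is not Airflow code: the `TaskRun` model, column names, and database URL are made up for illustration, and the actual change in this PR may take a different form. It shows how a long-lived session keeps returning cached attribute state even after another connection has committed an update, which is the same pattern as the supervisor's API call in step 2 racing the `dag.test()` session:

```python
# Minimal sketch (not Airflow code) of the stale-read behavior of a
# long-lived SQLAlchemy session. All names here are illustrative.
from sqlalchemy import Column, Integer, String, create_engine, select, update
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class TaskRun(Base):  # stand-in for the task instance table
    __tablename__ = "task_run"
    id = Column(Integer, primary_key=True)
    state = Column(String)


# file-based SQLite so the two code paths get distinct connections
engine = create_engine("sqlite:///stale_demo.db")
Base.metadata.create_all(engine)

with Session(engine) as setup:
    setup.add(TaskRun(id=1, state="running"))
    setup.commit()

# the single, long-lived session, standing in for the one used by dag.test()
session = Session(engine)
first = session.scalars(select(TaskRun).where(TaskRun.id == 1)).one()
print(first.state)  # "running"

# simulate the API handler committing the final state on its own connection
with engine.begin() as conn:
    conn.execute(update(TaskRun).where(TaskRun.id == 1).values(state="success"))

# re-querying through the same ORM session returns the identity-map instance;
# its attributes are not refreshed by default, so the stale value comes back
again = session.scalars(select(TaskRun).where(TaskRun.id == 1)).one()
print(again.state)  # still "running" (cached)

# expiring cached state forces a reload on next access; this is one way to
# make the committed value visible (the fix in this PR may differ in detail)
session.expire_all()
print(again.state)  # "success"
```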
## Testing
- Currently I could not find any tests within the `task-sdk` targeting `dag.test()`. I
don't know whether this is intentional.
- I found [a single Airflow unit
test](https://github.com/apache/airflow/blob/main/airflow-core/tests/unit/core/test_impersonation_tests.py)
directly relying on the `use_executor=True` option
  - there `dag.test()` is not the target of the tests, but a tool used to perform other tests
- In the supplementary material to this PR I'm providing a showcase of the
fix. However, since the issue is related to cache expiration, I'm afraid the full
race would be hard to reproduce with deterministic (i.e. non-flaky) tests
(see the session-level sketch after this list).
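That said, the session-level behavior itself could be pinned down deterministically, without the executor or any polling. Below is a hypothetical pytest-style sketch (not part of this PR; the model and test names are made up, and it does not exercise `dag.test()` itself):

```python
# Hypothetical, deterministic test of the session-caching behavior described
# in the root-cause section. It runs purely at the SQLAlchemy level, so there
# is no executor timing to get flaky. Names are illustrative only.
from sqlalchemy import Column, Integer, String, create_engine, select, update
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class TaskRun(Base):  # stand-in for the task instance table
    __tablename__ = "task_run"
    id = Column(Integer, primary_key=True)
    state = Column(String)


def test_long_lived_session_needs_expiry_to_see_api_update(tmp_path):
    engine = create_engine(f"sqlite:///{tmp_path}/demo.db")
    Base.metadata.create_all(engine)
    with Session(engine) as setup:
        setup.add(TaskRun(id=1, state="running"))
        setup.commit()

    # the long-lived session, standing in for the one used by dag.test()
    session = Session(engine)
    cached = session.scalars(select(TaskRun).where(TaskRun.id == 1)).one()
    assert cached.state == "running"

    # out-of-band commit, standing in for the task-instances API call
    with engine.begin() as conn:
        conn.execute(update(TaskRun).where(TaskRun.id == 1).values(state="success"))

    # re-querying through the same session still yields the cached value
    assert session.scalars(select(TaskRun).where(TaskRun.id == 1)).one().state == "running"

    # after expiring cached state, the committed update becomes visible
    session.expire_all()
    assert cached.state == "success"
```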
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]