juditnovak opened a new pull request, #59314:
URL: https://github.com/apache/airflow/pull/59314

   ---
   closes: https://github.com/apache/airflow/issues/59074
   
   The issue has been demonstrated on a separate branch (including helper code that is not meant to be merged): https://github.com/juditnovak/airflow/pull/2
   
   The fix has been shown to address the issue (including helper code that is not meant to be merged): https://github.com/juditnovak/airflow/pull/1
   
   ## Root cause
   
   The `dag.test()` function uses a single database session throughout its lifetime. Because data is cached on the session, outdated results are returned when the same records are read again.
   
   1. LocalExecutor executes workloads via the [supervisor](https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/execution_time/supervisor.py).
   2. At the end of the task execution, the supervisor sends a call to the Airflow API `/execution/task-instances/` endpoint. This call is responsible for updating the database with the final state.
   3. The supervisor then returns the results to the Executor, which registers them in its Event Buffer.
   4. Note: steps 1-3 are part of the Executor's `heartbeat()`/`sync()` calls.
   5. In the particularly tight `dag.test()` polling loop, `SchedulerJobRunner.process_executor_events()` is invoked right after, rapidly processing the contents of the Executor's Event Buffer.
   6. However, at this point the DB update made by the API call in step 2 is not yet visible in the `dag.test()` session. Instead, a **cached** record from an earlier update is returned.
   
   ## Testing
   
   - Currently I find no tests within `task-sdk` targeting `dag.test()`. I'm not sure whether this is intentional.
   - I find only [a single Airflow unit test](https://github.com/apache/airflow/blob/main/airflow-core/tests/unit/core/test_impersonation_tests.py) that directly relies on the `use_executor=True` option.
     - There, `dag.test()` is not itself the target of the tests, but a tool used to perform other tests.
   - The supplementary material to this PR (linked above) showcases the fix. However, since the issue is related to cache expiration, I'm afraid it would be hard to reproduce with deterministic (i.e. non-flaky) tests.


-- 
This is an automated message from the Apache Git Service.