StrawberryOwl opened a new issue, #46376: URL: https://github.com/apache/airflow/issues/46376
### Apache Airflow Provider(s)

apache-kafka

### Versions of Apache Airflow Providers

apache-airflow-providers-apache-kafka==1.7.0

### Apache Airflow version

2.10.4

### Operating System

Debian GNU/Linux 12 (bookworm)

### Deployment

Docker-Compose

### Deployment details

I used "Docker version 27.4.0, build bde2b89", with the official docker-compose file from the [Airflow website](https://airflow.apache.org/docs/apache-airflow/2.10.4/docker-compose.yaml) as the basis. I made some changes to install "apache-airflow[apache-kafka]" and to add Kafka containers to the compose file.

### What happened

I am designing a workflow that sends and receives Kafka messages. For reception I am using AwaitMessageSensor, to which you have to pass an apply_function in dot-string notation, as used by importlib. Directly after startup of the Airflow system everything works fine, but when the code of the apply_function is changed, the changes are not reflected in the trigger that is actually executed.

The problem becomes obvious if the function is renamed. The trigger crashes with an ImportError, `Module "myDAG" does not define a "my_apply_function" attribute/class`, although my_apply_function is defined in the DAG file currently in use. I verified that the most recent version of the file is correctly synced to the triggerer container (mounted into the container, as per the default setup) and that it can also be found in the database.

### What you think should happen instead

Changes to the apply_function should be picked up by the triggerer.

### How to reproduce

Take a working Kafka/Airflow setup, use an AwaitMessageSensor, and rename its apply_function. (I think this problem is quite general, so I did not provide code. If someone needs my docker-compose file and a minimal DAG, I can provide them later.)

### Anything else

I researched for quite a while before arriving at anything near a solution.
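For context, resolving a dot-string such as `myDAG.my_apply_function` with importlib can be sketched roughly as follows. `resolve_dotted_path` is a hypothetical helper, not the provider's actual code; only the error message format is copied from the ImportError quoted above. The relevant detail is that `importlib.import_module` returns the module object already stored in `sys.modules` when the module has been imported before in the same process, without re-reading the source file.

```python
import importlib
from typing import Any


def resolve_dotted_path(dotted_path: str) -> Any:
    """Hypothetical helper: turn "package.module.attr" into the attr object."""
    module_path, _, attr_name = dotted_path.rpartition(".")
    if not module_path:
        raise ImportError(f"{dotted_path!r} is not a dotted path")
    # If module_path was already imported in this process, import_module()
    # returns the cached sys.modules entry and does not re-read the file.
    module = importlib.import_module(module_path)
    try:
        return getattr(module, attr_name)
    except AttributeError as err:
        # Message format copied from the ImportError quoted in this issue.
        raise ImportError(
            f'Module "{module_path}" does not define a "{attr_name}" attribute/class'
        ) from err
```

Under this sketch, a long-running process such as the triggerer would keep resolving `myDAG.my_apply_function` against whichever version of `myDAG` it imported first.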
The TriggerRunner uses a cache to look up trigger classes by their classpath (see [source code](https://github.com/apache/airflow/blob/8a3757b99e247d2f09a86d12e6aff8d525a3f3e7/airflow/jobs/triggerer_job_runner.py#L759)). I read #31743, which was closed after [updates](https://github.com/apache/airflow/issues/31743#issuecomment-1598786020) to the documentation of deferrable operators. I also dug into the source code of AwaitMessageSensor and AwaitMessageTrigger, where the string passed via apply_function is imported by importlib. So I expected my new code to be imported each time I used the sensor.

My understanding of this problem is far from complete, but it seems to me that this expectation is not met because the TriggerRunner caches the triggers, and therefore the import of my apply_function by importlib is only performed once. If that is not true, please correct me.

I propose that at least the documentation should be updated to state explicitly that changing the apply_function requires a restart of the triggerer. The documentation changes mentioned above were only made in the section about writing your own triggers, which I did not consult, since I am not writing my own trigger but only using one from an official provider. The current behavior is very unexpected and cost me a lot of time to figure out. From a user perspective, it would be much more convenient if the apply_function were updated whenever the code is updated across all containers, e.g. by sidestepping the caching either for the whole AwaitMessageTrigger or at least for the apply_function.

### Are you willing to submit PR?

- [x] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
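A rough sketch of the "sidestep the caching at least for the apply_function" idea from the "Anything else" section might look like this. It assumes the fix would live in a hypothetical helper, `load_apply_function_fresh`, which is not an existing Airflow or provider API: if the module is already in `sys.modules`, it is reloaded with `importlib.reload` before the attribute is looked up, so a renamed function becomes visible without restarting the process.

```python
import importlib
import sys
from typing import Any, Callable


def load_apply_function_fresh(dotted_path: str) -> Callable[..., Any]:
    """Hypothetical helper: resolve "module.attr", re-executing the module's
    current source if it was already imported, so that edits are picked up."""
    module_path, _, attr_name = dotted_path.rpartition(".")
    if not module_path:
        raise ImportError(f"{dotted_path!r} is not a dotted path")
    importlib.invalidate_caches()  # let path finders notice new or changed files
    cached = sys.modules.get(module_path)
    if cached is None:
        module = importlib.import_module(module_path)
    else:
        # import_module() would just return this possibly stale object, so
        # re-execute the current source in the existing module namespace.
        # Note: names deleted from the source survive a reload.
        module = importlib.reload(cached)
    try:
        return getattr(module, attr_name)
    except AttributeError as err:
        raise ImportError(
            f'Module "{module_path}" does not define a "{attr_name}" attribute/class'
        ) from err
```

Reloading re-runs the whole DAG file's top-level code on every call and keeps names that were removed from the source, so a real fix would need to weigh that cost. Restarting the triggerer, as proposed for the documentation, avoids both issues.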
