potiuk commented on code in PR #52091:
URL: https://github.com/apache/airflow/pull/52091#discussion_r2428825146
##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -1134,5 +1136,54 @@ def get_trigger_by_classpath(self, classpath: str) -> type[BaseTrigger]:
Uses a cache dictionary to speed up lookups after the first time.
"""
if classpath not in self.trigger_cache:
- self.trigger_cache[classpath] = import_string(classpath)
+ self.trigger_cache[classpath] = self.import_classpath_maybe_zip(classpath)
return self.trigger_cache[classpath]
+
+ def clear_sys_modules_from_zip_info(self, zip_file):
+ """
+ Clear the import cache (sys.modules) of the modules found in zip_file.
+
+ This is done so that triggers from different versions of the same dag, with
+ different versions of the same library are imported correctly.
+
+ If this is not performed then the triggerer process can (will) load
+ the wrong version of the library from the sys.modules cache.
+ """
+ with zipfile.ZipFile(zip_file) as zf:
+ for zip_info in zf.infolist():
+ p = Path(zip_info.filename)
+ if p.suffix == ".py":
+ d = os.path.dirname(p)
+ if d:
+ module1 = d.replace(os.sep, ".")
+ module2 = f"{module1}.{p.stem}"
+ if sys.modules.pop(module1, None):
+ self.log.info("Removed %s from sys.modules", module1)
+ if sys.modules.pop(module2, None):
+ self.log.info("Removed %s from sys.modules", module2)
+ else:
+ if sys.modules.pop(p.stem, None):
+ self.log.info("Removed %s from sys.modules", p.stem)
+
+ def import_classpath_maybe_zip(self, classpath):
+ """
+ Get a trigger class by its classpath.
+
+ classpath could be of the following forms:
+ 1) path.to.my.class
+ 2) /path/to/dags/dag.zip:path.to.my.class
+ """
+ if classpath.find(":") >= 0 and ".zip" in classpath:
+ [zip_file, classname] = classpath.split(":")
+
+ self.clear_sys_modules_from_zip_info(zip_file)
+
+ try:
+ sys.path.insert(0, zip_file)
+ return import_string(classname)
+ except Exception:
+ raise
Review Comment:
```suggestion
```
##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -1134,5 +1136,54 @@ def get_trigger_by_classpath(self, classpath: str) -> type[BaseTrigger]:
Uses a cache dictionary to speed up lookups after the first time.
"""
if classpath not in self.trigger_cache:
- self.trigger_cache[classpath] = import_string(classpath)
+ self.trigger_cache[classpath] = self.import_classpath_maybe_zip(classpath)
return self.trigger_cache[classpath]
+
+ def clear_sys_modules_from_zip_info(self, zip_file):
+ """
+ Clear the import cache (sys.modules) of the modules found in zip_file.
+
+ This is done so that triggers from different versions of the same dag, with
+ different versions of the same library are imported correctly.
+
+ If this is not performed then the triggerer process can (will) load
+ the wrong version of the library from the sys.modules cache.
+ """
+ with zipfile.ZipFile(zip_file) as zf:
+ for zip_info in zf.infolist():
+ p = Path(zip_info.filename)
+ if p.suffix == ".py":
+ d = os.path.dirname(p)
+ if d:
+ module1 = d.replace(os.sep, ".")
+ module2 = f"{module1}.{p.stem}"
+ if sys.modules.pop(module1, None):
+ self.log.info("Removed %s from sys.modules", module1)
+ if sys.modules.pop(module2, None):
+ self.log.info("Removed %s from sys.modules", module2)
+ else:
+ if sys.modules.pop(p.stem, None):
+ self.log.info("Removed %s from sys.modules", p.stem)
+
+ def import_classpath_maybe_zip(self, classpath):
+ """
+ Get a trigger class by its classpath.
+
+ classpath could be of the following forms:
+ 1) path.to.my.class
+ 2) /path/to/dags/dag.zip:path.to.my.class
+ """
+ if classpath.find(":") >= 0 and ".zip" in classpath:
+ [zip_file, classname] = classpath.split(":")
+
+ self.clear_sys_modules_from_zip_info(zip_file)
+
+ try:
+ sys.path.insert(0, zip_file)
+ return import_string(classname)
+ except Exception:
+ raise
Review Comment:
Not needed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]