mik-laj commented on issue #44379:
URL: https://github.com/apache/airflow/issues/44379#issuecomment-2783651245
@amoghrajesh Can you provide more details on what you would like such
constraints to look like? From what I can see, we have a few functions that use
code from airflow-core right now. In particular, ariflow.utils and
airflow.exceptions are used a lot.
```python
from pathlib import Path
import ast
def get_imports_from_file(file_path: Path, *, only_top_level: bool = False)
-> list[str]:
"""
Returns list of all imports in file.
Example:
import os
from collections import defaultdict
import numpy as np
from pandas import DataFrame as DF
def inner():
import json
from pathlib import Path
from __future__ import annotations
"""
root = ast.parse(file_path.read_text(), file_path.name)
imports: list[str] = []
nodes = ast.iter_child_nodes(root) if only_top_level else ast.walk(root)
for node in nodes:
if isinstance(node, ast.Import):
for alias in node.names:
imports.append(alias.name)
elif isinstance(node, ast.ImportFrom):
if node.module == "__future__":
continue
for alias in node.names:
name = alias.name
fullname = f"{node.module}.{name}" if node.module else name
imports.append(fullname)
return imports
# task-sdk/**/*.py
input_files = Path(".").rglob("task-sdk/src/**/*.py")
import json
output = {
str(f): get_imports_from_file(f, only_top_level=True) for f in
input_files
}
output = {
str(k): [
i for i in v if "airflow" in i and "airflow.sdk" not in i
]
for k, v in output.items()
}
output = {
k: v
for k, v in output.items()
if v
}
print(json.dumps(output, indent=2))
```
```json
{
"task-sdk/src/airflow/sdk/bases/operator.py": [
"airflow.exceptions.RemovedInAirflow4Warning",
"airflow.task.priority_strategy.PriorityWeightStrategy",
"airflow.task.priority_strategy.airflow_priority_weight_strategies",
"airflow.task.priority_strategy.validate_and_load_priority_weight_strategy",
"airflow.utils.timezone",
"airflow.utils.setup_teardown.SetupTeardownContext",
"airflow.utils.trigger_rule.TriggerRule",
"airflow.utils.weight_rule.db_safe_priority"
],
"task-sdk/src/airflow/sdk/bases/sensor.py": [
"airflow.configuration.conf",
"airflow.exceptions.AirflowException",
"airflow.exceptions.AirflowFailException",
"airflow.exceptions.AirflowRescheduleException",
"airflow.exceptions.AirflowSensorTimeout",
"airflow.exceptions.AirflowSkipException",
"airflow.exceptions.AirflowTaskTimeout",
"airflow.exceptions.TaskDeferralError",
"airflow.exceptions.TaskDeferralTimeout",
"airflow.utils.timezone"
],
"task-sdk/src/airflow/sdk/bases/notifier.py": [
"airflow.utils.context.context_merge",
"airflow.utils.log.logging_mixin.LoggingMixin"
],
"task-sdk/src/airflow/sdk/api/client.py": [
"airflow.configuration.conf",
"airflow.utils.net.get_hostname",
"airflow.utils.platform.getuser"
],
"task-sdk/src/airflow/sdk/execution_time/xcom.py": [
"airflow.configuration.conf"
],
"task-sdk/src/airflow/sdk/execution_time/supervisor.py": [
"airflow.configuration.conf"
],
"task-sdk/src/airflow/sdk/execution_time/task_runner.py": [
"airflow.dag_processing.bundles.base.BaseDagBundle",
"airflow.dag_processing.bundles.base.BundleVersionLock",
"airflow.dag_processing.bundles.manager.DagBundlesManager",
"airflow.listeners.listener.get_listener_manager",
"airflow.utils.net.get_hostname",
"airflow.utils.state.TaskInstanceState",
"airflow.utils.timezone.coerce_datetime"
],
"task-sdk/src/airflow/sdk/execution_time/secrets_masker.py": [
"airflow.settings"
],
"task-sdk/src/airflow/sdk/definitions/dag.py": [
"airflow.settings",
"airflow.exceptions.DuplicateTaskIdFound",
"airflow.exceptions.FailFastDagInvalidTriggerRule",
"airflow.exceptions.ParamValidationError",
"airflow.exceptions.TaskNotFound",
"airflow.timetables.base.Timetable",
"airflow.timetables.simple.AssetTriggeredTimetable",
"airflow.timetables.simple.ContinuousTimetable",
"airflow.timetables.simple.NullTimetable",
"airflow.timetables.simple.OnceTimetable",
"airflow.utils.dag_cycle_tester.check_cycle",
"airflow.utils.decorators.fixup_decorator_warning_stack",
"airflow.utils.trigger_rule.TriggerRule"
],
"task-sdk/src/airflow/sdk/definitions/mappedoperator.py": [
"airflow.serialization.enums.DagAttributeTypes",
"airflow.task.priority_strategy.PriorityWeightStrategy",
"airflow.task.priority_strategy.validate_and_load_priority_weight_strategy",
"airflow.typing_compat.Literal",
"airflow.utils.helpers.is_container",
"airflow.utils.helpers.prevent_duplicates",
"airflow.utils.xcom.XCOM_RETURN_KEY"
],
"task-sdk/src/airflow/sdk/definitions/connection.py": [
"airflow.exceptions.AirflowException"
],
"task-sdk/src/airflow/sdk/definitions/taskgroup.py": [
"airflow.exceptions.AirflowDagCycleException",
"airflow.exceptions.AirflowException",
"airflow.exceptions.DuplicateTaskIdFound",
"airflow.exceptions.TaskAlreadyInTaskGroup",
"airflow.utils.trigger_rule.TriggerRule"
],
"task-sdk/src/airflow/sdk/definitions/macros.py": [
"airflow.utils.yaml"
],
"task-sdk/src/airflow/sdk/definitions/param.py": [
"airflow.exceptions.AirflowException",
"airflow.exceptions.ParamValidationError",
"airflow.utils.types.NOTSET",
"airflow.utils.types.ArgNotSet"
],
"task-sdk/src/airflow/sdk/definitions/xcom_arg.py": [
"airflow.exceptions.AirflowException",
"airflow.exceptions.XComNotFound",
"airflow.utils.setup_teardown.SetupTeardownContext",
"airflow.utils.trigger_rule.TriggerRule",
"airflow.utils.xcom.XCOM_RETURN_KEY"
],
"task-sdk/src/airflow/sdk/definitions/_internal/templater.py": [
"airflow.utils.helpers.render_template_as_native",
"airflow.utils.helpers.render_template_to_string"
],
"task-sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py": [
"airflow.utils.setup_teardown.SetupTeardownContext",
"airflow.utils.trigger_rule.TriggerRule",
"airflow.utils.weight_rule.WeightRule"
],
"task-sdk/src/airflow/sdk/definitions/asset/decorators.py": [
"airflow.providers.standard.operators.python.PythonOperator"
],
"task-sdk/src/airflow/sdk/definitions/asset/__init__.py": [
"airflow.serialization.dag_dependency.DagDependency"
]
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]