mik-laj commented on issue #44379:
URL: https://github.com/apache/airflow/issues/44379#issuecomment-2783651245

   @amoghrajesh Can you provide more details on what you would like such 
constraints to look like? From what I can see, we have a few functions that use 
code from airflow-core right now. In particular, ariflow.utils and 
airflow.exceptions are used a lot.
   
   ```python
   from pathlib import Path
   import ast
   
   def get_imports_from_file(file_path: Path, *, only_top_level: bool = False) 
-> list[str]:
       """
       Returns list of all imports in file.
   
       Example:
       import os
       from collections import defaultdict
       import numpy as np
       from pandas import DataFrame as DF
   
       def inner():
           import json
           from pathlib import Path
       from __future__ import annotations
           
       """
       root = ast.parse(file_path.read_text(), file_path.name)
       imports: list[str] = []
   
       nodes = ast.iter_child_nodes(root) if only_top_level else ast.walk(root)
       for node in nodes:
           if isinstance(node, ast.Import):
               for alias in node.names:
                   imports.append(alias.name)
           elif isinstance(node, ast.ImportFrom):
               if node.module == "__future__":
                   continue
               for alias in node.names:
                   name = alias.name
                   fullname = f"{node.module}.{name}" if node.module else name
                   imports.append(fullname)
   
       return imports
   
   
   # task-sdk/**/*.py
   input_files = Path(".").rglob("task-sdk/src/**/*.py")
   
   import json
   output = {
       str(f): get_imports_from_file(f, only_top_level=True) for f in 
input_files
   }
   output = {
       str(k): [
           i for i in v if "airflow" in i and "airflow.sdk" not in i
       ]
       for k, v in output.items()
   }
   output = {
       k: v 
       for k, v in output.items()
       if v
   }
   
   print(json.dumps(output, indent=2))
   
   
   ```
   ```json
   {
     "task-sdk/src/airflow/sdk/bases/operator.py": [
       "airflow.exceptions.RemovedInAirflow4Warning",
       "airflow.task.priority_strategy.PriorityWeightStrategy",
       "airflow.task.priority_strategy.airflow_priority_weight_strategies",
       
"airflow.task.priority_strategy.validate_and_load_priority_weight_strategy",
       "airflow.utils.timezone",
       "airflow.utils.setup_teardown.SetupTeardownContext",
       "airflow.utils.trigger_rule.TriggerRule",
       "airflow.utils.weight_rule.db_safe_priority"
     ],
     "task-sdk/src/airflow/sdk/bases/sensor.py": [
       "airflow.configuration.conf",
       "airflow.exceptions.AirflowException",
       "airflow.exceptions.AirflowFailException",
       "airflow.exceptions.AirflowRescheduleException",
       "airflow.exceptions.AirflowSensorTimeout",
       "airflow.exceptions.AirflowSkipException",
       "airflow.exceptions.AirflowTaskTimeout",
       "airflow.exceptions.TaskDeferralError",
       "airflow.exceptions.TaskDeferralTimeout",
       "airflow.utils.timezone"
     ],
     "task-sdk/src/airflow/sdk/bases/notifier.py": [
       "airflow.utils.context.context_merge",
       "airflow.utils.log.logging_mixin.LoggingMixin"
     ],
     "task-sdk/src/airflow/sdk/api/client.py": [
       "airflow.configuration.conf",
       "airflow.utils.net.get_hostname",
       "airflow.utils.platform.getuser"
     ],
     "task-sdk/src/airflow/sdk/execution_time/xcom.py": [
       "airflow.configuration.conf"
     ],
     "task-sdk/src/airflow/sdk/execution_time/supervisor.py": [
       "airflow.configuration.conf"
     ],
     "task-sdk/src/airflow/sdk/execution_time/task_runner.py": [
       "airflow.dag_processing.bundles.base.BaseDagBundle",
       "airflow.dag_processing.bundles.base.BundleVersionLock",
       "airflow.dag_processing.bundles.manager.DagBundlesManager",
       "airflow.listeners.listener.get_listener_manager",
       "airflow.utils.net.get_hostname",
       "airflow.utils.state.TaskInstanceState",
       "airflow.utils.timezone.coerce_datetime"
     ],
     "task-sdk/src/airflow/sdk/execution_time/secrets_masker.py": [
       "airflow.settings"
     ],
     "task-sdk/src/airflow/sdk/definitions/dag.py": [
       "airflow.settings",
       "airflow.exceptions.DuplicateTaskIdFound",
       "airflow.exceptions.FailFastDagInvalidTriggerRule",
       "airflow.exceptions.ParamValidationError",
       "airflow.exceptions.TaskNotFound",
       "airflow.timetables.base.Timetable",
       "airflow.timetables.simple.AssetTriggeredTimetable",
       "airflow.timetables.simple.ContinuousTimetable",
       "airflow.timetables.simple.NullTimetable",
       "airflow.timetables.simple.OnceTimetable",
       "airflow.utils.dag_cycle_tester.check_cycle",
       "airflow.utils.decorators.fixup_decorator_warning_stack",
       "airflow.utils.trigger_rule.TriggerRule"
     ],
     "task-sdk/src/airflow/sdk/definitions/mappedoperator.py": [
       "airflow.serialization.enums.DagAttributeTypes",
       "airflow.task.priority_strategy.PriorityWeightStrategy",
       
"airflow.task.priority_strategy.validate_and_load_priority_weight_strategy",
       "airflow.typing_compat.Literal",
       "airflow.utils.helpers.is_container",
       "airflow.utils.helpers.prevent_duplicates",
       "airflow.utils.xcom.XCOM_RETURN_KEY"
     ],
     "task-sdk/src/airflow/sdk/definitions/connection.py": [
       "airflow.exceptions.AirflowException"
     ],
     "task-sdk/src/airflow/sdk/definitions/taskgroup.py": [
       "airflow.exceptions.AirflowDagCycleException",
       "airflow.exceptions.AirflowException",
       "airflow.exceptions.DuplicateTaskIdFound",
       "airflow.exceptions.TaskAlreadyInTaskGroup",
       "airflow.utils.trigger_rule.TriggerRule"
     ],
     "task-sdk/src/airflow/sdk/definitions/macros.py": [
       "airflow.utils.yaml"
     ],
     "task-sdk/src/airflow/sdk/definitions/param.py": [
       "airflow.exceptions.AirflowException",
       "airflow.exceptions.ParamValidationError",
       "airflow.utils.types.NOTSET",
       "airflow.utils.types.ArgNotSet"
     ],
     "task-sdk/src/airflow/sdk/definitions/xcom_arg.py": [
       "airflow.exceptions.AirflowException",
       "airflow.exceptions.XComNotFound",
       "airflow.utils.setup_teardown.SetupTeardownContext",
       "airflow.utils.trigger_rule.TriggerRule",
       "airflow.utils.xcom.XCOM_RETURN_KEY"
     ],
     "task-sdk/src/airflow/sdk/definitions/_internal/templater.py": [
       "airflow.utils.helpers.render_template_as_native",
       "airflow.utils.helpers.render_template_to_string"
     ],
     "task-sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py": [
       "airflow.utils.setup_teardown.SetupTeardownContext",
       "airflow.utils.trigger_rule.TriggerRule",
       "airflow.utils.weight_rule.WeightRule"
     ],
     "task-sdk/src/airflow/sdk/definitions/asset/decorators.py": [
       "airflow.providers.standard.operators.python.PythonOperator"
     ],
     "task-sdk/src/airflow/sdk/definitions/asset/__init__.py": [
       "airflow.serialization.dag_dependency.DagDependency"
     ]
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to