amoghrajesh commented on code in PR #58825:
URL: https://github.com/apache/airflow/pull/58825#discussion_r2579879617
##########
airflow-core/src/airflow/settings.py:
##########
@@ -636,6 +636,7 @@ def _configure_secrets_masker():
    core_masker.sensitive_variables_fields = list(sensitive_fields)
    core_masker.secret_mask_adapter = secret_mask_adapter
+    # TODO: this should be moved out when settings are moved to `shared`
Review Comment:
I don't think I follow this one
##########
airflow-core/src/airflow/serialization/serde.py:
##########
@@ -31,10 +31,10 @@
import attr
import airflow.serialization.serializers
+from airflow._shared.module_loading import import_string, iter_namespace, qualname
Review Comment:
Note to self: revisit this occurrence as part of [Move over serde library to task sdk](https://github.com/apache/airflow/issues/58885)
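For anyone following along: `iter_namespace` is presumably the thin `pkgutil` wrapper serde uses to discover serializer modules (the shared `module_loading` package gains an `import pkgutil` later in this review). A minimal sketch of what it likely looks like — the name and signature come from the import above, the body is an assumption:

```python
import pkgutil
from types import ModuleType


def iter_namespace(module: ModuleType):
    # Yield (finder, name, ispkg) for every module in the (possibly
    # namespace) package, with fully qualified names that can be fed
    # straight into importlib.import_module().
    return pkgutil.iter_modules(module.__path__, module.__name__ + ".")
```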
##########
shared/module_loading/src/airflow_shared/module_loading/__init__.py:
##########
@@ -14,21 +15,24 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
from __future__ import annotations

+import pkgutil
from collections.abc import Callable
from importlib import import_module
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from types import ModuleType


def import_string(dotted_path: str):
    """
    Import a dotted module path and return the attribute/class designated by the last name in the path.

-    Note: Only supports top-level attributes or classes.
-
    Raise ImportError if the import failed.
    """
+    # TODO: Add support for nested classes. Currently, it only works for top-level classes.
Review Comment:
Do we need to? I don't see a use case for it
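For context, if a use case ever does show up, a hedged sketch of what nested-class support could look like (hypothetical, not part of this PR): instead of a single `rsplit`, try progressively shorter module prefixes and `getattr` through the remainder:

```python
from importlib import import_module


def import_string_nested(dotted_path: str):
    # Hypothetical variant: "pkg.mod.Outer.Inner" first fails to import as
    # a module path, then imports "pkg.mod" and walks getattr(Outer, Inner).
    parts = dotted_path.split(".")
    for i in range(len(parts) - 1, 0, -1):
        try:
            obj = import_module(".".join(parts[:i]))
        except ImportError:
            continue
        try:
            for attr in parts[i:]:
                obj = getattr(obj, attr)
        except AttributeError:
            break
        return obj
    raise ImportError(f"{dotted_path} doesn't look like a valid module path")
```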
##########
shared/configuration/src/airflow_shared/configuration/parser.py:
##########
@@ -1149,7 +1149,7 @@ def getimport(self, section: str, key: str, **kwargs) -> Any:
        try:
            # Import here to avoid circular dependency
-            from airflow.utils.module_loading import import_string
+            from ..module_loading import import_string
Review Comment:
Nice
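Worth spelling out why the relative form matters here: the shared sources get vendored under different parent packages, and a relative import resolves against whichever parent it lands in. Illustrative only:

```python
# The same shared source file works unchanged in both vendored locations:
#   airflow/_shared/configuration/parser.py
#       -> ..module_loading resolves to airflow._shared.module_loading
#   airflow/sdk/_shared/configuration/parser.py
#       -> ..module_loading resolves to airflow.sdk._shared.module_loading
from ..module_loading import import_string
```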
##########
scripts/ci/prek/check_shared_distributions_structure.py:
##########
@@ -202,20 +372,217 @@ def check_shared_distribution(shared_path: Path) -> bool:
        return False
    if not check_ruff_lint_rules(ruff, shared_path):
        return False
+    if not check_no_airflow_dependencies(pyproject, shared_path):
+        return False
+    if not check_no_airflow_imports(shared_path):
+        return False
    console.print(f"[bold green]Summary: {shared_path.name} is OK[/bold green]")
    return True
+
+
+def check_no_airflow_shared_imports(dist_path: Path, dist_name: str) -> bool:
+    """Check that no Python files use airflow_shared imports."""
+    src_path = dist_path / "src"
+    if not src_path.exists():
+        console.print(f"  [yellow]src/ directory does not exist for [magenta]{dist_name}[/magenta][/yellow]")
+        return True
+
+    def airflow_shared_import_predicate(node, module_name, is_from_import):
+        """Check if import is from airflow_shared package."""
+        if module_name == "airflow_shared" or module_name.startswith("airflow_shared."):
+            if is_from_import:
+                imported_names = ", ".join(alias.name for alias in node.names)
+                return True, f"from {module_name} import {imported_names}"
+            return True, f"import {module_name}"
+        return False, ""
+
+    py_files = list(src_path.rglob("*.py"))
+    violations = _check_imports_in_files(py_files, dist_path, airflow_shared_import_predicate, dist_name)
+
+    if violations:
+        console.print(f"  [red]Found airflow_shared imports in [magenta]{dist_name}[/magenta]:[/red]")
+        for file_path, lineno, import_stmt in violations:
+            rel_path = file_path.relative_to(dist_path)
+            console.print(f"    [red]{rel_path}:{lineno}: {import_stmt}[/red]")
+        console.print()
+        console.print(
+            f"  [red]Please do not use airflow_shared imports in [magenta]{dist_name}[/magenta][/red]"
+        )
+        console.print(
+            "  [yellow]Use proper _shared imports instead (e.g., airflow._shared.* or airflow.sdk._shared.*)[/yellow]"
+        )
+        return False
+
+    console.print(
+        f"  No airflow_shared imports found in [magenta]{dist_name}[/magenta] [bold green]OK[/bold green]"
+    )
+    return True
+
+
+def check_only_allowed_shared_imports(dist_path: Path, dist_name: str, allowed_prefix: str) -> bool:
+    """Check that only imports with the allowed _shared prefix are used."""
+    src_path = dist_path / "src"
+    if not src_path.exists():
+        console.print(f"  [yellow]src/ directory does not exist for [magenta]{dist_name}[/magenta][/yellow]")
+        return True
+
+    def allowed_shared_import_predicate(node, module_name, is_from_import):
+        """Check if _shared import uses the correct prefix."""
+        if "._shared" in module_name or module_name.endswith("._shared"):
+            if not module_name.startswith(allowed_prefix):
+                if is_from_import:
+                    imported_names = ", ".join(alias.name for alias in node.names)
+                    return True, f"from {module_name} import {imported_names}"
+                return True, f"import {module_name}"
+        return False, ""
+
+    py_files = list(src_path.rglob("*.py"))
+    violations = _check_imports_in_files(py_files, dist_path, allowed_shared_import_predicate, dist_name)
+
+    if violations:
+        console.print(f"  [red]Found disallowed _shared imports in [magenta]{dist_name}[/magenta]:[/red]")
+        for file_path, lineno, import_stmt in violations:
+            rel_path = file_path.relative_to(dist_path)
+            console.print(f"    [red]{rel_path}:{lineno}: {import_stmt}[/red]")
+        console.print()
+        console.print(
+            f"  [red]Only imports starting with '{allowed_prefix}' are allowed in [magenta]{dist_name}[/magenta][/red]"
+        )
+        return False
+
+    console.print(
+        f"  Only allowed _shared imports found in [magenta]{dist_name}[/magenta] [bold green]OK[/bold green]"
+    )
+    return True
+
+
+def check_distribution(dist_path: Path, dist_name: str, allowed_shared_prefix: str) -> bool:
+    """
+    Check a distribution for proper _shared imports usage.
+
+    Args:
+        dist_path: Path to the distribution directory
+        dist_name: Name of the distribution for display
+        allowed_shared_prefix: Allowed prefix for _shared imports (e.g., 'airflow.sdk._shared')
+
+    Returns:
+        True if all checks pass, False otherwise
+    """
+    console.print(f"\n[bold blue]Checking:[/bold blue] [magenta]{dist_name}[/magenta] distribution")
+
+    if not dist_path.exists():
+        console.print(f"  [yellow]{dist_name} directory does not exist[/yellow]")
+        return True
+
+    all_ok = True
+
+    # Check 1: No airflow_shared imports
+    if not check_no_airflow_shared_imports(dist_path, dist_name):
+        all_ok = False
+
+    # Check 2: Only allowed _shared imports
+    if not check_only_allowed_shared_imports(dist_path, dist_name, allowed_shared_prefix):
+        all_ok = False
+
+    if all_ok:
+        console.print(f"[bold green]Summary: {dist_name} is OK[/bold green]")
+
+    return all_ok
+
+
+def check_task_sdk_distribution() -> bool:
+    """Check task-sdk distribution for proper _shared imports usage."""
+    return check_distribution(TASK_SDK_DIR, "task-sdk", "airflow.sdk._shared")
+
+
+def check_airflow_core_distribution() -> bool:
+    """Check airflow-core distribution for proper _shared imports usage."""
+    return check_distribution(AIRFLOW_CORE_DIR, "airflow-core", "airflow._shared")
+
+
+def check_no_airflow_imports_devel_common(dist_path: Path) -> bool:
+    """Check that no Python files in devel-common use airflow imports."""
+    src_path = dist_path / "src"
+    if not src_path.exists():
+        console.print("  [yellow]src/ directory does not exist for [magenta]devel-common[/magenta][/yellow]")
+        return True
+
+    def airflow_import_predicate(node, module_name, is_from_import):
+        """Check if import is from airflow package."""
+        if module_name == "airflow" or module_name.startswith("airflow."):
+            if is_from_import:
+                imported_names = ", ".join(alias.name for alias in node.names)
+                return True, f"from {module_name} import {imported_names}"
+            return True, f"import {module_name}"
+        return False, ""
+
+    py_files = list(src_path.rglob("*.py"))
+    violations = _check_imports_in_files(py_files, dist_path, airflow_import_predicate, "devel-common")
+
+    if violations:
+        console.print("  [red]Found airflow imports in [magenta]devel-common[/magenta]:[/red]")
+        for file_path, lineno, import_stmt in violations:
+            rel_path = file_path.relative_to(dist_path)
+            console.print(f"    [red]{rel_path}:{lineno}: {import_stmt}[/red]")
+        console.print()
+        console.print("  [red]Please remove airflow imports from [magenta]devel-common[/magenta][/red]")
+        console.print(
+            "  [yellow]devel-common should not depend on airflow packages to remain independent[/yellow]\n\n"
+            "  [yellow]Those imports should be converted to `from airflow_shared` or "
+            "moved to the devel-common distribution.[/yellow]"
+        )
+        return False
+
+    console.print("  No airflow imports found in [magenta]devel-common[/magenta] [bold green]OK[/bold green]")
+    return True
+
+
+def check_devel_common_distribution() -> bool:
+    """Check devel-common distribution for proper imports usage."""
+    console.print("\n[bold blue]Checking:[/bold blue] [magenta]devel-common[/magenta] distribution")
+
+    if not DEVEL_COMMON_DIR.exists():
+        console.print("  [yellow]devel-common directory does not exist[/yellow]")
+        return True
+
+    all_ok = True
+
+    # Check: No airflow imports
+    if not check_no_airflow_imports_devel_common(DEVEL_COMMON_DIR):
+        all_ok = False
+
+    if all_ok:
+        console.print("[bold green]Summary: devel-common is OK[/bold green]")
+
+    return all_ok
+
+
def main() -> None:
-    if not SHARED_DIR.exists():
-        print("No shared directory found.")
-        sys.exit(1)
    all_ok = True
-    for shared_project in SHARED_DIR.iterdir():
-        if shared_project.is_dir():
-            ok = check_shared_distribution(shared_project)
-            if not ok:
-                all_ok = False
+
+    # Check shared distributions
+    if SHARED_DIR.exists():
+        for shared_project in SHARED_DIR.iterdir():
+            if shared_project.is_dir():
+                ok = check_shared_distribution(shared_project)
+                if not ok:
+                    all_ok = False
+    else:
+        console.print("[yellow]No shared directory found.[/yellow]")
+        sys.exit(1)
+
+    # Check task-sdk distribution
+    if not check_task_sdk_distribution():
+        all_ok = False
+
+    # Check airflow-core distribution
+    if not check_airflow_core_distribution():
+        all_ok = False
+
+    # Check devel-common distribution
+    if not check_devel_common_distribution():
+        all_ok = False
+
Review Comment:
Changes to this file look good
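One note for readers of this hunk: `_check_imports_in_files` is defined elsewhere in the file, but its contract can be inferred from the predicates above — it walks each file's imports and calls the predicate with `(node, module_name, is_from_import)`, collecting `(path, lineno, stmt)` tuples. A minimal sketch of a compatible implementation (hypothetical, not the PR's actual helper):

```python
import ast


def _check_imports_in_files(py_files, dist_path, predicate, dist_name):
    # Hypothetical helper matching how the check functions above use it;
    # dist_path/dist_name are unused in this sketch but kept for the
    # signature. Parse each file, feed every import node to the predicate,
    # and collect (file_path, lineno, import_statement) per violation.
    violations = []
    for file_path in py_files:
        try:
            tree = ast.parse(file_path.read_text(encoding="utf-8"))
        except SyntaxError:
            continue  # let other tooling report unparsable files
        for node in ast.walk(tree):
            if isinstance(node, ast.Import):
                for alias in node.names:
                    matched, stmt = predicate(node, alias.name, False)
                    if matched:
                        violations.append((file_path, node.lineno, stmt))
            elif isinstance(node, ast.ImportFrom) and node.module:
                matched, stmt = predicate(node, node.module, True)
                if matched:
                    violations.append((file_path, node.lineno, stmt))
    return violations
```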
##########
providers/apache/kafka/src/airflow/providers/apache/kafka/hooks/consume.py:
##########
@@ -21,7 +21,7 @@
from confluent_kafka import Consumer, KafkaError

from airflow.providers.apache.kafka.hooks.base import KafkaBaseHook
-from airflow.utils.module_loading import import_string
+from airflow.providers.common.compat.module_loading import import_string
Review Comment:
Yay! These provider changes look very nice
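The compat module presumably follows the usual common.compat pattern: re-export from the new location when running against a newer core, and fall back to the old path otherwise. A hedged sketch of what such a shim could look like (assumed shape, not copied from the PR):

```python
# airflow/providers/common/compat/module_loading.py (assumed shape)
try:
    # Newer Airflow: the helper lives in the shared distribution
    from airflow._shared.module_loading import import_string
except ImportError:
    # Older Airflow: fall back to the historical location
    from airflow.utils.module_loading import import_string

__all__ = ["import_string"]
```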
##########
airflow-core/tests/unit/always/test_project_structure.py:
##########
@@ -59,7 +59,6 @@ def test_providers_modules_should_have_tests(self):
"""
# The test below had a but for quite a while and we missed a lot of
modules to have tess
# We should make sure that one goes to 0
- # TODO(potiuk) - check if that test actually tests something
Review Comment:
Hah!
##########
airflow-core/docs/authoring-and-scheduling/serializers.rst:
##########
@@ -87,7 +87,7 @@ Registered
from typing import TYPE_CHECKING
- from airflow.utils.module_loading import qualname
+ from airflow._shared.module_loading import qualname
Review Comment:
Let's not have shared paths in docs when possible. In this case, since we are going to move serde and the serializers to the Task SDK, the docs can consume the `airflow.sdk` path instead by re-exporting it more cleanly
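i.e. something along these lines (a sketch of the suggested re-export, not committed code):

```python
# airflow/sdk/__init__.py (sketch): re-export the helper under a stable
# public path so docs never need to reference _shared internals
from airflow.sdk._shared.module_loading import qualname

# the docs example then becomes:
#     from airflow.sdk import qualname
```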