amoghrajesh commented on code in PR #58825:
URL: https://github.com/apache/airflow/pull/58825#discussion_r2579879617


##########
airflow-core/src/airflow/settings.py:
##########
@@ -636,6 +636,7 @@ def _configure_secrets_masker():
     core_masker.sensitive_variables_fields = list(sensitive_fields)
     core_masker.secret_mask_adapter = secret_mask_adapter
 
+    # TODO: this should be moved out when settings are moved to `shared`

Review Comment:
   I don't think I follow this one.



##########
airflow-core/src/airflow/serialization/serde.py:
##########
@@ -31,10 +31,10 @@
 import attr
 
 import airflow.serialization.serializers
+from airflow._shared.module_loading import import_string, iter_namespace, 
qualname

Review Comment:
   Note to self: revisit this occurrence as part of [Move over serde library to 
task sdk](https://github.com/apache/airflow/issues/58885)



##########
shared/module_loading/src/airflow_shared/module_loading/__init__.py:
##########
@@ -14,21 +15,24 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
 from __future__ import annotations
 
+import pkgutil
 from collections.abc import Callable
 from importlib import import_module
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from types import ModuleType
 
 
 def import_string(dotted_path: str):
     """
     Import a dotted module path and return the attribute/class designated by 
the last name in the path.
 
-    Note: Only supports top-level attributes or classes.
-
     Raise ImportError if the import failed.
     """
+    # TODO: Add support for nested classes. Currently, it only works for 
top-level classes.

Review Comment:
   Do we need to? I don't see a use case for it.



##########
shared/configuration/src/airflow_shared/configuration/parser.py:
##########
@@ -1149,7 +1149,7 @@ def getimport(self, section: str, key: str, **kwargs) -> 
Any:
 
         try:
             # Import here to avoid circular dependency
-            from airflow.utils.module_loading import import_string
+            from ..module_loading import import_string

Review Comment:
   Nice



##########
scripts/ci/prek/check_shared_distributions_structure.py:
##########
@@ -202,20 +372,217 @@ def check_shared_distribution(shared_path: Path) -> bool:
         return False
     if not check_ruff_lint_rules(ruff, shared_path):
         return False
+    if not check_no_airflow_dependencies(pyproject, shared_path):
+        return False
+    if not check_no_airflow_imports(shared_path):
+        return False
     console.print(f"[bold green]Summary: {shared_path.name} is OK[/bold 
green]")
     return True
 
 
+def check_no_airflow_shared_imports(dist_path: Path, dist_name: str) -> bool:
+    """Check that no Python files use airflow_shared imports."""
+    src_path = dist_path / "src"
+    if not src_path.exists():
+        console.print(f"  [yellow]src/ directory does not exist for 
[magenta]{dist_name}[/magenta][/yellow]")
+        return True
+
+    def airflow_shared_import_predicate(node, module_name, is_from_import):
+        """Check if import is from airflow_shared package."""
+        if module_name == "airflow_shared" or 
module_name.startswith("airflow_shared."):
+            if is_from_import:
+                imported_names = ", ".join(alias.name for alias in node.names)
+                return True, f"from {module_name} import {imported_names}"
+            return True, f"import {module_name}"
+        return False, ""
+
+    py_files = list(src_path.rglob("*.py"))
+    violations = _check_imports_in_files(py_files, dist_path, 
airflow_shared_import_predicate, dist_name)
+
+    if violations:
+        console.print(f"  [red]Found airflow_shared imports in 
[magenta]{dist_name}[/magenta]:[/red]")
+        for file_path, lineno, import_stmt in violations:
+            rel_path = file_path.relative_to(dist_path)
+            console.print(f"    [red]{rel_path}:{lineno}: {import_stmt}[/red]")
+        console.print()
+        console.print(
+            f"  [red]Please do not use airflow_shared imports in 
[magenta]{dist_name}[/magenta][/red]"
+        )
+        console.print(
+            "  [yellow]Use proper _shared imports instead (e.g., 
airflow._shared.* or airflow.sdk._shared.*)[/yellow]"
+        )
+        return False
+
+    console.print(
+        f"  No airflow_shared imports found in [magenta]{dist_name}[/magenta] 
[bold green]OK[/bold green]"
+    )
+    return True
+
+
+def check_only_allowed_shared_imports(dist_path: Path, dist_name: str, 
allowed_prefix: str) -> bool:
+    """Check that only imports with the allowed _shared prefix are used."""
+    src_path = dist_path / "src"
+    if not src_path.exists():
+        console.print(f"  [yellow]src/ directory does not exist for 
[magenta]{dist_name}[/magenta][/yellow]")
+        return True
+
+    def allowed_shared_import_predicate(node, module_name, is_from_import):
+        """Check if _shared import uses the correct prefix."""
+        if "._shared" in module_name or module_name.endswith("._shared"):
+            if not module_name.startswith(allowed_prefix):
+                if is_from_import:
+                    imported_names = ", ".join(alias.name for alias in 
node.names)
+                    return True, f"from {module_name} import {imported_names}"
+                return True, f"import {module_name}"
+        return False, ""
+
+    py_files = list(src_path.rglob("*.py"))
+    violations = _check_imports_in_files(py_files, dist_path, 
allowed_shared_import_predicate, dist_name)
+
+    if violations:
+        console.print(f"  [red]Found disallowed _shared imports in 
[magenta]{dist_name}[/magenta]:[/red]")
+        for file_path, lineno, import_stmt in violations:
+            rel_path = file_path.relative_to(dist_path)
+            console.print(f"    [red]{rel_path}:{lineno}: {import_stmt}[/red]")
+        console.print()
+        console.print(
+            f"  [red]Only imports starting with '{allowed_prefix}' are allowed 
in [magenta]{dist_name}[/magenta][/red]"
+        )
+        return False
+
+    console.print(
+        f"  Only allowed _shared imports found in 
[magenta]{dist_name}[/magenta] [bold green]OK[/bold green]"
+    )
+    return True
+
+
+def check_distribution(dist_path: Path, dist_name: str, allowed_shared_prefix: 
str) -> bool:
+    """
+    Check a distribution for proper _shared imports usage.
+
+    Args:
+        dist_path: Path to the distribution directory
+        dist_name: Name of the distribution for display
+        allowed_shared_prefix: Allowed prefix for _shared imports (e.g., 
'airflow.sdk._shared')
+
+    Returns:
+        True if all checks pass, False otherwise
+    """
+    console.print(f"\n[bold blue]Checking:[/bold blue] 
[magenta]{dist_name}[/magenta] distribution")
+
+    if not dist_path.exists():
+        console.print(f"  [yellow]{dist_name} directory does not 
exist[/yellow]")
+        return True
+
+    all_ok = True
+
+    # Check 1: No airflow_shared imports
+    if not check_no_airflow_shared_imports(dist_path, dist_name):
+        all_ok = False
+
+    # Check 2: Only allowed _shared imports
+    if not check_only_allowed_shared_imports(dist_path, dist_name, 
allowed_shared_prefix):
+        all_ok = False
+
+    if all_ok:
+        console.print(f"[bold green]Summary: {dist_name} is OK[/bold green]")
+
+    return all_ok
+
+
+def check_task_sdk_distribution() -> bool:
+    """Check task-sdk distribution for proper _shared imports usage."""
+    return check_distribution(TASK_SDK_DIR, "task-sdk", "airflow.sdk._shared")
+
+
+def check_airflow_core_distribution() -> bool:
+    """Check airflow-core distribution for proper _shared imports usage."""
+    return check_distribution(AIRFLOW_CORE_DIR, "airflow-core", 
"airflow._shared")
+
+
+def check_no_airflow_imports_devel_common(dist_path: Path) -> bool:
+    """Check that no Python files in devel-common use airflow imports."""
+    src_path = dist_path / "src"
+    if not src_path.exists():
+        console.print("  [yellow]src/ directory does not exist for 
[magenta]devel-common[/magenta][/yellow]")
+        return True
+
+    def airflow_import_predicate(node, module_name, is_from_import):
+        """Check if import is from airflow package."""
+        if module_name == "airflow" or module_name.startswith("airflow."):
+            if is_from_import:
+                imported_names = ", ".join(alias.name for alias in node.names)
+                return True, f"from {module_name} import {imported_names}"
+            return True, f"import {module_name}"
+        return False, ""
+
+    py_files = list(src_path.rglob("*.py"))
+    violations = _check_imports_in_files(py_files, dist_path, 
airflow_import_predicate, "devel-common")
+
+    if violations:
+        console.print("  [red]Found airflow imports in 
[magenta]devel-common[/magenta]:[/red]")
+        for file_path, lineno, import_stmt in violations:
+            rel_path = file_path.relative_to(dist_path)
+            console.print(f"    [red]{rel_path}:{lineno}: {import_stmt}[/red]")
+        console.print()
+        console.print("  [red]Please remove airflow imports from 
[magenta]devel-common[/magenta][/red]")
+        console.print(
+            "  [yellow]devel-common should not depend on airflow packages to 
remain independent[/yellow]\n\n"
+            "  [yellow]Those imports should be converted to `from 
airflow_shared` or "
+            "moved to the devel-common distribution.[/yellow]"
+        )
+        return False
+
+    console.print("  No airflow imports found in 
[magenta]devel-common[/magenta] [bold green]OK[/bold green]")
+    return True
+
+
+def check_devel_common_distribution() -> bool:
+    """Check devel-common distribution for proper imports usage."""
+    console.print("\n[bold blue]Checking:[/bold blue] 
[magenta]devel-common[/magenta] distribution")
+
+    if not DEVEL_COMMON_DIR.exists():
+        console.print("  [yellow]devel-common directory does not 
exist[/yellow]")
+        return True
+
+    all_ok = True
+
+    # Check: No airflow imports
+    if not check_no_airflow_imports_devel_common(DEVEL_COMMON_DIR):
+        all_ok = False
+
+    if all_ok:
+        console.print("[bold green]Summary: devel-common is OK[/bold green]")
+
+    return all_ok
+
+
 def main() -> None:
-    if not SHARED_DIR.exists():
-        print("No shared directory found.")
-        sys.exit(1)
     all_ok = True
-    for shared_project in SHARED_DIR.iterdir():
-        if shared_project.is_dir():
-            ok = check_shared_distribution(shared_project)
-            if not ok:
-                all_ok = False
+
+    # Check shared distributions
+    if SHARED_DIR.exists():
+        for shared_project in SHARED_DIR.iterdir():
+            if shared_project.is_dir():
+                ok = check_shared_distribution(shared_project)
+                if not ok:
+                    all_ok = False
+    else:
+        console.print("[yellow]No shared directory found.[/yellow]")
+        sys.exit(1)
+
+    # Check task-sdk distribution
+    if not check_task_sdk_distribution():
+        all_ok = False
+
+    # Check airflow-core distribution
+    if not check_airflow_core_distribution():
+        all_ok = False
+
+    # Check devel-common distribution
+    if not check_devel_common_distribution():
+        all_ok = False
+

Review Comment:
   Changes to this file look good



##########
providers/apache/kafka/src/airflow/providers/apache/kafka/hooks/consume.py:
##########
@@ -21,7 +21,7 @@
 from confluent_kafka import Consumer, KafkaError
 
 from airflow.providers.apache.kafka.hooks.base import KafkaBaseHook
-from airflow.utils.module_loading import import_string
+from airflow.providers.common.compat.module_loading import import_string

Review Comment:
   Yay! These provider changes look very nice



##########
airflow-core/tests/unit/always/test_project_structure.py:
##########
@@ -59,7 +59,6 @@ def test_providers_modules_should_have_tests(self):
         """
         # The test below had a but for quite a while and we missed a lot of 
modules to have tess
         # We should make sure that one goes to 0
-        # TODO(potiuk) - check if that test actually tests something

Review Comment:
   Hah!



##########
airflow-core/docs/authoring-and-scheduling/serializers.rst:
##########
@@ -87,7 +87,7 @@ Registered
 
     from typing import TYPE_CHECKING
 
-    from airflow.utils.module_loading import qualname
+    from airflow._shared.module_loading import qualname

Review Comment:
   Let's avoid shared paths in docs where possible. In this case, since we 
are going to move serde and serializers to the Task SDK, we can have the docs 
use an `airflow.sdk` path instead by re-exporting it there.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to