dwreeves opened a new issue, #62362:
URL: https://github.com/apache/airflow/issues/62362

   ### Description
   
   Instead of importing `snowflake` modules at the global scope, I propose 
that they be lazy-loaded.
   
   ### Use case/motivation
   
   ## Motivation
   
   `snowflake` modules consume a lot of resources at import time. Lazy-loading 
them reduces the strain on the scheduler and DAG processing in a lot of 
end-user code.
   
   This matters less now that `SnowflakeOperator` has been removed, since 
`SnowflakeHook` is no longer imported into a DAG-level context by conventional, 
basic usage of operators; however, users may still import `SnowflakeHook` 
globally for various custom operations.
   
   Lazy-loading `snowflake` significantly reduces the need for users to do 
their own lazy-loading, and reduces the penalty of a global import.
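
   To illustrate the pattern (this is a sketch, not the actual patch): the heavy 
import can be resolved on first use via a small memoized helper. The example 
below uses `sqlite3` as a stand-in for `snowflake.connector`, since the 
mechanics are identical; the names `_heavy` and `run_query` are illustrative only.

    ```python
    import importlib
    import sys
    from functools import cache

    # Stand-in for a heavy dependency such as snowflake.connector.
    HEAVY_MODULE = "sqlite3"
    sys.modules.pop(HEAVY_MODULE, None)  # ensure a cold start for the demo

    @cache
    def _heavy():
        """Import the heavy module on first use and memoize the result."""
        return importlib.import_module(HEAVY_MODULE)

    def run_query():
        # The import cost is paid here, on first call, not at module import time.
        return _heavy().sqlite_version

    assert HEAVY_MODULE not in sys.modules  # nothing imported at load time
    print(run_query())
    assert HEAVY_MODULE in sys.modules  # imported only after first use
    ```

   Because `_heavy()` is memoized, repeated calls pay the import cost exactly once; 
code that never touches the hook's Snowflake-specific paths never imports 
`snowflake` at all.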
   
   ## Benchmarking
   
   I wrote a script (included below) to measure the import cost of the 
`snowflake` dependencies.
   
   Output on my laptop:
   
   ```
    ─────────────────────────── Snowflake Hook — Import Cost Benchmark ───────────────────────────
     Python: 3.13.1 (main, Jan 14 2025, 23:48:54) [Clang 19.1.6 ]
     Runs per scenario: 10  |  also benchmark w/o .pyc: no
   
               avg ms (±stdev)  |  avg RSS MB  —  10 runs each             
   ╭───────────────────────────────┬───────────────────┬──────────────────╮
   │ Scenario                      │ Time w/ .pyc (ms) │ RSS w/ .pyc (MB) │
   ├───────────────────────────────┼───────────────────┼──────────────────┤
   │ No Snowflake, no SQLAlchemy   │             60 ±2 │             17.2 │
   │ No Snowflake, yes SQLAlchemy  │     121 ±1  (+61) │    31.0  (+13.9) │
   │ Yes Snowflake, yes SQLAlchemy │    291 ±5  (+231) │    59.8  (+42.6) │
   ╰───────────────────────────────┴───────────────────┴──────────────────╯
     (+N) = delta vs first scenario  |  ±N = stdev across runs
   ```
   
   As you can see, the cost is quite significant: importing the Snowflake 
modules adds roughly +170 ms and nearly +29 MB of RSS on top of the SQLAlchemy 
baseline (+231 ms and +42.6 MB versus the bare baseline).
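
   For attributing the cost to individual modules (rather than measuring totals, 
as the script does), CPython's built-in `-X importtime` flag prints a per-module 
import-time tree to stderr. The snippet below uses `json` as a stand-in target; 
in practice you would import `snowflake.connector`:

    ```shell
    # -X importtime writes one line per imported module to stderr:
    #   "import time: <self us> | <cumulative us> | <module>"
    python -X importtime -c "import json" 2> importtime.log

    # Sort by cumulative microseconds to find the most expensive subtrees.
    sort -t'|' -k2 -rn importtime.log | head -20
    ```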
   
   Here is the script used to run these benchmarks. **Note:** the script is 
AI-generated/assisted.
   
   ```python
   #!/usr/bin/env python3
   # NOTE: AI generated script.
   # /// script
   # requires-python = ">=3.11"
   # dependencies = [
   #   "rich-click",
   #   "psutil",
   #   "rich",
   #   "requests",
   #   "tenacity",
   #   "cryptography",
   #   "snowflake-connector-python",
   #   "snowflake-sqlalchemy",
   #   "sqlalchemy",
   # ]
   # ///
   """
   Measures the import cost of airflow/providers/snowflake/hooks/snowflake.py
   with and without the snowflake-specific imports, across N cold-cache runs.
   
   Optionally also benchmarks with .pyc bytecode caches enabled (--pyc).
   
   Usage:
       uv run benchmark_snowflake_imports.py
       uv run benchmark_snowflake_imports.py -n 20
       uv run benchmark_snowflake_imports.py --pyc
       uv run benchmark_snowflake_imports.py --pyc -n 5
   """
   from __future__ import annotations
   
   import os
   import shutil
   import statistics
   import subprocess
   import sys
   import tempfile
   from dataclasses import dataclass, field
   
   import rich_click as click
   import psutil
   from rich import box
   from rich.console import Console
   from rich.table import Table
   from rich.text import Text
   
   console = Console()
   
   # All imports from snowflake.py, minus the snowflake-specific ones
   WITHOUT_SNOWFLAKE = """\
   import base64
   import os
   from collections.abc import Callable, Iterable, Mapping
   from contextlib import closing, contextmanager
   from datetime import datetime, timedelta
   from functools import cached_property
   from io import StringIO
   from pathlib import Path
   from typing import TYPE_CHECKING, Any, TypeVar, overload
   from urllib.parse import urlparse
   import requests
   import tenacity
   from cryptography.hazmat.backends import default_backend
   from cryptography.hazmat.primitives import serialization
   from requests.auth import HTTPBasicAuth
   from requests.exceptions import ConnectionError, HTTPError, Timeout
   """
   
   WITHOUT_SNOWFLAKE_WITH_SQLALCHEMY = WITHOUT_SNOWFLAKE + """\
   from sqlalchemy import create_engine
   """
   
   WITH_SNOWFLAKE = WITHOUT_SNOWFLAKE + """\
   from snowflake import connector
   from snowflake.connector import DictCursor, SnowflakeConnection, util_text
   from snowflake.sqlalchemy import URL
   from sqlalchemy import create_engine
   """
   
   SCENARIOS = [
       ("No Snowflake, no SQLAlchemy", WITHOUT_SNOWFLAKE),
       ("No Snowflake, yes SQLAlchemy", WITHOUT_SNOWFLAKE_WITH_SQLALCHEMY),
       ("Yes Snowflake, yes SQLAlchemy", WITH_SNOWFLAKE),
   ]
   
   
   @dataclass
   class RunStats:
       times_ms: list[float] = field(default_factory=list)
       rss_mb: list[float] = field(default_factory=list)
   
       @property
       def avg_ms(self) -> float:
           return statistics.mean(self.times_ms)
   
       @property
       def avg_rss(self) -> float:
           return statistics.mean(self.rss_mb)
   
       @property
       def stdev_ms(self) -> float:
            return statistics.stdev(self.times_ms) if len(self.times_ms) > 1 else 0.0
   
   
   @dataclass
   class ScenarioResult:
       label: str
       with_pyc: RunStats
       without_pyc: RunStats | None = None
   
   
   SUBPROCESS_CODE_TEMPLATE = """\
   import time, os, sys
   import psutil
   
   proc = psutil.Process(os.getpid())
   rss_before = proc.memory_info().rss
   t0 = time.perf_counter()
   try:
   {indented}
       elapsed = time.perf_counter() - t0
       rss_after = proc.memory_info().rss
       print(f"{{elapsed:.6f}} {{(rss_after - rss_before) / 1024 / 1024:.4f}}")
   except Exception as e:
       print(f"ERROR: {{e}}", file=sys.stderr)
       sys.exit(1)
   """
   
   
   def _single_run(import_block: str, use_pyc: bool) -> tuple[float, float]:
       """Spawn a fresh interpreter and return (elapsed_ms, rss_mb)."""
       indented = "\n".join("    " + line for line in import_block.splitlines())
       code = SUBPROCESS_CODE_TEMPLATE.format(indented=indented)
   
       env = None
       tmp_dir = None
       if not use_pyc:
            # Redirect bytecode cache to a fresh empty dir so Python finds no
            # .pyc files and must compile every module from source.
           tmp_dir = tempfile.mkdtemp(prefix="no_pyc_bench_")
           env = {**os.environ, "PYTHONPYCACHEPREFIX": tmp_dir}
   
       try:
           result = subprocess.run(
               [sys.executable, "-c", code],
               capture_output=True,
               text=True,
               env=env,
           )
       finally:
           if tmp_dir:
               shutil.rmtree(tmp_dir, ignore_errors=True)
   
       if result.returncode != 0:
           console.print(f"[red]ERROR:[/] {result.stderr.strip()}")
           sys.exit(1)
   
       elapsed_s, rss_mb = result.stdout.strip().split()
       return float(elapsed_s) * 1000, float(rss_mb)
   
   
    def measure(label: str, import_block: str, n: int, include_no_pyc: bool) -> ScenarioResult:
       with_pyc = RunStats()
       without_pyc = RunStats() if include_no_pyc else None
   
       # Warm-up run to ensure .pyc files are written before timed runs.
        with console.status(f"  [italic]{label}[/]  warming up .pyc cache…", spinner="dots"):
           _single_run(import_block, use_pyc=True)
   
       for i in range(n):
            with console.status(f"  [italic]{label}[/]  run {i + 1}/{n} (w/ .pyc)…", spinner="dots"):
               ms, mb = _single_run(import_block, use_pyc=True)
           with_pyc.times_ms.append(ms)
           with_pyc.rss_mb.append(mb)
   
       if include_no_pyc:
           for i in range(n):
                with console.status(f"  [italic]{label}[/]  run {i + 1}/{n} (w/o .pyc)…", spinner="dots"):
                   ms, mb = _single_run(import_block, use_pyc=False)
               without_pyc.times_ms.append(ms)  # type: ignore[union-attr]
               without_pyc.rss_mb.append(mb)  # type: ignore[union-attr]
   
        return ScenarioResult(label=label, with_pyc=with_pyc, without_pyc=without_pyc)
   
   
   def _time_cell(stats: RunStats, baseline_ms: float) -> Text:
       t = Text(f"{stats.avg_ms:.0f}", style="white")
       t.append(f" ±{stats.stdev_ms:.0f}", style="dim")
       if stats.avg_ms != baseline_ms:
           t.append(f"  (+{stats.avg_ms - baseline_ms:.0f})", style="dim cyan")
       return t
   
   
   def _rss_cell(stats: RunStats, baseline_mb: float) -> Text:
       t = Text(f"{stats.avg_rss:.1f}", style="white")
       if stats.avg_rss != baseline_mb:
           t.append(f"  (+{stats.avg_rss - baseline_mb:.1f})", style="dim cyan")
       return t
   
   
   @click.command()
   @click.option(
       "-n", "--runs",
       default=10,
       show_default=True,
       help="Number of cold-cache subprocess runs to average per scenario.",
   )
   @click.option(
       "--no-pyc",
       is_flag=True,
       default=False,
       help="Also run benchmarks without .pyc bytecode caches.",
   )
   @click.rich_config({"theme": "cargo-slim"})
   def main(runs: int, no_pyc: bool) -> None:
       console.print()
       console.rule("[bold cyan]Snowflake Hook — Import Cost Benchmark[/]")
       console.print(f"  [dim]Python: {sys.version}[/]")
        console.print(f"  [dim]Runs per scenario: {runs}  |  also benchmark w/o .pyc: {'yes' if no_pyc else 'no'}[/]")
   
       console.print()
   
        results = [measure(label, block, runs, no_pyc) for label, block in SCENARIOS]
   
       table = Table(
           box=box.ROUNDED,
           show_footer=False,
           title_style="bold cyan",
           border_style="grey50",
           title=f"avg ms (±stdev)  |  avg RSS MB  —  {runs} runs each",
       )
       table.add_column("Scenario", style="white", no_wrap=True)
       table.add_column("Time w/ .pyc (ms)", justify="right")
       table.add_column("RSS w/ .pyc (MB)", justify="right")
       if no_pyc:
           table.add_column("Time w/o .pyc (ms)", justify="right")
           table.add_column("RSS w/o .pyc (MB)", justify="right")
   
       baseline_pyc_ms = results[0].with_pyc.avg_ms
       baseline_pyc_mb = results[0].with_pyc.avg_rss
       baseline_nopyc_ms = results[0].without_pyc.avg_ms if no_pyc else 0.0
       baseline_nopyc_mb = results[0].without_pyc.avg_rss if no_pyc else 0.0
   
       for r in results:
           row = [
               r.label,
               _time_cell(r.with_pyc, baseline_pyc_ms),
               _rss_cell(r.with_pyc, baseline_pyc_mb),
           ]
           if no_pyc and r.without_pyc is not None:
               row += [
                   _time_cell(r.without_pyc, baseline_nopyc_ms),
                   _rss_cell(r.without_pyc, baseline_nopyc_mb),
               ]
           table.add_row(*row)
   
       console.print(table)
        console.print("  [dim](+N) = delta vs first scenario  |  ±N = stdev across runs[/]")
       console.print()
   
   
   if __name__ == "__main__":
       main()
   ```
   
   
   
   ### Related issues
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.