jason810496 commented on code in PR #59747:
URL: https://github.com/apache/airflow/pull/59747#discussion_r2645998915


##########
dev/breeze/src/airflow_breeze/commands/kubernetes_commands.py:
##########
@@ -1256,6 +1386,116 @@ def deploy_airflow(
         sys.exit(return_code)
 
 
+@kubernetes_group.command(
+    name="dev",
+    help=(
+        "Run skaffold dev loop to sync dags and airflow-core sources to 
running pods "
+        "(scheduler/triggerer/dag-processor hot-reload; API server/webserver 
UI not by default)."
+    ),
+    context_settings=dict(
+        ignore_unknown_options=True,
+    ),
+)
+@option_python
+@option_kubernetes_version
+@option_executor
+@option_log_level
+@option_use_standard_naming
+@option_multi_namespace_mode
+@option_dags_path
+@option_dags_dest
+@option_skaffold_deploy
+@option_verbose
+@option_dry_run
[email protected]("skaffold_args", nargs=-1, type=click.UNPROCESSED)
+def dev(
+    python: str,
+    kubernetes_version: str,
+    executor: str,
+    log_level: str,
+    use_standard_naming: bool,
+    multi_namespace_mode: bool,
+    dags_path: Path,
+    dags_dest: str,
+    deploy: bool,
+    skaffold_args: tuple[str, ...],
+):
+    result = sync_virtualenv(force_venv_setup=False)
+    if result.returncode != 0:
+        sys.exit(result.returncode)
+    make_sure_kubernetes_tools_are_installed()
+    make_sure_skaffold_installed()
+    dags_path_abs = dags_path
+    if not dags_path_abs.is_absolute():
+        dags_path_abs = AIRFLOW_ROOT_PATH / dags_path_abs
+    dags_path_abs = dags_path_abs.resolve()
+    if not dags_path_abs.is_dir():
+        get_console().print(f"[error]DAGs path does not exist or is not a 
directory: {dags_path_abs}")
+        sys.exit(1)
+    try:
+        dags_relative_path = 
dags_path_abs.relative_to(AIRFLOW_ROOT_PATH).as_posix()
+    except ValueError:
+        get_console().print(f"[error]DAGs path must be under the Airflow 
sources: {AIRFLOW_ROOT_PATH}")
+        sys.exit(1)
+    if not get_kind_cluster_config_path(python=python, 
kubernetes_version=kubernetes_version).exists():
+        get_console().print(
+            f"\n[warning]Cluster for Python {python} and Kubernetes 
{kubernetes_version} "
+            "has not been created yet.\n"
+        )
+        get_console().print(
+            "[info]Run: "
+            f"`breeze k8s create-cluster --python {python} 
--kubernetes-version {kubernetes_version}`\n"
+        )
+        sys.exit(1)
+    skaffold_config = _build_skaffold_config(
+        python=python,
+        kubernetes_version=kubernetes_version,
+        executor=executor,
+        use_standard_naming=use_standard_naming,
+        multi_namespace_mode=multi_namespace_mode,
+        dags_relative_path=dags_relative_path,
+        dags_dest=dags_dest,
+        log_level=log_level,
+    )
+    if not deploy:
+        get_console().print(
+            "[info]Running skaffold without deploying Helm resources. "
+            "If sync cannot find pods, rerun with --deploy."
+        )
+    with tempfile.TemporaryDirectory(prefix="skaffold_") as tmp_dir:
+        dev_env_values = {
+            "scheduler": {"env": [{"name": "DEV_MODE", "value": "true"}]},
+            "triggerer": {"env": [{"name": "DEV_MODE", "value": "true"}]},
+            "dagProcessor": {"env": [{"name": "DEV_MODE", "value": "true"}]},

Review Comment:
   I think we could include the API Server as well (though it was still labeled 
`webserver` in the chart). I don’t see any downsides to having an API Server 
with hot reload support in k8s tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to