Vamsi-klu commented on PR #67052: URL: https://github.com/apache/airflow/pull/67052#issuecomment-4472807831
Hi @shahar1 -- addressed the system-test concern. Why PR #61046 had to be reverted (revert commit 14995f1967f29): the example called CloudComposerDAGRunSensor without execution_range, so the sensor inferred the window [logical_date - 1d, logical_date]. The Composer environment was created mid-DAG-run, which meant no airflow_monitoring runs ever fell inside that past window. The buggy code returned True (false positive); the empty-window fix correctly returns False and the example then hung forever. The example patch in the earlier revision of this PR tried to side-step the issue with execution_range=[datetime.now() - timedelta(1), datetime.now()], but that's evaluated at DAG-parse time -- roughly half an hour before the sensor pokes and before the Composer env exists. Same hang mode, just hidden. What this commit changes: - composer_dag_run_id added to CloudComposerDAGRunSensor.template_fields. - Example reordered so each CloudComposerTriggerDAGRunOperator runs before its sibling Dag-run sensor; sensors pull the freshly minted dag_run_id via XCom. The run-id branch is pure id+state equality and bypasses execution_range entirely, so the hang mode cannot recur on the new path. - Defensive timeout=60*60 on the Dag-run sensors so any future regression to a windowed code path fails CI fast rather than hanging the worker. - External-task sensors deliberately left on the legacy windowed path -- the analogous _check_task_instances_states anti-pattern is tracked at https://github.com/apache/airflow/issues/67051, and the example carries an inline note so the #67051 fix must reorder them at the same time. Verification I can run locally without GCP credentials: - six new sensor unit tests + two new trigger unit tests covering the composer_dag_run_id branch (template_fields, Jinja render, poke success/wait, range-vs-id precedence, parse-time source guard, trigger branch precedence, trigger polling) -- all 75 tests in providers/google/tests/unit/google/cloud/{sensors,triggers}/test_cloud_composer.py pass. - dev/simulate_composer_system_test.py, a standalone harness that loads the example via importlib, mocks every GCP surface the example touches (googleapiclient + CloudComposerHook + CloudComposerAsyncHook + Environment / ImageVersion / ExecuteAirflowCommandResponse.to_dict), runs dag.test() end-to-end, and reports per-task state. The deferred trigger runs inline under dag.test(), so the deferrable sensor path is exercised too. Simulation result: ``` $ uv run --project providers/google python dev/simulate_composer_system_test.py loading example dag from .../example_cloud_composer.py registering dag bundle and serializing dag running dag.test() on composer OK: dag.test() completed for composer -- final state: success create_env -> success dag_run_sensor -> success defer_create_env -> success defer_dag_run_sensor -> success defer_delete_env -> success defer_external_task_sensor -> success defer_run_airflow_cli_cmd -> success defer_trigger_dag_run -> success defer_update_env -> success delete_env -> success external_task_sensor -> success get_env -> success get_project_number -> success image_versions -> success list_envs -> success run_airflow_cli_cmd -> success trigger_dag_run -> success update_env -> success watcher -> skipped ``` Both the sync dag_run_sensor and the deferred defer_dag_run_sensor matched on the trigger ops' run id via the composer_dag_run_id branch. @VladaZakharova @MaksYermak -- would appreciate a confirmation pass against real Composer infra when convenient, since the simulation can only validate the control flow, not the actual REST API contracts. --- Drafted-by: Claude Code (Opus 4.7 1M); reviewed by @Vamsi-klu before posting -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
