Lee-W commented on code in PR #55110:
URL: https://github.com/apache/airflow/pull/55110#discussion_r2417616407
##########
providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py:
##########
@@ -1274,7 +1282,9 @@ def insert_job(
job_api_repr.result(timeout=timeout, retry=retry)
return job_api_repr
- def generate_job_id(self, job_id, dag_id, task_id, logical_date,
configuration, force_rerun=False) -> str:
+ def generate_job_id(
+ self, job_id, dag_id, task_id, logical_date, configuration,
run_after=None, force_rerun=False
Review Comment:
Let's add type annotations here.
##########
providers/apache/hive/src/airflow/providers/apache/hive/operators/hive.py:
##########
@@ -143,9 +143,15 @@ def execute(self, context: Context) -> None:
# set the mapred_job_name if it's not set with dag, task, execution
time info
if not self.mapred_job_name:
ti = context["ti"]
- logical_date = context["logical_date"]
+ logical_date = context.get("logical_date", None)
if logical_date is None:
- raise RuntimeError("logical_date is None")
+ raise RuntimeError(
+ "logical_date is None. Please make sure the task is not
used in an asset-triggered DAG. "
+ "HiveOperator was designed to work with timetable
scheduled DAGs, "
+ "and asset-triggered DAGs do not have logical_date. "
+ "if asset-triggered HiveOperator is a required use case, "
+ "please open an issue on the airflow project."
Review Comment:
```suggestion
"logical_date is None. Please make sure the task is not used in an asset-triggered Dag. "
"HiveOperator was designed to work with timetable scheduled Dags, "
"and asset-triggered Dags do not have logical_date. "
"If asset-triggered HiveOperator is a required use case, "
"please open an issue on the Airflow project."
```
##########
providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py:
##########
@@ -1285,10 +1295,35 @@ def generate_job_id(self, job_id, dag_id, task_id,
logical_date, configuration,
if job_id:
return f"{job_id}_{uniqueness_suffix}"
- exec_date = logical_date.isoformat()
- job_id = f"airflow_{dag_id}_{task_id}_{exec_date}_{uniqueness_suffix}"
+ if logical_date is not None:
+ if AIRFLOW_V_3_0_PLUS:
+ warnings.warn(
+ "The 'logical_date' parameter is deprecated. Please use
'run_after' instead.",
+ AirflowProviderDeprecationWarning,
+ stacklevel=1,
+ )
+ job_id_timestamp = logical_date
+ elif run_after is not None:
+ job_id_timestamp = run_after
+ else:
+ job_id_timestamp = pendulum.now("UTC")
+
+ job_id =
f"airflow_{dag_id}_{task_id}_{job_id_timestamp.isoformat()}_{uniqueness_suffix}"
return re.sub(r"[:\-+.]", "_", job_id)
+ def get_run_after_or_logical_date(self, context: Context) ->
pendulum.DateTime:
+ if AIRFLOW_V_3_0_PLUS:
+ if dag_run := context.get("dag_run"):
+ run_after = pendulum.instance(dag_run.run_after)
+ else:
+ run_after = pendulum.now("UTC")
+ else:
+ if logical_date := context.get("logical_date"):
+ run_after = logical_date
+ else:
+ run_after = pendulum.now("UTC")
Review Comment:
We should unify the name everywhere to reduce ambiguity
providers/google/src/airflow/providers/google/cloud/operators/workflows.py
##########
providers/google/src/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py:
##########
@@ -215,8 +215,9 @@ def execute(self, context: Context):
job_id=self.job_id,
dag_id=self.dag_id,
task_id=self.task_id,
- logical_date=context["logical_date"],
+ logical_date=None,
Review Comment:
same here
##########
providers/google/src/airflow/providers/google/cloud/operators/bigquery.py:
##########
@@ -2370,20 +2370,13 @@ def execute(self, context: Any):
if self.project_id is None:
self.project_id = hook.project_id
- # Handle missing logical_date. Example: asset-triggered DAGs (Airflow
3)
- logical_date = context.get("logical_date")
- if logical_date is None:
- # Use dag_run.run_after as fallback when logical_date is not
available
- dag_run = context.get("dag_run")
- if dag_run and hasattr(dag_run, "run_after"):
- logical_date = dag_run.run_after
-
self.job_id = hook.generate_job_id(
job_id=self.job_id,
dag_id=self.dag_id,
task_id=self.task_id,
- logical_date=logical_date,
+ logical_date=None,
Review Comment:
Why do we always set it to None here? Should we just remove this argument
instead?
##########
providers/google/src/airflow/providers/google/cloud/sensors/cloud_composer.py:
##########
@@ -110,15 +110,22 @@ def __init__(
self.poll_interval = poll_interval
def _get_logical_dates(self, context) -> tuple[datetime, datetime]:
+ logical_date = context.get("logical_date", None)
+ if logical_date is None:
+ raise RuntimeError(
+ "logical_date is None. Please make sure the sensor is not used
in an asset-triggered DAG. "
+ "CloudComposerDAGRunSensor was designed to be used in
time-based scheduled DAGs only, "
+ "and asset-triggered DAGs do not have logical_date. "
Review Comment:
```suggestion
"logical_date is None. Please make sure the sensor is not used in an asset-triggered Dag. "
"CloudComposerDAGRunSensor was designed to be used in time-based scheduled Dags only, "
"and asset-triggered Dags do not have logical_date. "
```
##########
providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py:
##########
@@ -337,8 +337,9 @@ def execute(self, context: Context):
job_id=self.job_id,
dag_id=self.dag_id,
task_id=self.task_id,
- logical_date=context["logical_date"],
+ logical_date=None,
Review Comment:
same here
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]