Lee-W commented on code in PR #55110:
URL: https://github.com/apache/airflow/pull/55110#discussion_r2417616407


##########
providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py:
##########
@@ -1274,7 +1282,9 @@ def insert_job(
             job_api_repr.result(timeout=timeout, retry=retry)
         return job_api_repr
 
-    def generate_job_id(self, job_id, dag_id, task_id, logical_date, configuration, force_rerun=False) -> str:
+    def generate_job_id(
+        self, job_id, dag_id, task_id, logical_date, configuration, run_after=None, force_rerun=False

Review Comment:
   Let's add type annotations here.
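A rough, stdlib-only sketch of how the annotated signature could look. The annotations (`str | None`, `datetime | None`, `dict[str, Any]`) are my suggestion rather than anything in the PR. The hash-based `uniqueness_suffix` is an assumption because this hunk does not show how the hook computes it, `datetime.now(timezone.utc)` stands in for `pendulum.now("UTC")`, and the deprecation warning is left out.

```python
from __future__ import annotations

import json
import re
import uuid
from datetime import datetime, timezone
from hashlib import md5
from typing import Any


def generate_job_id(
    job_id: str | None,
    dag_id: str,
    task_id: str,
    logical_date: datetime | None,
    configuration: dict[str, Any],
    run_after: datetime | None = None,
    force_rerun: bool = False,
) -> str:
    # Assumption: the suffix hashes the job configuration, or a random UUID when a
    # rerun is forced. The hunk under review does not show this part of the hook.
    hash_base = str(uuid.uuid4()) if force_rerun else json.dumps(configuration, sort_keys=True)
    uniqueness_suffix = md5(hash_base.encode()).hexdigest()

    if job_id:
        return f"{job_id}_{uniqueness_suffix}"

    # Same fallback order as the diff: logical_date, then run_after, then "now".
    if logical_date is not None:
        job_id_timestamp = logical_date
    elif run_after is not None:
        job_id_timestamp = run_after
    else:
        job_id_timestamp = datetime.now(timezone.utc)

    job_id = f"airflow_{dag_id}_{task_id}_{job_id_timestamp.isoformat()}_{uniqueness_suffix}"
    # Replace ':', '-', '+' and '.' from the ISO timestamp with underscores, as in the diff.
    return re.sub(r"[:\-+.]", "_", job_id)
```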



##########
providers/apache/hive/src/airflow/providers/apache/hive/operators/hive.py:
##########
@@ -143,9 +143,15 @@ def execute(self, context: Context) -> None:
         # set the mapred_job_name if it's not set with dag, task, execution time info
         if not self.mapred_job_name:
             ti = context["ti"]
-            logical_date = context["logical_date"]
+            logical_date = context.get("logical_date", None)
             if logical_date is None:
-                raise RuntimeError("logical_date is None")
+                raise RuntimeError(
+                    "logical_date is None. Please make sure the task is not used in an asset-triggered DAG. "
+                    "HiveOperator was designed to work with timetable scheduled DAGs, "
+                    "and asset-triggered DAGs do not have logical_date. "
+                    "if asset-triggered HiveOperator is a required use case, "
+                    "please open an issue on the airflow project."

Review Comment:
   ```suggestion
                       "logical_date is None. Please make sure the task is not used in an asset-triggered Dag. "
                       "HiveOperator was designed to work with timetable scheduled Dags, "
                       "and asset-triggered Dags do not have logical_date. "
                       "If asset-triggered HiveOperator is a required use case, "
                       "please open an issue on the Airflow project."
   ```



##########
providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py:
##########
@@ -1285,10 +1295,35 @@ def generate_job_id(self, job_id, dag_id, task_id, logical_date, configuration,
         if job_id:
             return f"{job_id}_{uniqueness_suffix}"
 
-        exec_date = logical_date.isoformat()
-        job_id = f"airflow_{dag_id}_{task_id}_{exec_date}_{uniqueness_suffix}"
+        if logical_date is not None:
+            if AIRFLOW_V_3_0_PLUS:
+                warnings.warn(
+                    "The 'logical_date' parameter is deprecated. Please use 'run_after' instead.",
+                    AirflowProviderDeprecationWarning,
+                    stacklevel=1,
+                )
+            job_id_timestamp = logical_date
+        elif run_after is not None:
+            job_id_timestamp = run_after
+        else:
+            job_id_timestamp = pendulum.now("UTC")
+
+        job_id = f"airflow_{dag_id}_{task_id}_{job_id_timestamp.isoformat()}_{uniqueness_suffix}"
         return re.sub(r"[:\-+.]", "_", job_id)
 
+    def get_run_after_or_logical_date(self, context: Context) -> pendulum.DateTime:
+        if AIRFLOW_V_3_0_PLUS:
+            if dag_run := context.get("dag_run"):
+                run_after = pendulum.instance(dag_run.run_after)
+            else:
+                run_after = pendulum.now("UTC")
+        else:
+            if logical_date := context.get("logical_date"):
+                run_after = logical_date
+            else:
+                run_after = pendulum.now("UTC")

Review Comment:
   We should unify the naming everywhere to reduce ambiguity (see also providers/google/src/airflow/providers/google/cloud/operators/workflows.py).
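To make this concrete, a hypothetical sketch of a single helper that returns the same kind of value under one name on both Airflow versions. The name `get_job_id_timestamp`, the `airflow_v3` flag (standing in for `AIRFLOW_V_3_0_PLUS`), the plain mapping in place of `Context`, and stdlib `datetime` in place of `pendulum` are all assumptions made so the sketch runs on its own.

```python
from __future__ import annotations

from datetime import datetime, timezone
from typing import Any, Mapping


def get_job_id_timestamp(context: Mapping[str, Any], airflow_v3: bool) -> datetime:
    """Return the timestamp used in generated job IDs (hypothetical helper name)."""
    if airflow_v3:
        # Airflow 3 branch of the diff: use dag_run.run_after when a dag_run is present.
        dag_run = context.get("dag_run")
        if dag_run is not None and getattr(dag_run, "run_after", None) is not None:
            return dag_run.run_after
    else:
        # Airflow 2 branch of the diff: use logical_date from the context.
        logical_date = context.get("logical_date")
        if logical_date is not None:
            return logical_date
    # Fallback in both branches, as in the diff: the current UTC time.
    return datetime.now(timezone.utc)
```

Call sites could then refer to this value by one name instead of mixing `run_after` and `logical_date`.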



##########
providers/google/src/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py:
##########
@@ -215,8 +215,9 @@ def execute(self, context: Context):
             job_id=self.job_id,
             dag_id=self.dag_id,
             task_id=self.task_id,
-            logical_date=context["logical_date"],
+            logical_date=None,

Review Comment:
   same here



##########
providers/google/src/airflow/providers/google/cloud/operators/bigquery.py:
##########
@@ -2370,20 +2370,13 @@ def execute(self, context: Any):
         if self.project_id is None:
             self.project_id = hook.project_id
 
-        # Handle missing logical_date. Example: asset-triggered DAGs (Airflow 3)
-        logical_date = context.get("logical_date")
-        if logical_date is None:
-            # Use dag_run.run_after as fallback when logical_date is not available
-            dag_run = context.get("dag_run")
-            if dag_run and hasattr(dag_run, "run_after"):
-                logical_date = dag_run.run_after
-
         self.job_id = hook.generate_job_id(
             job_id=self.job_id,
             dag_id=self.dag_id,
             task_id=self.task_id,
-            logical_date=logical_date,
+            logical_date=None,

Review Comment:
   Why do we always set it to None here? Should we just remove this argument instead?
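If we drop it, `logical_date` needs a default. One possible shape, sketched under assumptions: `logical_date` moves behind `*` so it becomes keyword-only with a `None` default, and the body is trimmed to the timestamp selection (the uniqueness suffix is left out, so `configuration` and `force_rerun` go unused here).

```python
from __future__ import annotations

import re
from datetime import datetime, timezone
from typing import Any


def generate_job_id(
    job_id: str | None,
    dag_id: str,
    task_id: str,
    configuration: dict[str, Any],
    *,
    logical_date: datetime | None = None,
    run_after: datetime | None = None,
    force_rerun: bool = False,
) -> str:
    # Simplified body: the configuration/force_rerun-based suffix is omitted in this sketch.
    if job_id:
        return job_id
    if logical_date is not None:
        timestamp = logical_date
    elif run_after is not None:
        timestamp = run_after
    else:
        timestamp = datetime.now(timezone.utc)
    return re.sub(r"[:\-+.]", "_", f"airflow_{dag_id}_{task_id}_{timestamp.isoformat()}")


# The call site no longer passes logical_date=None explicitly.
example_job_id = generate_job_id(
    job_id=None,
    dag_id="example_dag",
    task_id="example_task",
    configuration={},
    run_after=datetime(2024, 1, 1, tzinfo=timezone.utc),
)
```

The trade-off: moving `configuration` ahead of `logical_date` and making the timestamps keyword-only would break any caller that passes these arguments positionally, so this may need a deprecation path.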



##########
providers/google/src/airflow/providers/google/cloud/sensors/cloud_composer.py:
##########
@@ -110,15 +110,22 @@ def __init__(
         self.poll_interval = poll_interval
 
     def _get_logical_dates(self, context) -> tuple[datetime, datetime]:
+        logical_date = context.get("logical_date", None)
+        if logical_date is None:
+            raise RuntimeError(
+                "logical_date is None. Please make sure the sensor is not used in an asset-triggered DAG. "
+                "CloudComposerDAGRunSensor was designed to be used in time-based scheduled DAGs only, "
+                "and asset-triggered DAGs do not have logical_date. "

Review Comment:
   ```suggestion
                   "logical_date is None. Please make sure the sensor is not used in an asset-triggered Dag. "
                   "CloudComposerDAGRunSensor was designed to be used in time-based scheduled Dags only, "
                   "and asset-triggered Dags do not have logical_date. "
   ```



##########
providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py:
##########
@@ -337,8 +337,9 @@ def execute(self, context: Context):
             job_id=self.job_id,
             dag_id=self.dag_id,
             task_id=self.task_id,
-            logical_date=context["logical_date"],
+            logical_date=None,

Review Comment:
   same here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
