This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 65fec59f1b3 FIX: incorrect access of logical_date in google bigquery operator and google workflow operator (#55110)
65fec59f1b3 is described below

commit 65fec59f1b3ec13d08274925a37a6382251b8d69
Author: 鐘翊修 <[email protected]>
AuthorDate: Wed Oct 15 21:44:49 2025 +0800

    FIX: incorrect access of logical_date in google bigquery operator and google workflow operator (#55110)
    
    Co-authored-by: Wei Lee <[email protected]>
    Co-authored-by: Tzu-ping Chung <[email protected]>
---
 .../providers/apache/hive/operators/hive.py        | 10 ++++-
 .../providers/google/cloud/hooks/bigquery.py       | 45 ++++++++++++++++++++--
 .../providers/google/cloud/operators/bigquery.py   | 11 +-----
 .../providers/google/cloud/operators/workflows.py  | 23 ++++++++---
 .../google/cloud/sensors/cloud_composer.py         | 15 ++++++--
 .../google/cloud/transfers/bigquery_to_gcs.py      |  3 +-
 .../google/cloud/transfers/gcs_to_bigquery.py      |  3 +-
 .../tests/unit/google/cloud/hooks/test_bigquery.py | 21 +++++++++-
 .../unit/google/cloud/operators/test_bigquery.py   |  1 +
 .../unit/google/cloud/operators/test_workflows.py  | 37 ++++++++++++++++++
 .../google/cloud/transfers/test_gcs_to_bigquery.py |  4 +-
 11 files changed, 144 insertions(+), 29 deletions(-)
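In short, every direct context["logical_date"] access touched above is replaced by a fallback that prefers dag_run.run_after on Airflow 3 and only uses logical_date on older versions, with "now" as the last resort. A minimal sketch of that pattern, assuming a plain dict-like task context; the helper name here is hypothetical, while the real one added below is BigQueryHook.get_run_after_or_logical_date:

    import pendulum

    def resolve_run_after(context, airflow_v3_plus: bool) -> pendulum.DateTime:
        # Hypothetical helper mirroring the fallback introduced in this commit:
        # asset-triggered Dag runs on Airflow 3 carry no logical_date, so
        # dag_run.run_after is used instead, falling back to the current time.
        if airflow_v3_plus:
            if dag_run := context.get("dag_run"):
                return pendulum.instance(dag_run.run_after)
            return pendulum.now("UTC")
        if logical_date := context.get("logical_date"):
            return pendulum.instance(logical_date)
        return pendulum.now("UTC")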

diff --git a/providers/apache/hive/src/airflow/providers/apache/hive/operators/hive.py b/providers/apache/hive/src/airflow/providers/apache/hive/operators/hive.py
index 75e562b2d46..1880a7ad2e5 100644
--- a/providers/apache/hive/src/airflow/providers/apache/hive/operators/hive.py
+++ b/providers/apache/hive/src/airflow/providers/apache/hive/operators/hive.py
@@ -143,9 +143,15 @@ class HiveOperator(BaseOperator):
         # set the mapred_job_name if it's not set with dag, task, execution time info
         if not self.mapred_job_name:
             ti = context["ti"]
-            logical_date = context["logical_date"]
+            logical_date = context.get("logical_date", None)
             if logical_date is None:
-                raise RuntimeError("logical_date is None")
+                raise RuntimeError(
+                    "logical_date is not available. Please make sure the task is not used in an asset-triggered Dag. "
+                    "HiveOperator was designed to work with timetable scheduled Dags, "
+                    "and an asset-triggered Dag run does not have a logical_date. "
+                    "If you need to use HiveOperator in an asset-triggered Dag, "
+                    "please open an issue on the Airflow project."
+                )
             hostname = ti.hostname or ""
             self.hook.mapred_job_name = self.mapred_job_name_template.format(
                 dag_id=ti.dag_id,
diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py b/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
index 6a4345bfe2e..9e08dc18d85 100644
--- a/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
@@ -26,11 +26,13 @@ import logging
 import re
 import time
 import uuid
+import warnings
 from collections.abc import Iterable, Mapping, Sequence
 from copy import deepcopy
 from datetime import datetime, timedelta
 from typing import TYPE_CHECKING, Any, Literal, NoReturn, cast, overload
 
+import pendulum
 from aiohttp import ClientSession as ClientSession
 from gcloud.aio.bigquery import Job, Table as Table_async
 from google.cloud.bigquery import (
@@ -75,6 +77,7 @@ from airflow.providers.google.common.hooks.base_google import (
     GoogleBaseHook,
     get_field,
 )
+from airflow.providers.google.version_compat import AIRFLOW_V_3_0_PLUS
 from airflow.utils.hashlib_wrapper import md5
 from airflow.utils.helpers import convert_camel_to_snake
 from airflow.utils.log.logging_mixin import LoggingMixin
@@ -86,6 +89,8 @@ if TYPE_CHECKING:
     from google.api_core.retry import Retry
     from requests import Session
 
+    from airflow.sdk import Context
+
 log = logging.getLogger(__name__)
 
 BigQueryJob = CopyJob | QueryJob | LoadJob | ExtractJob
@@ -1274,7 +1279,16 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
             job_api_repr.result(timeout=timeout, retry=retry)
         return job_api_repr
 
-    def generate_job_id(self, job_id, dag_id, task_id, logical_date, configuration, force_rerun=False) -> str:
+    def generate_job_id(
+        self,
+        job_id: str | None,
+        dag_id: str,
+        task_id: str,
+        logical_date: datetime | None,
+        configuration: dict,
+        run_after: pendulum.DateTime | None = None,
+        force_rerun: bool = False,
+    ) -> str:
         if force_rerun:
             hash_base = str(uuid.uuid4())
         else:
@@ -1285,10 +1299,35 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
         if job_id:
             return f"{job_id}_{uniqueness_suffix}"
 
-        exec_date = logical_date.isoformat()
-        job_id = f"airflow_{dag_id}_{task_id}_{exec_date}_{uniqueness_suffix}"
+        if logical_date is not None:
+            if AIRFLOW_V_3_0_PLUS:
+                warnings.warn(
+                    "The 'logical_date' parameter is deprecated. Please use 'run_after' instead.",
+                    AirflowProviderDeprecationWarning,
+                    stacklevel=1,
+                )
+            job_id_timestamp = logical_date
+        elif run_after is not None:
+            job_id_timestamp = run_after
+        else:
+            job_id_timestamp = pendulum.now("UTC")
+
+        job_id = f"airflow_{dag_id}_{task_id}_{job_id_timestamp.isoformat()}_{uniqueness_suffix}"
         return re.sub(r"[:\-+.]", "_", job_id)
 
+    def get_run_after_or_logical_date(self, context: Context) -> pendulum.DateTime:
+        if AIRFLOW_V_3_0_PLUS:
+            if dag_run := context.get("dag_run"):
+                run_after = pendulum.instance(dag_run.run_after)
+            else:
+                run_after = pendulum.now("UTC")
+        else:
+            if logical_date := context.get("logical_date"):
+                run_after = logical_date
+            else:
+                run_after = pendulum.now("UTC")
+        return run_after
+
     def split_tablename(
         self, table_input: str, default_project_id: str, var_name: str | None = None
     ) -> tuple[str, str, str]:
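Callers of generate_job_id are updated accordingly in the operator diffs further down: logical_date is now passed as None and the timestamp comes from the new helper. A hedged usage sketch, assuming an already-configured hook and the context handed to an operator's execute(); the dag/task ids and query configuration are placeholders:

    from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

    def build_job_id(hook: BigQueryHook, context) -> str:
        # Illustration only: mirrors how BigQueryInsertJobOperator and the
        # GCS transfer operators call generate_job_id after this change.
        return hook.generate_job_id(
            job_id=None,
            dag_id="example_dag",
            task_id="example_task",
            logical_date=None,  # deprecated in favour of run_after
            configuration={"query": {"query": "SELECT 1", "useLegacySql": False}},
            run_after=hook.get_run_after_or_logical_date(context),
            force_rerun=False,
        )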
diff --git a/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py b/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py
index d09a7c9f6a4..a928145db8d 100644
--- a/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py
@@ -2370,20 +2370,13 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator, _BigQueryInsertJobOpera
         if self.project_id is None:
             self.project_id = hook.project_id
 
-        # Handle missing logical_date. Example: asset-triggered DAGs (Airflow 3)
-        logical_date = context.get("logical_date")
-        if logical_date is None:
-            # Use dag_run.run_after as fallback when logical_date is not available
-            dag_run = context.get("dag_run")
-            if dag_run and hasattr(dag_run, "run_after"):
-                logical_date = dag_run.run_after
-
         self.job_id = hook.generate_job_id(
             job_id=self.job_id,
             dag_id=self.dag_id,
             task_id=self.task_id,
-            logical_date=logical_date,
+            logical_date=None,
             configuration=self.configuration,
+            run_after=hook.get_run_after_or_logical_date(context),
             force_rerun=self.force_rerun,
         )
 
diff --git a/providers/google/src/airflow/providers/google/cloud/operators/workflows.py b/providers/google/src/airflow/providers/google/cloud/operators/workflows.py
index 51da294c664..5440afb8d28 100644
--- a/providers/google/src/airflow/providers/google/cloud/operators/workflows.py
+++ b/providers/google/src/airflow/providers/google/cloud/operators/workflows.py
@@ -20,9 +20,10 @@ import datetime
 import json
 import re
 import uuid
-from collections.abc import Sequence
+from collections.abc import Collection, Sequence
 from typing import TYPE_CHECKING
 
+import pendulum
 from google.api_core.exceptions import AlreadyExists
 from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
 from google.cloud.workflows.executions_v1beta import Execution
@@ -36,12 +37,13 @@ from airflow.providers.google.cloud.links.workflows import (
 )
 from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator
 from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID
+from airflow.providers.google.version_compat import AIRFLOW_V_3_0_PLUS
 
 if TYPE_CHECKING:
     from google.api_core.retry import Retry
     from google.protobuf.field_mask_pb2 import FieldMask
 
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 from airflow.utils.hashlib_wrapper import md5
 
@@ -69,7 +71,7 @@ class WorkflowsCreateWorkflowOperator(GoogleCloudBaseOperator):
     :param metadata: Additional metadata that is provided to the method.
     """
 
-    template_fields: Sequence[str] = ("location", "workflow", "workflow_id")
+    template_fields: Collection[str] = ("location", "workflow", "workflow_id")
     template_fields_renderers = {"workflow": "json"}
     operator_extra_links = (WorkflowsWorkflowDetailsLink(),)
 
@@ -101,7 +103,7 @@ class WorkflowsCreateWorkflowOperator(GoogleCloudBaseOperator):
         self.impersonation_chain = impersonation_chain
         self.force_rerun = force_rerun
 
-    def _workflow_id(self, context):
+    def _workflow_id(self, context: Context) -> str:
         if self.workflow_id and not self.force_rerun:
             # If users provide workflow id then assuring the idempotency
             # is on their side
@@ -114,8 +116,17 @@ class WorkflowsCreateWorkflowOperator(GoogleCloudBaseOperator):
 
         # We are limited by allowed length of workflow_id so
         # we use hash of whole information
-        exec_date = context["logical_date"].isoformat()
-        base = f"airflow_{self.dag_id}_{self.task_id}_{exec_date}_{hash_base}"
+        if AIRFLOW_V_3_0_PLUS:
+            if dag_run := context.get("dag_run"):
+                run_after = pendulum.instance(dag_run.run_after)
+            else:
+                run_after = pendulum.now("UTC")
+        else:
+            if logical_date := context.get("logical_date"):
+                run_after = pendulum.instance(logical_date)
+            else:
+                run_after = pendulum.now("UTC")
+        base = f"airflow_{self.dag_id}_{self.task_id}_{run_after.isoformat()}_{hash_base}"
         workflow_id = md5(base.encode()).hexdigest()
         return re.sub(r"[:\-+.]", "_", workflow_id)
 
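For the workflow operator, the derived id is simply the md5 hex digest of a dag/task/timestamp/hash_base string, sanitized with the same regex as before. A standalone sketch of that derivation, using hashlib directly instead of Airflow's md5 wrapper and placeholder dag/task names:

    import hashlib
    import json
    import re

    import pendulum

    workflow = {"name": "example"}                    # placeholder workflow body
    hash_base = json.dumps(workflow, sort_keys=True)  # idempotency base, as in the operator
    run_after = pendulum.datetime(2025, 1, 1)
    base = f"airflow_example_dag_example_task_{run_after.isoformat()}_{hash_base}"
    workflow_id = re.sub(r"[:\-+.]", "_", hashlib.md5(base.encode()).hexdigest())
    print(workflow_id)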
diff --git a/providers/google/src/airflow/providers/google/cloud/sensors/cloud_composer.py b/providers/google/src/airflow/providers/google/cloud/sensors/cloud_composer.py
index 242953dd2ef..daf6a05161e 100644
--- a/providers/google/src/airflow/providers/google/cloud/sensors/cloud_composer.py
+++ b/providers/google/src/airflow/providers/google/cloud/sensors/cloud_composer.py
@@ -121,15 +121,22 @@ class CloudComposerDAGRunSensor(BaseSensorOperator):
             )
 
     def _get_logical_dates(self, context) -> tuple[datetime, datetime]:
+        logical_date = context.get("logical_date", None)
+        if logical_date is None:
+            raise RuntimeError(
+                "logical_date is None. Please make sure the sensor is not used in an asset-triggered Dag. "
+                "CloudComposerDAGRunSensor was designed to be used in time-based scheduled Dags only, "
+                "and asset-triggered Dags do not have logical_date. "
+            )
         if isinstance(self.execution_range, timedelta):
             if self.execution_range < timedelta(0):
-                return context["logical_date"], context["logical_date"] - self.execution_range
-            return context["logical_date"] - self.execution_range, context["logical_date"]
+                return logical_date, logical_date - self.execution_range
+            return logical_date - self.execution_range, logical_date
         if isinstance(self.execution_range, list) and len(self.execution_range) > 0:
             return self.execution_range[0], self.execution_range[1] if len(
                 self.execution_range
-            ) > 1 else context["logical_date"]
-        return context["logical_date"] - timedelta(1), context["logical_date"]
+            ) > 1 else logical_date
+        return logical_date - timedelta(1), logical_date
 
     def poke(self, context: Context) -> bool:
         start_date, end_date = self._get_logical_dates(context)
diff --git a/providers/google/src/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py b/providers/google/src/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
index 719d565a9fc..20f0d1946f9 100644
--- a/providers/google/src/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
+++ b/providers/google/src/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
@@ -215,8 +215,9 @@ class BigQueryToGCSOperator(BaseOperator):
             job_id=self.job_id,
             dag_id=self.dag_id,
             task_id=self.task_id,
-            logical_date=context["logical_date"],
+            logical_date=None,
             configuration=configuration,
+            run_after=hook.get_run_after_or_logical_date(context),
             force_rerun=self.force_rerun,
         )
 
diff --git a/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py b/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
index cf57ddf6b4f..f4aed63542d 100644
--- a/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
@@ -346,8 +346,9 @@ class GCSToBigQueryOperator(BaseOperator):
             job_id=self.job_id,
             dag_id=self.dag_id,
             task_id=self.task_id,
-            logical_date=context["logical_date"],
+            logical_date=None,
             configuration=self.configuration,
+            run_after=hook.get_run_after_or_logical_date(context),
             force_rerun=self.force_rerun,
         )
 
diff --git a/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py b/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
index 9267497af30..999e32a35ec 100644
--- a/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
+++ b/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
@@ -22,6 +22,7 @@ from unittest import mock
 from unittest.mock import AsyncMock
 
 import google.auth
+import pendulum
 import pytest
 from gcloud.aio.bigquery import Job, Table as Table_async
 from google.api_core import page_iterator
@@ -51,6 +52,13 @@ from airflow.providers.google.cloud.hooks.bigquery import (
     _validate_value,
 )
 
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
+
+if AIRFLOW_V_3_0_PLUS:
+    from airflow.sdk import Context
+else:
+    from airflow.utils.context import Context
+
 pytestmark = pytest.mark.filterwarnings("error::airflow.exceptions.AirflowProviderDeprecationWarning")
 
 PROJECT_ID = "bq-project"
@@ -673,11 +681,22 @@ class TestBigQueryHookMethods(_BigQueryBaseTestClass):
             job_id=None,
             dag_id=test_dag_id,
             task_id="test_job_id",
-            logical_date=datetime(2020, 1, 23),
+            logical_date=None,
             configuration=configuration,
+            run_after=datetime(2020, 1, 23),
         )
         assert job_id == expected_job_id
 
+    def test_get_run_after_or_logical_date(self):
+        if AIRFLOW_V_3_0_PLUS:
+            from airflow.models import DagRun
+
+            ctx = Context(dag_run=DagRun(run_after=pendulum.datetime(2025, 1, 1)))
+        else:
+            ctx = Context(logical_date=pendulum.datetime(2025, 1, 1))
+
+        assert self.hook.get_run_after_or_logical_date(ctx) == pendulum.datetime(2025, 1, 1)
+
     @mock.patch(
         "airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_job",
         return_value=mock.MagicMock(spec=CopyJob),
diff --git a/providers/google/tests/unit/google/cloud/operators/test_bigquery.py b/providers/google/tests/unit/google/cloud/operators/test_bigquery.py
index 53095f21162..ba1ac6f6fab 100644
--- a/providers/google/tests/unit/google/cloud/operators/test_bigquery.py
+++ b/providers/google/tests/unit/google/cloud/operators/test_bigquery.py
@@ -1596,6 +1596,7 @@ class TestBigQueryInsertJobOperator:
             dag_id="adhoc_airflow",
             task_id="insert_query_job",
             logical_date=ANY,
+            run_after=ANY,
             configuration=configuration,
             force_rerun=True,
         )
diff --git a/providers/google/tests/unit/google/cloud/operators/test_workflows.py b/providers/google/tests/unit/google/cloud/operators/test_workflows.py
index 7a48e2a8acb..286fd25fa57 100644
--- a/providers/google/tests/unit/google/cloud/operators/test_workflows.py
+++ b/providers/google/tests/unit/google/cloud/operators/test_workflows.py
@@ -17,10 +17,14 @@
 from __future__ import annotations
 
 import datetime
+import json
+import re
 from unittest import mock
 
+import pendulum
 from google.protobuf.timestamp_pb2 import Timestamp
 
+from airflow.models.dagrun import DagRun
 from airflow.providers.google.cloud.operators.workflows import (
     WorkflowsCancelExecutionOperator,
     WorkflowsCreateExecutionOperator,
@@ -32,6 +36,14 @@ from airflow.providers.google.cloud.operators.workflows import (
     WorkflowsListWorkflowsOperator,
     WorkflowsUpdateWorkflowOperator,
 )
+from airflow.utils.hashlib_wrapper import md5
+
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
+
+if AIRFLOW_V_3_0_PLUS:
+    from airflow.sdk import Context
+else:
+    from airflow.utils.context import Context
 
 BASE_PATH = "airflow.providers.google.cloud.operators.workflows.{}"
 LOCATION = "europe-west1"
@@ -86,6 +98,31 @@ class TestWorkflowsCreateWorkflowOperator:
 
         assert result == mock_object.to_dict.return_value
 
+    def test_execute_without_workflow_id(self):
+        op = WorkflowsCreateWorkflowOperator(
+            task_id="test_task",
+            workflow=WORKFLOW,
+            workflow_id="",
+            location=LOCATION,
+            project_id=PROJECT_ID,
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        hash_base = json.dumps(WORKFLOW, sort_keys=True)
+        date = pendulum.datetime(2025, 1, 1)
+        ctx = Context(logical_date=date)
+        if AIRFLOW_V_3_0_PLUS:
+            ctx["dag_run"] = DagRun(run_after=date)
+        expected = md5(f"airflow_{op.dag_id}_test_task_{date.isoformat()}_{hash_base}".encode()).hexdigest()
+        assert op._workflow_id(ctx) == re.sub(r"[:\-+.]", "_", expected)
+
+        if AIRFLOW_V_3_0_PLUS:
+            ctx = Context(dag_run=DagRun(run_after=date))
+            assert op._workflow_id(ctx) == re.sub(r"[:\-+.]", "_", expected)
+
 
 class TestWorkflowsUpdateWorkflowOperator:
     @mock.patch(BASE_PATH.format("Workflow"))
diff --git a/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_bigquery.py b/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_bigquery.py
index ee2cc8ff2c2..ad6f3a08a6f 100644
--- a/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_bigquery.py
+++ b/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_bigquery.py
@@ -43,7 +43,6 @@ from airflow.providers.common.compat.openlineage.facet import (
 )
 from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
 from airflow.utils.state import TaskInstanceState
-from airflow.utils.timezone import datetime
 
 TASK_ID = "test-gcs-to-bq-operator"
 TEST_EXPLICIT_DEST = "test-project.dataset.table"
@@ -1771,7 +1770,8 @@ class TestAsyncGCSToBigQueryOperator:
             job_id=None,
             dag_id="adhoc_airflow",
             task_id=TASK_ID,
-            logical_date=datetime(2016, 1, 1, 0, 0),
+            logical_date=None,
+            run_after=hook.return_value.get_run_after_or_logical_date(),
             configuration={},
             force_rerun=True,
         )
