This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 65fec59f1b3 FIX: incorrect access of logical_date in google bigquery operator and google workflow operator (#55110)
65fec59f1b3 is described below
commit 65fec59f1b3ec13d08274925a37a6382251b8d69
Author: 鐘翊修 <[email protected]>
AuthorDate: Wed Oct 15 21:44:49 2025 +0800
FIX: incorrect access of logical_date in google bigquery operator and google workflow operator (#55110)
Co-authored-by: Wei Lee <[email protected]>
Co-authored-by: Tzu-ping Chung <[email protected]>
---
.../providers/apache/hive/operators/hive.py | 10 ++++-
.../providers/google/cloud/hooks/bigquery.py | 45 ++++++++++++++++++++--
.../providers/google/cloud/operators/bigquery.py | 11 +-----
.../providers/google/cloud/operators/workflows.py | 23 ++++++++---
.../google/cloud/sensors/cloud_composer.py | 15 ++++++--
.../google/cloud/transfers/bigquery_to_gcs.py | 3 +-
.../google/cloud/transfers/gcs_to_bigquery.py | 3 +-
.../tests/unit/google/cloud/hooks/test_bigquery.py | 21 +++++++++-
.../unit/google/cloud/operators/test_bigquery.py | 1 +
.../unit/google/cloud/operators/test_workflows.py | 37 ++++++++++++++++++
.../google/cloud/transfers/test_gcs_to_bigquery.py | 4 +-
11 files changed, 144 insertions(+), 29 deletions(-)
diff --git a/providers/apache/hive/src/airflow/providers/apache/hive/operators/hive.py b/providers/apache/hive/src/airflow/providers/apache/hive/operators/hive.py
index 75e562b2d46..1880a7ad2e5 100644
--- a/providers/apache/hive/src/airflow/providers/apache/hive/operators/hive.py
+++ b/providers/apache/hive/src/airflow/providers/apache/hive/operators/hive.py
@@ -143,9 +143,15 @@ class HiveOperator(BaseOperator):
# set the mapred_job_name if it's not set with dag, task, execution time info
if not self.mapred_job_name:
ti = context["ti"]
- logical_date = context["logical_date"]
+ logical_date = context.get("logical_date", None)
if logical_date is None:
- raise RuntimeError("logical_date is None")
+ raise RuntimeError(
+ "logical_date is not available. Please make sure the task is not used in an asset-triggered Dag. "
+ "HiveOperator was designed to work with timetable-scheduled Dags, "
+ "and an asset-triggered Dag run does not have a logical_date. "
+ "If you need to use HiveOperator in an asset-triggered Dag, "
+ "please open an issue on the Airflow project."
+ )
hostname = ti.hostname or ""
self.hook.mapred_job_name = self.mapred_job_name_template.format(
dag_id=ti.dag_id,
diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py b/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
index 6a4345bfe2e..9e08dc18d85 100644
--- a/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
@@ -26,11 +26,13 @@ import logging
import re
import time
import uuid
+import warnings
from collections.abc import Iterable, Mapping, Sequence
from copy import deepcopy
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any, Literal, NoReturn, cast, overload
+import pendulum
from aiohttp import ClientSession as ClientSession
from gcloud.aio.bigquery import Job, Table as Table_async
from google.cloud.bigquery import (
@@ -75,6 +77,7 @@ from airflow.providers.google.common.hooks.base_google import (
GoogleBaseHook,
get_field,
)
+from airflow.providers.google.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.utils.hashlib_wrapper import md5
from airflow.utils.helpers import convert_camel_to_snake
from airflow.utils.log.logging_mixin import LoggingMixin
@@ -86,6 +89,8 @@ if TYPE_CHECKING:
from google.api_core.retry import Retry
from requests import Session
+ from airflow.sdk import Context
+
log = logging.getLogger(__name__)
BigQueryJob = CopyJob | QueryJob | LoadJob | ExtractJob
@@ -1274,7 +1279,16 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
job_api_repr.result(timeout=timeout, retry=retry)
return job_api_repr
- def generate_job_id(self, job_id, dag_id, task_id, logical_date, configuration, force_rerun=False) -> str:
+ def generate_job_id(
+ self,
+ job_id: str | None,
+ dag_id: str,
+ task_id: str,
+ logical_date: datetime | None,
+ configuration: dict,
+ run_after: pendulum.DateTime | None = None,
+ force_rerun: bool = False,
+ ) -> str:
if force_rerun:
hash_base = str(uuid.uuid4())
else:
@@ -1285,10 +1299,35 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
if job_id:
return f"{job_id}_{uniqueness_suffix}"
- exec_date = logical_date.isoformat()
- job_id = f"airflow_{dag_id}_{task_id}_{exec_date}_{uniqueness_suffix}"
+ if logical_date is not None:
+ if AIRFLOW_V_3_0_PLUS:
+ warnings.warn(
+ "The 'logical_date' parameter is deprecated. Please use
'run_after' instead.",
+ AirflowProviderDeprecationWarning,
+ stacklevel=1,
+ )
+ job_id_timestamp = logical_date
+ elif run_after is not None:
+ job_id_timestamp = run_after
+ else:
+ job_id_timestamp = pendulum.now("UTC")
+
+ job_id = f"airflow_{dag_id}_{task_id}_{job_id_timestamp.isoformat()}_{uniqueness_suffix}"
return re.sub(r"[:\-+.]", "_", job_id)
+ def get_run_after_or_logical_date(self, context: Context) -> pendulum.DateTime:
+ if AIRFLOW_V_3_0_PLUS:
+ if dag_run := context.get("dag_run"):
+ run_after = pendulum.instance(dag_run.run_after)
+ else:
+ run_after = pendulum.now("UTC")
+ else:
+ if logical_date := context.get("logical_date"):
+ run_after = logical_date
+ else:
+ run_after = pendulum.now("UTC")
+ return run_after
+
def split_tablename(
self, table_input: str, default_project_id: str, var_name: str | None = None
) -> tuple[str, str, str]:
diff --git a/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py b/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py
index d09a7c9f6a4..a928145db8d 100644
--- a/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py
@@ -2370,20 +2370,13 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator, _BigQueryInsertJobOpera
if self.project_id is None:
self.project_id = hook.project_id
- # Handle missing logical_date. Example: asset-triggered DAGs (Airflow 3)
- logical_date = context.get("logical_date")
- if logical_date is None:
- # Use dag_run.run_after as fallback when logical_date is not available
- dag_run = context.get("dag_run")
- if dag_run and hasattr(dag_run, "run_after"):
- logical_date = dag_run.run_after
-
self.job_id = hook.generate_job_id(
job_id=self.job_id,
dag_id=self.dag_id,
task_id=self.task_id,
- logical_date=logical_date,
+ logical_date=None,
configuration=self.configuration,
+ run_after=hook.get_run_after_or_logical_date(context),
force_rerun=self.force_rerun,
)
diff --git a/providers/google/src/airflow/providers/google/cloud/operators/workflows.py b/providers/google/src/airflow/providers/google/cloud/operators/workflows.py
index 51da294c664..5440afb8d28 100644
--- a/providers/google/src/airflow/providers/google/cloud/operators/workflows.py
+++ b/providers/google/src/airflow/providers/google/cloud/operators/workflows.py
@@ -20,9 +20,10 @@ import datetime
import json
import re
import uuid
-from collections.abc import Sequence
+from collections.abc import Collection, Sequence
from typing import TYPE_CHECKING
+import pendulum
from google.api_core.exceptions import AlreadyExists
from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
from google.cloud.workflows.executions_v1beta import Execution
@@ -36,12 +37,13 @@ from airflow.providers.google.cloud.links.workflows import (
)
from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator
from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID
+from airflow.providers.google.version_compat import AIRFLOW_V_3_0_PLUS
if TYPE_CHECKING:
from google.api_core.retry import Retry
from google.protobuf.field_mask_pb2 import FieldMask
- from airflow.utils.context import Context
+ from airflow.sdk import Context
from airflow.utils.hashlib_wrapper import md5
@@ -69,7 +71,7 @@ class WorkflowsCreateWorkflowOperator(GoogleCloudBaseOperator):
:param metadata: Additional metadata that is provided to the method.
"""
- template_fields: Sequence[str] = ("location", "workflow", "workflow_id")
+ template_fields: Collection[str] = ("location", "workflow", "workflow_id")
template_fields_renderers = {"workflow": "json"}
operator_extra_links = (WorkflowsWorkflowDetailsLink(),)
@@ -101,7 +103,7 @@ class WorkflowsCreateWorkflowOperator(GoogleCloudBaseOperator):
self.impersonation_chain = impersonation_chain
self.force_rerun = force_rerun
- def _workflow_id(self, context):
+ def _workflow_id(self, context: Context) -> str:
if self.workflow_id and not self.force_rerun:
# If users provide workflow id then assuring the idempotency
# is on their side
@@ -114,8 +116,17 @@ class WorkflowsCreateWorkflowOperator(GoogleCloudBaseOperator):
# We are limited by allowed length of workflow_id so
# we use hash of whole information
- exec_date = context["logical_date"].isoformat()
- base = f"airflow_{self.dag_id}_{self.task_id}_{exec_date}_{hash_base}"
+ if AIRFLOW_V_3_0_PLUS:
+ if dag_run := context.get("dag_run"):
+ run_after = pendulum.instance(dag_run.run_after)
+ else:
+ run_after = pendulum.now("UTC")
+ else:
+ if logical_date := context.get("logical_date"):
+ run_after = pendulum.instance(logical_date)
+ else:
+ run_after = pendulum.now("UTC")
+ base = f"airflow_{self.dag_id}_{self.task_id}_{run_after.isoformat()}_{hash_base}"
workflow_id = md5(base.encode()).hexdigest()
return re.sub(r"[:\-+.]", "_", workflow_id)
diff --git a/providers/google/src/airflow/providers/google/cloud/sensors/cloud_composer.py b/providers/google/src/airflow/providers/google/cloud/sensors/cloud_composer.py
index 242953dd2ef..daf6a05161e 100644
--- a/providers/google/src/airflow/providers/google/cloud/sensors/cloud_composer.py
+++ b/providers/google/src/airflow/providers/google/cloud/sensors/cloud_composer.py
@@ -121,15 +121,22 @@ class CloudComposerDAGRunSensor(BaseSensorOperator):
)
def _get_logical_dates(self, context) -> tuple[datetime, datetime]:
+ logical_date = context.get("logical_date", None)
+ if logical_date is None:
+ raise RuntimeError(
+ "logical_date is None. Please make sure the sensor is not used
in an asset-triggered Dag. "
+ "CloudComposerDAGRunSensor was designed to be used in
time-based scheduled Dags only, "
+ "and asset-triggered Dags do not have logical_date. "
+ )
if isinstance(self.execution_range, timedelta):
if self.execution_range < timedelta(0):
- return context["logical_date"], context["logical_date"] -
self.execution_range
- return context["logical_date"] - self.execution_range,
context["logical_date"]
+ return logical_date, logical_date - self.execution_range
+ return logical_date - self.execution_range, logical_date
if isinstance(self.execution_range, list) and len(self.execution_range) > 0:
return self.execution_range[0], self.execution_range[1] if len(
self.execution_range
- ) > 1 else context["logical_date"]
- return context["logical_date"] - timedelta(1), context["logical_date"]
+ ) > 1 else logical_date
+ return logical_date - timedelta(1), logical_date
def poke(self, context: Context) -> bool:
start_date, end_date = self._get_logical_dates(context)
diff --git a/providers/google/src/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py b/providers/google/src/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
index 719d565a9fc..20f0d1946f9 100644
--- a/providers/google/src/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
+++ b/providers/google/src/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
@@ -215,8 +215,9 @@ class BigQueryToGCSOperator(BaseOperator):
job_id=self.job_id,
dag_id=self.dag_id,
task_id=self.task_id,
- logical_date=context["logical_date"],
+ logical_date=None,
configuration=configuration,
+ run_after=hook.get_run_after_or_logical_date(context),
force_rerun=self.force_rerun,
)
diff --git a/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py b/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
index cf57ddf6b4f..f4aed63542d 100644
--- a/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
@@ -346,8 +346,9 @@ class GCSToBigQueryOperator(BaseOperator):
job_id=self.job_id,
dag_id=self.dag_id,
task_id=self.task_id,
- logical_date=context["logical_date"],
+ logical_date=None,
configuration=self.configuration,
+ run_after=hook.get_run_after_or_logical_date(context),
force_rerun=self.force_rerun,
)
diff --git a/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py b/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
index 9267497af30..999e32a35ec 100644
--- a/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
+++ b/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
@@ -22,6 +22,7 @@ from unittest import mock
from unittest.mock import AsyncMock
import google.auth
+import pendulum
import pytest
from gcloud.aio.bigquery import Job, Table as Table_async
from google.api_core import page_iterator
@@ -51,6 +52,13 @@ from airflow.providers.google.cloud.hooks.bigquery import (
_validate_value,
)
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
+
+if AIRFLOW_V_3_0_PLUS:
+ from airflow.sdk import Context
+else:
+ from airflow.utils.context import Context
+
pytestmark = pytest.mark.filterwarnings("error::airflow.exceptions.AirflowProviderDeprecationWarning")
PROJECT_ID = "bq-project"
@@ -673,11 +681,22 @@ class TestBigQueryHookMethods(_BigQueryBaseTestClass):
job_id=None,
dag_id=test_dag_id,
task_id="test_job_id",
- logical_date=datetime(2020, 1, 23),
+ logical_date=None,
configuration=configuration,
+ run_after=datetime(2020, 1, 23),
)
assert job_id == expected_job_id
+ def test_get_run_after_or_logical_date(self):
+ if AIRFLOW_V_3_0_PLUS:
+ from airflow.models import DagRun
+
+ ctx = Context(dag_run=DagRun(run_after=pendulum.datetime(2025, 1, 1)))
+ else:
+ ctx = Context(logical_date=pendulum.datetime(2025, 1, 1))
+
+ assert self.hook.get_run_after_or_logical_date(ctx) == pendulum.datetime(2025, 1, 1)
+
@mock.patch(
"airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_job",
return_value=mock.MagicMock(spec=CopyJob),
diff --git a/providers/google/tests/unit/google/cloud/operators/test_bigquery.py b/providers/google/tests/unit/google/cloud/operators/test_bigquery.py
index 53095f21162..ba1ac6f6fab 100644
--- a/providers/google/tests/unit/google/cloud/operators/test_bigquery.py
+++ b/providers/google/tests/unit/google/cloud/operators/test_bigquery.py
@@ -1596,6 +1596,7 @@ class TestBigQueryInsertJobOperator:
dag_id="adhoc_airflow",
task_id="insert_query_job",
logical_date=ANY,
+ run_after=ANY,
configuration=configuration,
force_rerun=True,
)
diff --git a/providers/google/tests/unit/google/cloud/operators/test_workflows.py b/providers/google/tests/unit/google/cloud/operators/test_workflows.py
index 7a48e2a8acb..286fd25fa57 100644
--- a/providers/google/tests/unit/google/cloud/operators/test_workflows.py
+++ b/providers/google/tests/unit/google/cloud/operators/test_workflows.py
@@ -17,10 +17,14 @@
from __future__ import annotations
import datetime
+import json
+import re
from unittest import mock
+import pendulum
from google.protobuf.timestamp_pb2 import Timestamp
+from airflow.models.dagrun import DagRun
from airflow.providers.google.cloud.operators.workflows import (
WorkflowsCancelExecutionOperator,
WorkflowsCreateExecutionOperator,
@@ -32,6 +36,14 @@ from airflow.providers.google.cloud.operators.workflows import (
WorkflowsListWorkflowsOperator,
WorkflowsUpdateWorkflowOperator,
)
+from airflow.utils.hashlib_wrapper import md5
+
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
+
+if AIRFLOW_V_3_0_PLUS:
+ from airflow.sdk import Context
+else:
+ from airflow.utils.context import Context
BASE_PATH = "airflow.providers.google.cloud.operators.workflows.{}"
LOCATION = "europe-west1"
@@ -86,6 +98,31 @@ class TestWorkflowsCreateWorkflowOperator:
assert result == mock_object.to_dict.return_value
+ def test_execute_without_workflow_id(self):
+ op = WorkflowsCreateWorkflowOperator(
+ task_id="test_task",
+ workflow=WORKFLOW,
+ workflow_id="",
+ location=LOCATION,
+ project_id=PROJECT_ID,
+ retry=RETRY,
+ timeout=TIMEOUT,
+ metadata=METADATA,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ hash_base = json.dumps(WORKFLOW, sort_keys=True)
+ date = pendulum.datetime(2025, 1, 1)
+ ctx = Context(logical_date=date)
+ if AIRFLOW_V_3_0_PLUS:
+ ctx["dag_run"] = DagRun(run_after=date)
+ expected = md5(f"airflow_{op.dag_id}_test_task_{date.isoformat()}_{hash_base}".encode()).hexdigest()
+ assert op._workflow_id(ctx) == re.sub(r"[:\-+.]", "_", expected)
+
+ if AIRFLOW_V_3_0_PLUS:
+ ctx = Context(dag_run=DagRun(run_after=date))
+ assert op._workflow_id(ctx) == re.sub(r"[:\-+.]", "_", expected)
+
class TestWorkflowsUpdateWorkflowOperator:
@mock.patch(BASE_PATH.format("Workflow"))
diff --git a/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_bigquery.py b/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_bigquery.py
index ee2cc8ff2c2..ad6f3a08a6f 100644
--- a/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_bigquery.py
+++ b/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_bigquery.py
@@ -43,7 +43,6 @@ from airflow.providers.common.compat.openlineage.facet import (
)
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.utils.state import TaskInstanceState
-from airflow.utils.timezone import datetime
TASK_ID = "test-gcs-to-bq-operator"
TEST_EXPLICIT_DEST = "test-project.dataset.table"
@@ -1771,7 +1770,8 @@ class TestAsyncGCSToBigQueryOperator:
job_id=None,
dag_id="adhoc_airflow",
task_id=TASK_ID,
- logical_date=datetime(2016, 1, 1, 0, 0),
+ logical_date=None,
+ run_after=hook.return_value.get_run_after_or_logical_date(),
configuration={},
force_rerun=True,
)
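As a reference for the fallback behaviour these hunks introduce, here is a minimal standalone sketch: prefer dag_run.run_after on Airflow 3 (where asset-triggered runs have no logical_date), keep using logical_date on Airflow 2, and fall back to the current time when neither is available. The dict-based context, the SimpleNamespace dag_run, and the AIRFLOW_V_3_0_PLUS constant are illustrative stand-ins, not the provider's actual objects.

    import pendulum
    from types import SimpleNamespace

    AIRFLOW_V_3_0_PLUS = True  # assumed for this sketch; the provider derives it from the installed Airflow version

    def run_after_or_logical_date(context: dict) -> pendulum.DateTime:
        if AIRFLOW_V_3_0_PLUS:
            # Airflow 3: asset-triggered runs have no logical_date, so use dag_run.run_after.
            if dag_run := context.get("dag_run"):
                return pendulum.instance(dag_run.run_after)
            return pendulum.now("UTC")
        # Airflow 2: keep using logical_date when the run provides one.
        if logical_date := context.get("logical_date"):
            return pendulum.instance(logical_date)
        return pendulum.now("UTC")

    # Example: an asset-triggered run still yields a stable timestamp for job IDs.
    ctx = {"dag_run": SimpleNamespace(run_after=pendulum.datetime(2025, 1, 1))}
    print(run_after_or_logical_date(ctx).isoformat())  # 2025-01-01T00:00:00+00:00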