This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new dc0e38a8bd8 Make google provider compatible with mypy 1.16.1 (#53148)
dc0e38a8bd8 is described below
commit dc0e38a8bd89ffdf127db0fcef2461573b062085
Author: GPK <[email protected]>
AuthorDate: Thu Jul 10 19:40:46 2025 +0100
Make google provider compatible with mypy 1.16.1 (#53148)
* Fix mypy gcp
* Make google provider compatible with mypy 1.16.1
---
.../google/src/airflow/providers/google/cloud/hooks/bigquery.py | 2 +-
.../google/src/airflow/providers/google/cloud/hooks/cloud_sql.py | 4 ++++
.../google/src/airflow/providers/google/cloud/hooks/datafusion.py | 2 +-
.../src/airflow/providers/google/cloud/links/kubernetes_engine.py | 3 +++
.../google/src/airflow/providers/google/cloud/operators/bigquery.py | 6 ++++--
.../google/src/airflow/providers/google/cloud/sensors/datafusion.py | 4 ++--
.../google/src/airflow/providers/google/cloud/triggers/bigquery.py | 6 +++---
7 files changed, 18 insertions(+), 9 deletions(-)
diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py b/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
index a0f635c584a..7faa955dc70 100644
--- a/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
@@ -2248,7 +2248,7 @@ class BigQueryAsyncHook(GoogleBaseAsyncHook):
self,
sql: str,
pass_value: Any,
- records: list[Any],
+ records: list[Any] | None = None,
tolerance: float | None = None,
) -> None:
"""
diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/cloud_sql.py b/providers/google/src/airflow/providers/google/cloud/hooks/cloud_sql.py
index 1d8ac9e91d7..72a526e8c31 100644
--- a/providers/google/src/airflow/providers/google/cloud/hooks/cloud_sql.py
+++ b/providers/google/src/airflow/providers/google/cloud/hooks/cloud_sql.py
@@ -1103,6 +1103,8 @@ class CloudSQLDatabaseHook(BaseHook):
return connection_uri
def _get_instance_socket_name(self) -> str:
+ if self.project_id is None:
+ raise ValueError("The project_id should not be none")
return self.project_id + ":" + self.location + ":" + self.instance
def _get_sqlproxy_instance_specification(self) -> str:
@@ -1135,6 +1137,8 @@ class CloudSQLDatabaseHook(BaseHook):
raise ValueError("Proxy runner can only be retrieved in case of use_proxy = True")
if not self.sql_proxy_unique_path:
raise ValueError("The sql_proxy_unique_path should be set")
+ if self.project_id is None:
+ raise ValueError("The project_id should not be None")
return CloudSqlProxyRunner(
path_prefix=self.sql_proxy_unique_path,
instance_specification=self._get_sqlproxy_instance_specification(),
diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py b/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py
index c228b3fa7c0..c27f63eb839 100644
--- a/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py
+++ b/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py
@@ -434,7 +434,7 @@ class DataFusionHook(GoogleBaseHook):
pipeline_id: str,
pipeline_type: DataFusionPipelineType = DataFusionPipelineType.BATCH,
namespace: str = "default",
- ) -> Any:
+ ) -> dict:
url = os.path.join(
self._base_url(instance_url, namespace),
quote(pipeline_name),
diff --git a/providers/google/src/airflow/providers/google/cloud/links/kubernetes_engine.py b/providers/google/src/airflow/providers/google/cloud/links/kubernetes_engine.py
index 2b94326cbba..095bb0474a9 100644
--- a/providers/google/src/airflow/providers/google/cloud/links/kubernetes_engine.py
+++ b/providers/google/src/airflow/providers/google/cloud/links/kubernetes_engine.py
@@ -57,6 +57,9 @@ class KubernetesEngineClusterLink(BaseGoogleLink):
if isinstance(cluster, dict):
cluster = Cluster.from_json(json.dumps(cluster))
+ if not cluster:
+ raise ValueError("Cluster must be provided for KubernetesEngineClusterLink.")
+
super().persist(
context=context,
cluster_name=cluster.name,
diff --git a/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py b/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py
index b245c2d4bf5..30d708a3c60 100644
--- a/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py
@@ -116,8 +116,10 @@ class _BigQueryDbHookMixin:
impersonation_chain=self.impersonation_chain,
labels=self.labels,
)
+
+ # mypy assuming project_id is read only, as project_id is a property in GoogleBaseHook.
if self.project_id:
- hook.project_id = self.project_id
+ hook.project_id = self.project_id # type:ignore[misc]
return hook
@@ -1156,7 +1158,7 @@ class BigQueryGetDataOperator(GoogleCloudBaseOperator, _BigQueryOperatorsEncrypt
"BigQueryHook.list_rows() returns iterator when return_iterator is False (default)"
)
self.log.info("Total extracted rows: %s", len(rows))
-
+ table_data: list[dict[str, Any]] | list[Any]
if self.as_dict:
table_data = [dict(row) for row in rows]
else:
diff --git a/providers/google/src/airflow/providers/google/cloud/sensors/datafusion.py b/providers/google/src/airflow/providers/google/cloud/sensors/datafusion.py
index ea52f9af750..2f06c1144b9 100644
--- a/providers/google/src/airflow/providers/google/cloud/sensors/datafusion.py
+++ b/providers/google/src/airflow/providers/google/cloud/sensors/datafusion.py
@@ -115,7 +115,7 @@ class CloudDataFusionPipelineStateSensor(BaseSensorOperator):
pipeline_id=self.pipeline_id,
namespace=self.namespace,
)
- pipeline_status = pipeline_workflow["status"]
+ pipeline_status = pipeline_workflow.get("status")
except AirflowNotFoundException:
message = "Specified Pipeline ID was not found."
raise AirflowException(message)
@@ -132,4 +132,4 @@ class CloudDataFusionPipelineStateSensor(BaseSensorOperator):
self.log.debug(
"Current status of the pipeline workflow for %s: %s.", self.pipeline_id, pipeline_status
)
- return pipeline_status in self.expected_statuses
+ return pipeline_status is not None and pipeline_status in self.expected_statuses
diff --git a/providers/google/src/airflow/providers/google/cloud/triggers/bigquery.py b/providers/google/src/airflow/providers/google/cloud/triggers/bigquery.py
index 2cac99543a6..b0ae15935b7 100644
--- a/providers/google/src/airflow/providers/google/cloud/triggers/bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/triggers/bigquery.py
@@ -591,9 +591,9 @@ class BigQueryValueCheckTrigger(BigQueryInsertJobTrigger):
if response_from_hook["status"] == "success":
query_results = await hook.get_job_output(job_id=self.job_id, project_id=self.project_id)
records = hook.get_records(query_results)
- records = records.pop(0) if records else None
- hook.value_check(self.sql, self.pass_value, records, self.tolerance)
- yield TriggerEvent({"status": "success", "message": "Job completed", "records": records})
+ _records = records.pop(0) if records else None
+ hook.value_check(self.sql, self.pass_value, _records, self.tolerance)
+ yield TriggerEvent({"status": "success", "message": "Job completed", "records": _records})
return
elif response_from_hook["status"] == "pending":
self.log.info("Query is still running...")