This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new dc0e38a8bd8 Make google provider compatible with mypy 1.16.1 (#53148)
dc0e38a8bd8 is described below

commit dc0e38a8bd89ffdf127db0fcef2461573b062085
Author: GPK <[email protected]>
AuthorDate: Thu Jul 10 19:40:46 2025 +0100

    Make google provider compatible with mypy 1.16.1 (#53148)
    
    * Fix mypy gcp
    
    * Make google provider compatible with mypy 1.16.1
---
 .../google/src/airflow/providers/google/cloud/hooks/bigquery.py     | 2 +-
 .../google/src/airflow/providers/google/cloud/hooks/cloud_sql.py    | 4 ++++
 .../google/src/airflow/providers/google/cloud/hooks/datafusion.py   | 2 +-
 .../src/airflow/providers/google/cloud/links/kubernetes_engine.py   | 3 +++
 .../google/src/airflow/providers/google/cloud/operators/bigquery.py | 6 ++++--
 .../google/src/airflow/providers/google/cloud/sensors/datafusion.py | 4 ++--
 .../google/src/airflow/providers/google/cloud/triggers/bigquery.py  | 6 +++---
 7 files changed, 18 insertions(+), 9 deletions(-)

diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py b/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
index a0f635c584a..7faa955dc70 100644
--- a/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
@@ -2248,7 +2248,7 @@ class BigQueryAsyncHook(GoogleBaseAsyncHook):
         self,
         sql: str,
         pass_value: Any,
-        records: list[Any],
+        records: list[Any] | None = None,
         tolerance: float | None = None,
     ) -> None:
         """
diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/cloud_sql.py b/providers/google/src/airflow/providers/google/cloud/hooks/cloud_sql.py
index 1d8ac9e91d7..72a526e8c31 100644
--- a/providers/google/src/airflow/providers/google/cloud/hooks/cloud_sql.py
+++ b/providers/google/src/airflow/providers/google/cloud/hooks/cloud_sql.py
@@ -1103,6 +1103,8 @@ class CloudSQLDatabaseHook(BaseHook):
         return connection_uri
 
     def _get_instance_socket_name(self) -> str:
+        if self.project_id is None:
+            raise ValueError("The project_id should not be none")
         return self.project_id + ":" + self.location + ":" + self.instance
 
     def _get_sqlproxy_instance_specification(self) -> str:
@@ -1135,6 +1137,8 @@ class CloudSQLDatabaseHook(BaseHook):
             raise ValueError("Proxy runner can only be retrieved in case of use_proxy = True")
         if not self.sql_proxy_unique_path:
             raise ValueError("The sql_proxy_unique_path should be set")
+        if self.project_id is None:
+            raise ValueError("The project_id should not be None")
         return CloudSqlProxyRunner(
             path_prefix=self.sql_proxy_unique_path,
             instance_specification=self._get_sqlproxy_instance_specification(),
diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py b/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py
index c228b3fa7c0..c27f63eb839 100644
--- a/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py
+++ b/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py
@@ -434,7 +434,7 @@ class DataFusionHook(GoogleBaseHook):
         pipeline_id: str,
         pipeline_type: DataFusionPipelineType = DataFusionPipelineType.BATCH,
         namespace: str = "default",
-    ) -> Any:
+    ) -> dict:
         url = os.path.join(
             self._base_url(instance_url, namespace),
             quote(pipeline_name),
diff --git a/providers/google/src/airflow/providers/google/cloud/links/kubernetes_engine.py b/providers/google/src/airflow/providers/google/cloud/links/kubernetes_engine.py
index 2b94326cbba..095bb0474a9 100644
--- a/providers/google/src/airflow/providers/google/cloud/links/kubernetes_engine.py
+++ b/providers/google/src/airflow/providers/google/cloud/links/kubernetes_engine.py
@@ -57,6 +57,9 @@ class KubernetesEngineClusterLink(BaseGoogleLink):
         if isinstance(cluster, dict):
             cluster = Cluster.from_json(json.dumps(cluster))
 
+        if not cluster:
+            raise ValueError("Cluster must be provided for KubernetesEngineClusterLink.")
+
         super().persist(
             context=context,
             cluster_name=cluster.name,
diff --git a/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py b/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py
index b245c2d4bf5..30d708a3c60 100644
--- a/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py
@@ -116,8 +116,10 @@ class _BigQueryDbHookMixin:
             impersonation_chain=self.impersonation_chain,
             labels=self.labels,
         )
+
+        # mypy assuming project_id is read only, as project_id is a property in GoogleBaseHook.
         if self.project_id:
-            hook.project_id = self.project_id
+            hook.project_id = self.project_id  # type:ignore[misc]
         return hook
 
 
@@ -1156,7 +1158,7 @@ class BigQueryGetDataOperator(GoogleCloudBaseOperator, _BigQueryOperatorsEncrypt
                     "BigQueryHook.list_rows() returns iterator when return_iterator is False (default)"
                 )
             self.log.info("Total extracted rows: %s", len(rows))
-
+            table_data: list[dict[str, Any]] | list[Any]
             if self.as_dict:
                 table_data = [dict(row) for row in rows]
             else:
diff --git a/providers/google/src/airflow/providers/google/cloud/sensors/datafusion.py b/providers/google/src/airflow/providers/google/cloud/sensors/datafusion.py
index ea52f9af750..2f06c1144b9 100644
--- a/providers/google/src/airflow/providers/google/cloud/sensors/datafusion.py
+++ b/providers/google/src/airflow/providers/google/cloud/sensors/datafusion.py
@@ -115,7 +115,7 @@ class CloudDataFusionPipelineStateSensor(BaseSensorOperator):
                 pipeline_id=self.pipeline_id,
                 namespace=self.namespace,
             )
-            pipeline_status = pipeline_workflow["status"]
+            pipeline_status = pipeline_workflow.get("status")
         except AirflowNotFoundException:
             message = "Specified Pipeline ID was not found."
             raise AirflowException(message)
@@ -132,4 +132,4 @@ class CloudDataFusionPipelineStateSensor(BaseSensorOperator):
         self.log.debug(
             "Current status of the pipeline workflow for %s: %s.", self.pipeline_id, pipeline_status
         )
-        return pipeline_status in self.expected_statuses
+        return pipeline_status is not None and pipeline_status in self.expected_statuses
diff --git a/providers/google/src/airflow/providers/google/cloud/triggers/bigquery.py b/providers/google/src/airflow/providers/google/cloud/triggers/bigquery.py
index 2cac99543a6..b0ae15935b7 100644
--- a/providers/google/src/airflow/providers/google/cloud/triggers/bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/triggers/bigquery.py
@@ -591,9 +591,9 @@ class BigQueryValueCheckTrigger(BigQueryInsertJobTrigger):
                 if response_from_hook["status"] == "success":
                     query_results = await hook.get_job_output(job_id=self.job_id, project_id=self.project_id)
                     records = hook.get_records(query_results)
-                    records = records.pop(0) if records else None
-                    hook.value_check(self.sql, self.pass_value, records, self.tolerance)
-                    yield TriggerEvent({"status": "success", "message": "Job completed", "records": records})
+                    _records = records.pop(0) if records else None
+                    hook.value_check(self.sql, self.pass_value, _records, self.tolerance)
+                    yield TriggerEvent({"status": "success", "message": "Job completed", "records": _records})
                     return
                 elif response_from_hook["status"] == "pending":
                     self.log.info("Query is still running...")

Reply via email to