This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 064fc2b775 Make pyodbc.Row and databricks.Row JSON-serializable via
new `make_serializable` method (#32319)
064fc2b775 is described below
commit 064fc2b7751a44e37ccce97609cff7c496098e56
Author: Joffrey Bienvenu <[email protected]>
AuthorDate: Fri Nov 17 01:16:36 2023 +0100
Make pyodbc.Row and databricks.Row JSON-serializable via new
`make_serializable` method (#32319)
---
.pre-commit-config.yaml | 6 +
STATIC_CODE_CHECKS.rst | 2 +
airflow/providers/common/sql/CHANGELOG.rst | 8 ++
airflow/providers/common/sql/hooks/sql.py | 21 +++-
airflow/providers/common/sql/provider.yaml | 1 +
.../providers/databricks/hooks/databricks_sql.py | 9 +-
.../databricks/operators/databricks_sql.py | 10 +-
airflow/providers/databricks/provider.yaml | 2 +-
airflow/providers/odbc/hooks/odbc.py | 13 ++-
airflow/providers/odbc/provider.yaml | 2 +-
dev/breeze/src/airflow_breeze/pre_commit_ids.py | 1 +
generated/provider_dependencies.json | 4 +-
images/breeze/output-commands-hash.txt | 75 +++++++++++++
images/breeze/output_static-checks.svg | 14 +--
images/breeze/output_static-checks.txt | 2 +-
.../pre_commit_check_common_sql_dependency.py | 124 +++++++++++++++++++++
.../databricks/hooks/test_databricks_sql.py | 21 ++--
.../databricks/operators/test_databricks_sql.py | 20 ----
tests/providers/odbc/hooks/test_odbc.py | 75 ++++++++++++-
19 files changed, 353 insertions(+), 57 deletions(-)
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index a61a00b99c..606f56947e 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -378,6 +378,12 @@ repos:
files:
^dev/breeze/src/airflow_breeze/utils/docker_command_utils\.py$|^scripts/ci/docker_compose/local\.yml$
pass_filenames: false
additional_dependencies: ['rich>=12.4.4']
+ - id: check-common-sql-dependency-make-serializable
+ name: Check dependency of SQL Providers with '_make_serializable'
+ entry:
./scripts/ci/pre_commit/pre_commit_check_common_sql_dependency.py
+ language: python
+ files: ^airflow/providers/.*/hooks/.*\.py$
+ additional_dependencies: ['rich>=12.4.4', 'pyyaml', 'packaging']
- id: update-providers-dependencies
name: Update cross-dependencies for providers packages
entry:
./scripts/ci/pre_commit/pre_commit_update_providers_dependencies.py
diff --git a/STATIC_CODE_CHECKS.rst b/STATIC_CODE_CHECKS.rst
index 7f81ff1267..da606c9b40 100644
--- a/STATIC_CODE_CHECKS.rst
+++ b/STATIC_CODE_CHECKS.rst
@@ -167,6 +167,8 @@ require Breeze Docker image to be built locally.
+-----------------------------------------------------------+--------------------------------------------------------------+---------+
| check-cncf-k8s-only-for-executors | Check
cncf.kubernetes imports used for executors only | |
+-----------------------------------------------------------+--------------------------------------------------------------+---------+
+| check-common-sql-dependency-make-serializable | Check dependency
of SQL Providers with '_make_serializable' | |
++-----------------------------------------------------------+--------------------------------------------------------------+---------+
| check-core-deprecation-classes | Verify usage of
Airflow deprecation classes in core | |
+-----------------------------------------------------------+--------------------------------------------------------------+---------+
| check-daysago-import-from-utils | Make sure
days_ago is imported from airflow.utils.dates | |
diff --git a/airflow/providers/common/sql/CHANGELOG.rst
b/airflow/providers/common/sql/CHANGELOG.rst
index fdb9c9c394..dd73a9c51b 100644
--- a/airflow/providers/common/sql/CHANGELOG.rst
+++ b/airflow/providers/common/sql/CHANGELOG.rst
@@ -25,6 +25,14 @@
Changelog
---------
+1.8.1
+.....
+
+Misc
+~~~~
+
+* ``Add '_make_serializable' method which other SQL operators can override
when the result from the cursor is not JSON-serializable (#32319)``
+
1.8.0
.....
diff --git a/airflow/providers/common/sql/hooks/sql.py
b/airflow/providers/common/sql/hooks/sql.py
index 950ac7ee19..ab4eda5d8e 100644
--- a/airflow/providers/common/sql/hooks/sql.py
+++ b/airflow/providers/common/sql/hooks/sql.py
@@ -138,6 +138,11 @@ class DbApiHook(BaseForDbApiHook):
"""
Abstract base class for sql hooks.
+ When subclassing, maintainers can override the `_make_serializable` method:
+ This method transforms the result of the handler method (typically
`cursor.fetchall()`) into
+ JSON-serializable objects. Most of the time, the underlying SQL library
already returns tuples from
+ its cursor, and the `_make_serializable` method can be ignored.
+
:param schema: Optional DB schema that overrides the schema specified in
the connection. Make sure that
if you change the schema parameter value in the constructor of the
derived Hook, such change
should be done before calling the ``DBApiHook.__init__()``.
@@ -403,7 +408,7 @@ class DbApiHook(BaseForDbApiHook):
self._run_command(cur, sql_statement, parameters)
if handler is not None:
- result = handler(cur)
+ result = self._make_serializable(handler(cur))
if return_single_query_results(sql, return_last,
split_statements):
_last_result = result
_last_description = cur.description
@@ -423,6 +428,20 @@ class DbApiHook(BaseForDbApiHook):
else:
return results
+ @staticmethod
+ def _make_serializable(result: Any) -> Any:
+ """Ensure the data returned from an SQL command is JSON-serializable.
+
+ This method is intended to be overridden by subclasses of the
`DbApiHook`. Its purpose is to
+ transform the result of an SQL command (typically returned by cursor
methods) into a
+ JSON-serializable format.
+
+ If this method is not overridden, the result data is returned as-is.
+ If the output of the cursor is already JSON-serializable, this method
+ should be ignored.
+ """
+ return result
+
def _run_command(self, cur, sql_statement, parameters):
"""Run a statement using an already open cursor."""
if self.log_sql:
diff --git a/airflow/providers/common/sql/provider.yaml
b/airflow/providers/common/sql/provider.yaml
index 4cb140dc81..4dc1e70326 100644
--- a/airflow/providers/common/sql/provider.yaml
+++ b/airflow/providers/common/sql/provider.yaml
@@ -23,6 +23,7 @@ description: |
suspended: false
versions:
+ - 1.8.1
- 1.8.0
- 1.7.2
- 1.7.1
diff --git a/airflow/providers/databricks/hooks/databricks_sql.py
b/airflow/providers/databricks/hooks/databricks_sql.py
index c0bf5f1cad..d61d9f1bd7 100644
--- a/airflow/providers/databricks/hooks/databricks_sql.py
+++ b/airflow/providers/databricks/hooks/databricks_sql.py
@@ -222,7 +222,7 @@ class DatabricksSqlHook(BaseDatabricksHook, DbApiHook):
with closing(conn.cursor()) as cur:
self._run_command(cur, sql_statement, parameters)
if handler is not None:
- result = handler(cur)
+ result = self._make_serializable(handler(cur))
if return_single_query_results(sql, return_last,
split_statements):
results = [result]
self.descriptions = [cur.description]
@@ -240,6 +240,13 @@ class DatabricksSqlHook(BaseDatabricksHook, DbApiHook):
else:
return results
+ @staticmethod
+ def _make_serializable(result):
+ """Transform the databricks Row objects into a JSON-serializable list
of rows."""
+ if result is not None:
+ return [list(row) for row in result]
+ return result
+
def bulk_dump(self, table, tmp_file):
raise NotImplementedError()
diff --git a/airflow/providers/databricks/operators/databricks_sql.py
b/airflow/providers/databricks/operators/databricks_sql.py
index a9354000b0..a03cfa729c 100644
--- a/airflow/providers/databricks/operators/databricks_sql.py
+++ b/airflow/providers/databricks/operators/databricks_sql.py
@@ -30,15 +30,9 @@ from airflow.providers.common.sql.operators.sql import
SQLExecuteQueryOperator
from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook
if TYPE_CHECKING:
- from databricks.sql.types import Row
-
from airflow.utils.context import Context
-def make_serializable(val: Row):
- return tuple(val)
-
-
class DatabricksSqlOperator(SQLExecuteQueryOperator):
"""
Executes SQL code in a Databricks SQL endpoint or a Databricks cluster.
@@ -129,7 +123,7 @@ class DatabricksSqlOperator(SQLExecuteQueryOperator):
def _process_output(self, results: list[Any], descriptions:
list[Sequence[Sequence] | None]) -> list[Any]:
if not self._output_path:
- return list(zip(descriptions, [[make_serializable(row) for row in
res] for res in results]))
+ return list(zip(descriptions, results))
if not self._output_format:
raise AirflowException("Output format should be specified!")
# Output to a file only the result of last query
@@ -162,7 +156,7 @@ class DatabricksSqlOperator(SQLExecuteQueryOperator):
file.write("\n")
else:
raise AirflowException(f"Unsupported output format:
'{self._output_format}'")
- return list(zip(descriptions, [[make_serializable(row) for row in res]
for res in results]))
+ return list(zip(descriptions, results))
COPY_INTO_APPROVED_FORMATS = ["CSV", "JSON", "AVRO", "ORC", "PARQUET", "TEXT",
"BINARYFILE"]
diff --git a/airflow/providers/databricks/provider.yaml
b/airflow/providers/databricks/provider.yaml
index cefb777bec..2890561e38 100644
--- a/airflow/providers/databricks/provider.yaml
+++ b/airflow/providers/databricks/provider.yaml
@@ -56,7 +56,7 @@ versions:
dependencies:
- apache-airflow>=2.5.0
- - apache-airflow-providers-common-sql>=1.5.0
+ - apache-airflow-providers-common-sql>=1.8.1
- requests>=2.27,<3
# The connector 2.9.0 released on Aug 10, 2023 has a bug that it does not
properly declare urllib3 and
# it needs to be excluded. See
https://github.com/databricks/databricks-sql-python/issues/190
diff --git a/airflow/providers/odbc/hooks/odbc.py
b/airflow/providers/odbc/hooks/odbc.py
index 21678ebe55..8242aa5247 100644
--- a/airflow/providers/odbc/hooks/odbc.py
+++ b/airflow/providers/odbc/hooks/odbc.py
@@ -17,7 +17,7 @@
"""This module contains ODBC hook."""
from __future__ import annotations
-from typing import Any
+from typing import Any, NamedTuple
from urllib.parse import quote_plus
import pyodbc
@@ -211,3 +211,14 @@ class OdbcHook(DbApiHook):
engine = self.get_sqlalchemy_engine(engine_kwargs=engine_kwargs)
cnx = engine.connect(**(connect_kwargs or {}))
return cnx
+
+ @staticmethod
+ def _make_serializable(result: list[pyodbc.Row] | None) ->
list[NamedTuple] | None:
+ """Transform the pyodbc.Row objects returned from an SQL command into
JSON-serializable NamedTuple."""
+ if result is not None:
+ columns: list[tuple[str, type]] = [col[:2] for col in
result[0].cursor_description]
+ # Below line respects NamedTuple docstring, but mypy does not
support dynamically
+ # instantiated NamedTuple, and never will:
https://github.com/python/mypy/issues/848
+ row_object = NamedTuple("Row", columns) # type: ignore[misc]
+ return [row_object(*row) for row in result]
+ return result
diff --git a/airflow/providers/odbc/provider.yaml
b/airflow/providers/odbc/provider.yaml
index a348fdc019..513d089b22 100644
--- a/airflow/providers/odbc/provider.yaml
+++ b/airflow/providers/odbc/provider.yaml
@@ -42,7 +42,7 @@ versions:
dependencies:
- apache-airflow>=2.5.0
- - apache-airflow-providers-common-sql>=1.3.1
+ - apache-airflow-providers-common-sql>=1.8.1
- pyodbc
integrations:
diff --git a/dev/breeze/src/airflow_breeze/pre_commit_ids.py
b/dev/breeze/src/airflow_breeze/pre_commit_ids.py
index 35dd8883e3..8c20452835 100644
--- a/dev/breeze/src/airflow_breeze/pre_commit_ids.py
+++ b/dev/breeze/src/airflow_breeze/pre_commit_ids.py
@@ -37,6 +37,7 @@ PRE_COMMIT_LIST = [
"check-builtin-literals",
"check-changelog-has-no-duplicates",
"check-cncf-k8s-only-for-executors",
+ "check-common-sql-dependency-make-serializable",
"check-core-deprecation-classes",
"check-daysago-import-from-utils",
"check-decorated-operator-implements-custom-name",
diff --git a/generated/provider_dependencies.json
b/generated/provider_dependencies.json
index a4db9305fc..9027a156b7 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -308,7 +308,7 @@
"databricks": {
"deps": [
"aiohttp>=3.6.3, <4",
- "apache-airflow-providers-common-sql>=1.5.0",
+ "apache-airflow-providers-common-sql>=1.8.1",
"apache-airflow>=2.5.0",
"databricks-sql-connector>=2.0.0, <3.0.0, !=2.9.0",
"requests>=2.27,<3"
@@ -659,7 +659,7 @@
},
"odbc": {
"deps": [
- "apache-airflow-providers-common-sql>=1.3.1",
+ "apache-airflow-providers-common-sql>=1.8.1",
"apache-airflow>=2.5.0",
"pyodbc"
],
diff --git a/images/breeze/output-commands-hash.txt
b/images/breeze/output-commands-hash.txt
new file mode 100644
index 0000000000..5d1f0642ef
--- /dev/null
+++ b/images/breeze/output-commands-hash.txt
@@ -0,0 +1,75 @@
+# This file is automatically generated by pre-commit. If you have a conflict
with this file
+# Please do not solve it but run `breeze setup regenerate-command-images`.
+# This command should fix the conflict and regenerate help images that you
have conflict with.
+main:ffb1a766b791beaf5f8a983587db870f
+build-docs:2e9882744f219e56726548ce2d13c3f5
+ci:find-backtracking-candidates:17fe56b867a745e5032a08dfcd3f73ee
+ci:fix-ownership:3e5a73533cc96045e72cb258783cfc96
+ci:free-space:49af17b032039c05c41a7a8283f365cc
+ci:get-workflow-info:8246038093359b9c3c110043419473e2
+ci:resource-check:bfcca92f18a403ca630955074eb5e9ad
+ci:selective-check:6657ed5d42affb7264b5efcc86f17a2a
+ci:5315c29bd9f68725ef92e4db8aff5cda
+ci-image:build:dd891a7e3c99131f7166ebbccd4f670b
+ci-image:pull:f9248c6026da61fe0acdb5d8f37b20da
+ci-image:verify:c90dc7e20fce2351eb89d8d1ebbd35e7
+ci-image:973b722fdd947e21ff59f3bf9cfc6264
+cleanup:8d92d453a6700f6d8cb11fb6a8b50461
+compile-www-assets:0963f1409f0aa1e3b137cddd4cc52e87
+down:4580f5b3b178ea00182694f134a751f3
+exec:9d0fb86607526afb6b161115ae7bf9cc
+k8s:build-k8s-image:b625255c3e8f3f794ee404f9a4476836
+k8s:configure-cluster:9958c5aac726565ec043e850d56ec8f8
+k8s:create-cluster:3e43f9da5e7c0bb67f3d868c9005515a
+k8s:delete-cluster:5f580bb09b6456610bf1044321717673
+k8s:deploy-airflow:f4b05b2101a4a029c9706ecd6fbf3c5c
+k8s:k9s:892a7931e981ba01a21c0da72fac39bc
+k8s:logs:f1a3fa2c5747d86ff712d1b0a06ff48b
+k8s:run-complete-tests:5018013f47f6c60aae07eb35256eb240
+k8s:setup-env:a34e94744ca4e0592371fe55478c3d54
+k8s:shell:b872c01cedfd50b865d98ed85933fed7
+k8s:status:6e711c24648c9bf42372e5b73cb2ac0f
+k8s:tests:4fea1fee4cfbf15f313ffd9026219401
+k8s:upload-k8s-image:46c5f1b042222047fda3f18f1ef75835
+k8s:6994fe347c18bcc01d95fb721a3757d5
+prod-image:build:20f84ddadc2fe4ae2723b7ccdde0197f
+prod-image:pull:3817ef211b023b76df84ee1110ef64dd
+prod-image:verify:bd2b78738a7c388dbad6076c41a9f906
+prod-image:e9ecd759e51ebd926df3170b29d1d2dc
+release-management:add-back-references:51960e2831d0e03a2b127d252929b843
+release-management:create-minor-branch:a3834afc4aa5d1e98002c9e9e7a9931d
+release-management:generate-constraints:01aef235b11e59ed7f10c970a5cdaba7
+release-management:generate-issue-content-providers:cda108e7f2506c2816af8f2a6c24070c
+release-management:generate-providers-metadata:d4e8e5cfaa024e3963af02d7a873048d
+release-management:install-provider-packages:34c38aca17d23dbb454fe7a6bfd8e630
+release-management:prepare-airflow-package:85d01c57e5b5ee0fb9e5f9d9706ed3b5
+release-management:prepare-provider-documentation:eb861d68b8d72cd98dc8732fc5393796
+release-management:prepare-provider-packages:908e2c826f7b4959dfd8bc693f3857a7
+release-management:publish-docs:51ee9bf1268529513996a14bd5350c19
+release-management:release-prod-images:cfbfe8b19fee91fd90718f98ef2fd078
+release-management:start-rc-process:b27bd524dd3c89f50a747b60a7e892c1
+release-management:start-release:419f48f6a4ff4457cb9de7ff496aebbe
+release-management:update-constraints:02ec4b119150e3fdbac52026e94820ef
+release-management:verify-provider-packages:96dce5644aad6b37080acf77b3d8de3a
+release-management:59d956e45fccf55e47f16e33cfc5d04a
+sbom:build-all-airflow-images:32f8acade299c2b112e986bae99846db
+sbom:generate-providers-requirements:3926848718283cf2ef00310a0892e867
+sbom:update-sbom-information:653be48be70b4b7ff5172d491aadc694
+sbom:386048e0c00c0de30cf181eb9f3862ea
+setup:autocomplete:fffcd49e102e09ccd69b3841a9e3ea8e
+setup:check-all-params-in-groups:f9ca6bef11ed65e40f06d7cf261a4859
+setup:config:53a0aeec6237da4d46bde68fafa29dc3
+setup:regenerate-command-images:ea2fba3440bd4e84311a53abe6e8dc56
+setup:self-upgrade:4af905a147fcd6670a0e33d3d369a94b
+setup:version:be116d90a21c2afe01087f7609774e1e
+setup:304a70e939d78427c749c24e8c0992df
+shell:aa92fe60473e4b5d0f41b5b182c02468
+start-airflow:f09871892c61bc889e6b56791115c923
+static-checks:f39d698d5735f372c01d9f1d5719fd13
+testing:db-tests:e08e3f30ddc34d95ae56de5222392b59
+testing:docker-compose-tests:fd154a058082fcfda12eb877a9a89338
+testing:helm-tests:98a9ba6631249762b1633b76a29f4461
+testing:integration-tests:c7fde5144126a445201d7e353aa19ba5
+testing:non-db-tests:ed916603036dd9979b1593c4d088eb40
+testing:tests:4ad1723c7b2b6d2d7d249d42964ced92
+testing:eae1e62ff40d5012388abd104461b88e
diff --git a/images/breeze/output_static-checks.svg
b/images/breeze/output_static-checks.svg
index 650f6755a7..0aa76ae301 100644
--- a/images/breeze/output_static-checks.svg
+++ b/images/breeze/output_static-checks.svg
@@ -301,13 +301,13 @@
</text><text class="breeze-static-checks-r5" x="0" y="264" textLength="12.2"
clip-path="url(#breeze-static-checks-line-10)">│</text><text
class="breeze-static-checks-r7" x="451.4" y="264" textLength="988.2"
clip-path="url(#breeze-static-checks-line-10)">check-base-operator-usage | check-boring-cyborg-configuration |                  </text><text
class="breeze-static-checks-r5" x="1451 [...]
</text><text class="breeze-static-checks-r5" x="0" y="288.4" textLength="12.2"
clip-path="url(#breeze-static-checks-line-11)">│</text><text
class="breeze-static-checks-r7" x="451.4" y="288.4" textLength="988.2"
clip-path="url(#breeze-static-checks-line-11)">check-breeze-top-dependencies-limited | check-builtin-literals |                 </text><text
class="breeze-static-checks-r5" x="1451. [...]
</text><text class="breeze-static-checks-r5" x="0" y="312.8" textLength="12.2"
clip-path="url(#breeze-static-checks-line-12)">│</text><text
class="breeze-static-checks-r7" x="451.4" y="312.8" textLength="988.2"
clip-path="url(#breeze-static-checks-line-12)">check-changelog-has-no-duplicates | check-cncf-k8s-only-for-executors |          </text><text
class="breeze-static-checks-r5" x="1451.8" y="312.8" textLength="12.2" clip
[...]
-</text><text class="breeze-static-checks-r5" x="0" y="337.2" textLength="12.2"
clip-path="url(#breeze-static-checks-line-13)">│</text><text
class="breeze-static-checks-r7" x="451.4" y="337.2" textLength="988.2"
clip-path="url(#breeze-static-checks-line-13)">check-core-deprecation-classes | check-daysago-import-from-utils |               </text><text
class="breeze-static-checks-r5" x="1451.8" y="337. [...]
-</text><text class="breeze-static-checks-r5" x="0" y="361.6" textLength="12.2"
clip-path="url(#breeze-static-checks-line-14)">│</text><text
class="breeze-static-checks-r7" x="451.4" y="361.6" textLength="988.2"
clip-path="url(#breeze-static-checks-line-14)">check-decorated-operator-implements-custom-name | check-deferrable-default-value </text><text
class="breeze-static-checks-r5" x="1451.8" y="361.6" textLength="12.2"
clip-path="url(#breeze-static-checks-line-14)">│</text [...]
-</text><text class="breeze-static-checks-r5" x="0" y="386" textLength="12.2"
clip-path="url(#breeze-static-checks-line-15)">│</text><text
class="breeze-static-checks-r7" x="451.4" y="386" textLength="988.2"
clip-path="url(#breeze-static-checks-line-15)">| check-docstring-param-types | check-example-dags-urls |                        </text><text
clas [...]
-</text><text class="breeze-static-checks-r5" x="0" y="410.4" textLength="12.2"
clip-path="url(#breeze-static-checks-line-16)">│</text><text
class="breeze-static-checks-r7" x="451.4" y="410.4" textLength="988.2"
clip-path="url(#breeze-static-checks-line-16)">check-executables-have-shebangs | check-extra-packages-references |              </text><text
class="breeze-static-checks-r5" x="1451.8" y="410.4" te [...]
-</text><text class="breeze-static-checks-r5" x="0" y="434.8" textLength="12.2"
clip-path="url(#breeze-static-checks-line-17)">│</text><text
class="breeze-static-checks-r7" x="451.4" y="434.8" textLength="988.2"
clip-path="url(#breeze-static-checks-line-17)">check-extras-order | check-for-inclusive-language |                            
[...]
-</text><text class="breeze-static-checks-r5" x="0" y="459.2" textLength="12.2"
clip-path="url(#breeze-static-checks-line-18)">│</text><text
class="breeze-static-checks-r7" x="451.4" y="459.2" textLength="988.2"
clip-path="url(#breeze-static-checks-line-18)">check-google-re2-as-dependency | check-hooks-apply |                            
[...]
-</text><text class="breeze-static-checks-r5" x="0" y="483.6" textLength="12.2"
clip-path="url(#breeze-static-checks-line-19)">│</text><text
class="breeze-static-checks-r7" x="451.4" y="483.6" textLength="988.2"
clip-path="url(#breeze-static-checks-line-19)">check-incorrect-use-of-LoggingMixin | check-init-decorator-arguments |           </text><text
class="breeze-static-checks-r5" x="1451.8" y="483.6" textLength="12.2" [...]
+</text><text class="breeze-static-checks-r5" x="0" y="337.2" textLength="12.2"
clip-path="url(#breeze-static-checks-line-13)">│</text><text
class="breeze-static-checks-r7" x="451.4" y="337.2" textLength="988.2"
clip-path="url(#breeze-static-checks-line-13)">check-common-sql-dependency-make-serializable | check-core-deprecation-classes | </text><text
class="breeze-static-checks-r5" x="1451.8" y="337.2" textLength="12.2"
clip-path="url(#breeze-static-checks-line-13)">│< [...]
+</text><text class="breeze-static-checks-r5" x="0" y="361.6" textLength="12.2"
clip-path="url(#breeze-static-checks-line-14)">│</text><text
class="breeze-static-checks-r7" x="451.4" y="361.6" textLength="988.2"
clip-path="url(#breeze-static-checks-line-14)">check-daysago-import-from-utils | check-decorated-operator-implements-custom-name</text><text
class="breeze-static-checks-r5" x="1451.8" y="361.6" textLength="12.2"
clip-path="url(#breeze-static-checks-line-14)">│</text><tex [...]
+</text><text class="breeze-static-checks-r5" x="0" y="386" textLength="12.2"
clip-path="url(#breeze-static-checks-line-15)">│</text><text
class="breeze-static-checks-r7" x="451.4" y="386" textLength="988.2"
clip-path="url(#breeze-static-checks-line-15)">| check-deferrable-default-value | check-docstring-param-types |                 </text><text
class="breeze-static-checks-r5" x="1451 [...]
+</text><text class="breeze-static-checks-r5" x="0" y="410.4" textLength="12.2"
clip-path="url(#breeze-static-checks-line-16)">│</text><text
class="breeze-static-checks-r7" x="451.4" y="410.4" textLength="988.2"
clip-path="url(#breeze-static-checks-line-16)">check-example-dags-urls | check-executables-have-shebangs |                      </text><text
class="breeze-s [...]
+</text><text class="breeze-static-checks-r5" x="0" y="434.8" textLength="12.2"
clip-path="url(#breeze-static-checks-line-17)">│</text><text
class="breeze-static-checks-r7" x="451.4" y="434.8" textLength="988.2"
clip-path="url(#breeze-static-checks-line-17)">check-extra-packages-references | check-extras-order |                           </t
[...]
+</text><text class="breeze-static-checks-r5" x="0" y="459.2" textLength="12.2"
clip-path="url(#breeze-static-checks-line-18)">│</text><text
class="breeze-static-checks-r7" x="451.4" y="459.2" textLength="988.2"
clip-path="url(#breeze-static-checks-line-18)">check-for-inclusive-language | check-google-re2-as-dependency | check-hooks-apply</text><text
class="breeze-static-checks-r5" x="1451.8" y="459.2" textLength="12.2"
clip-path="url(#breeze-static-checks-line-18)">│< [...]
+</text><text class="breeze-static-checks-r5" x="0" y="483.6" textLength="12.2"
clip-path="url(#breeze-static-checks-line-19)">│</text><text
class="breeze-static-checks-r7" x="451.4" y="483.6" textLength="988.2"
clip-path="url(#breeze-static-checks-line-19)">| check-incorrect-use-of-LoggingMixin | check-init-decorator-arguments |         </text><text
class="breeze-static-checks-r5" x="1451.8" y="483.6" textLength="12.2" clip
[...]
</text><text class="breeze-static-checks-r5" x="0" y="508" textLength="12.2"
clip-path="url(#breeze-static-checks-line-20)">│</text><text
class="breeze-static-checks-r7" x="451.4" y="508" textLength="988.2"
clip-path="url(#breeze-static-checks-line-20)">check-lazy-logging | check-links-to-example-dags-do-not-use-hardcoded-versions | </text><text
class="breeze-static-checks-r5" x="1451.8" y="508" textLength="12.2"
clip-path="url(#breeze-static-checks-line-20)">│</text> [...]
</text><text class="breeze-static-checks-r5" x="0" y="532.4" textLength="12.2"
clip-path="url(#breeze-static-checks-line-21)">│</text><text
class="breeze-static-checks-r7" x="451.4" y="532.4" textLength="988.2"
clip-path="url(#breeze-static-checks-line-21)">check-merge-conflict | check-newsfragments-are-valid |                           </t
[...]
</text><text class="breeze-static-checks-r5" x="0" y="556.8" textLength="12.2"
clip-path="url(#breeze-static-checks-line-22)">│</text><text
class="breeze-static-checks-r7" x="451.4" y="556.8" textLength="988.2"
clip-path="url(#breeze-static-checks-line-22)">check-no-airflow-deprecation-in-providers | check-no-providers-in-core-examples |</text><text
class="breeze-static-checks-r5" x="1451.8" y="556.8" textLength="12.2"
clip-path="url(#breeze-static-checks-line-22)">│</text [...]
diff --git a/images/breeze/output_static-checks.txt
b/images/breeze/output_static-checks.txt
index 2a032d6902..6580d8e471 100644
--- a/images/breeze/output_static-checks.txt
+++ b/images/breeze/output_static-checks.txt
@@ -1 +1 @@
-019b0680948fa7d9e2d4ad23397c8927
+c48be91ea9fc564deae1dd6f0e575cd8
diff --git a/scripts/ci/pre_commit/pre_commit_check_common_sql_dependency.py
b/scripts/ci/pre_commit/pre_commit_check_common_sql_dependency.py
new file mode 100755
index 0000000000..4b335d1cf6
--- /dev/null
+++ b/scripts/ci/pre_commit/pre_commit_check_common_sql_dependency.py
@@ -0,0 +1,124 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import ast
+import pathlib
+import sys
+from typing import Iterable
+
+import yaml
+from packaging.specifiers import SpecifierSet
+from rich.console import Console
+
+console = Console(color_system="standard", width=200)
+
+
+COMMON_SQL_PROVIDER_NAME: str = "apache-airflow-providers-common-sql"
+COMMON_SQL_PROVIDER_MIN_COMPATIBLE_VERSIONS: str = "1.8.1"
+COMMON_SQL_PROVIDER_LATEST_INCOMPATIBLE_VERSION: str = "1.8.0"
+MAKE_SERIALIZABLE_METHOD_NAME: str = "_make_serializable"
+
+
+def get_classes(file_path: str) -> Iterable[ast.ClassDef]:
+ """Yield the classes declared in the given Python file."""
+ pathlib_path = pathlib.Path(file_path)
+ module = ast.parse(pathlib_path.read_text("utf-8"), str(pathlib_path))
+ for node in ast.walk(module):
+ if isinstance(node, ast.ClassDef):
+ yield node
+
+
+def is_subclass_of_dbapihook(node: ast.ClassDef) -> bool:
+ """Return True if the given class definition is a subclass of `DbApiHook`."""
+ for base in node.bases:
+ if isinstance(base, ast.Name) and base.id == "DbApiHook":
+ return True
+ return False
+
+
+def has_make_serializable_method(node: ast.ClassDef) -> bool:
+ """Return True if the given class implements the `_make_serializable`
method."""
+ for body_element in node.body:
+ if isinstance(body_element, ast.FunctionDef) and (body_element.name ==
MAKE_SERIALIZABLE_METHOD_NAME):
+ return True
+ return False
+
+
+def determine_provider_yaml_path(file_path: str) -> str:
+ """Determine the path of the provider.yaml file related to the given
python file."""
+ return f"{file_path.split('/hooks')[0]}/provider.yaml"
+
+
+def get_yaml_content(file_path: str) -> dict:
+ """Load the content of a YAML file."""
+ with open(file_path) as file:
+ return yaml.safe_load(file)
+
+
+def get_common_sql_constraints(provider_metadata: dict) -> str | None:
+ """Return the version constraints of
`apache-airflow-providers-common-sql`."""
+ dependencies: list[str] = provider_metadata["dependencies"]
+ for dependency in dependencies:
+ if dependency.startswith(COMMON_SQL_PROVIDER_NAME):
+ return dependency[len(COMMON_SQL_PROVIDER_NAME) :]
+ return None
+
+
+def do_version_satisfies_constraints(
+ version: str,
+ max_incompatible_version=COMMON_SQL_PROVIDER_LATEST_INCOMPATIBLE_VERSION,
+) -> bool:
+ """Check if the given `version` is constrained to at least >= 1.8.1."""
+ constraints: list[str] = [constraint.strip() for constraint in
version.split(",")]
+ specifier_set = SpecifierSet(",".join(constraints))
+ return not specifier_set.contains(max_incompatible_version)
+
+
+def check_sql_providers_dependency():
+ error_count: int = 0
+ for path in sys.argv[1:]:
+ if not path.startswith("airflow/providers/"):
+ continue
+
+ for clazz in get_classes(path):
+ if is_subclass_of_dbapihook(node=clazz) and
has_make_serializable_method(node=clazz):
+ provider_yaml_path: str =
determine_provider_yaml_path(file_path=path)
+ provider_metadata: dict =
get_yaml_content(file_path=provider_yaml_path)
+
+ if version_constraint :=
get_common_sql_constraints(provider_metadata=provider_metadata):
+ if not
do_version_satisfies_constraints(version=version_constraint):
+ error_count += 1
+ console.print(
+ f"\n[yellow]Provider {provider_metadata['name']}
must have "
+
f"'{COMMON_SQL_PROVIDER_NAME}>={COMMON_SQL_PROVIDER_MIN_COMPATIBLE_VERSIONS}'
as "
+ f"dependency, because `{clazz.name}` overrides the
"
+ f"`{MAKE_SERIALIZABLE_METHOD_NAME}` method."
+ )
+ if error_count:
+ console.print(
+ f"The `{MAKE_SERIALIZABLE_METHOD_NAME}` method was introduced in
{COMMON_SQL_PROVIDER_NAME} "
+ f"{COMMON_SQL_PROVIDER_MIN_COMPATIBLE_VERSIONS}. You cannot rely
on an older version of this "
+ "provider to override this method."
+ )
+ sys.exit(1)
+
+
+if __name__ == "__main__":
+ sys.exit(check_sql_providers_dependency())
diff --git a/tests/providers/databricks/hooks/test_databricks_sql.py
b/tests/providers/databricks/hooks/test_databricks_sql.py
index 088f5e54a9..1be035c443 100644
--- a/tests/providers/databricks/hooks/test_databricks_sql.py
+++ b/tests/providers/databricks/hooks/test_databricks_sql.py
@@ -22,6 +22,7 @@ from unittest import mock
from unittest.mock import patch
import pytest
+from databricks.sql.types import Row
from airflow.models import Connection
from airflow.providers.common.sql.hooks.sql import fetch_all_handler
@@ -67,7 +68,7 @@ def get_cursor_descriptions(fields: list[str]) ->
list[tuple[str]]:
"select * from test.test",
["select * from test.test"],
[["id", "value"]],
- ([[1, 2], [11, 12]],),
+ ([Row(id=1, value=2), Row(id=11, value=12)],),
[[("id",), ("value",)]],
[[1, 2], [11, 12]],
id="The return_last set and no split statements set on single
query in string",
@@ -78,7 +79,7 @@ def get_cursor_descriptions(fields: list[str]) ->
list[tuple[str]]:
"select * from test.test;",
["select * from test.test"],
[["id", "value"]],
- ([[1, 2], [11, 12]],),
+ ([Row(id=1, value=2), Row(id=11, value=12)],),
[[("id",), ("value",)]],
[[1, 2], [11, 12]],
id="The return_last not set and no split statements set on single
query in string",
@@ -89,7 +90,7 @@ def get_cursor_descriptions(fields: list[str]) ->
list[tuple[str]]:
"select * from test.test;",
["select * from test.test"],
[["id", "value"]],
- ([[1, 2], [11, 12]],),
+ ([Row(id=1, value=2), Row(id=11, value=12)],),
[[("id",), ("value",)]],
[[1, 2], [11, 12]],
id="The return_last set and split statements set on single query
in string",
@@ -100,7 +101,7 @@ def get_cursor_descriptions(fields: list[str]) ->
list[tuple[str]]:
"select * from test.test;",
["select * from test.test"],
[["id", "value"]],
- ([[1, 2], [11, 12]],),
+ ([Row(id=1, value=2), Row(id=11, value=12)],),
[[("id",), ("value",)]],
[[[1, 2], [11, 12]]],
id="The return_last not set and split statements set on single
query in string",
@@ -111,7 +112,7 @@ def get_cursor_descriptions(fields: list[str]) ->
list[tuple[str]]:
"select * from test.test;select * from test.test2;",
["select * from test.test", "select * from test.test2"],
[["id", "value"], ["id2", "value2"]],
- ([[1, 2], [11, 12]], [[3, 4], [13, 14]]),
+ ([Row(id=1, value=2), Row(id=11, value=12)], [Row(id=3, value=4),
Row(id=13, value=14)]),
[[("id2",), ("value2",)]],
[[3, 4], [13, 14]],
id="The return_last set and split statements set on multiple
queries in string",
@@ -122,7 +123,7 @@ def get_cursor_descriptions(fields: list[str]) ->
list[tuple[str]]:
"select * from test.test;select * from test.test2;",
["select * from test.test", "select * from test.test2"],
[["id", "value"], ["id2", "value2"]],
- ([[1, 2], [11, 12]], [[3, 4], [13, 14]]),
+ ([Row(id=1, value=2), Row(id=11, value=12)], [Row(id=3, value=4),
Row(id=13, value=14)]),
[[("id",), ("value",)], [("id2",), ("value2",)]],
[[[1, 2], [11, 12]], [[3, 4], [13, 14]]],
id="The return_last not set and split statements set on multiple
queries in string",
@@ -133,7 +134,7 @@ def get_cursor_descriptions(fields: list[str]) ->
list[tuple[str]]:
["select * from test.test;"],
["select * from test.test"],
[["id", "value"]],
- ([[1, 2], [11, 12]],),
+ ([Row(id=1, value=2), Row(id=11, value=12)],),
[[("id",), ("value",)]],
[[[1, 2], [11, 12]]],
id="The return_last set on single query in list",
@@ -144,7 +145,7 @@ def get_cursor_descriptions(fields: list[str]) ->
list[tuple[str]]:
["select * from test.test;"],
["select * from test.test"],
[["id", "value"]],
- ([[1, 2], [11, 12]],),
+ ([Row(id=1, value=2), Row(id=11, value=12)],),
[[("id",), ("value",)]],
[[[1, 2], [11, 12]]],
id="The return_last not set on single query in list",
@@ -155,7 +156,7 @@ def get_cursor_descriptions(fields: list[str]) ->
list[tuple[str]]:
"select * from test.test;select * from test.test2;",
["select * from test.test", "select * from test.test2"],
[["id", "value"], ["id2", "value2"]],
- ([[1, 2], [11, 12]], [[3, 4], [13, 14]]),
+ ([Row(id=1, value=2), Row(id=11, value=12)], [Row(id=3, value=4),
Row(id=13, value=14)]),
[[("id2",), ("value2",)]],
[[3, 4], [13, 14]],
id="The return_last set on multiple queries in list",
@@ -166,7 +167,7 @@ def get_cursor_descriptions(fields: list[str]) ->
list[tuple[str]]:
"select * from test.test;select * from test.test2;",
["select * from test.test", "select * from test.test2"],
[["id", "value"], ["id2", "value2"]],
- ([[1, 2], [11, 12]], [[3, 4], [13, 14]]),
+ ([Row(id=1, value=2), Row(id=11, value=12)], [Row(id=3, value=4),
Row(id=13, value=14)]),
[[("id",), ("value",)], [("id2",), ("value2",)]],
[[[1, 2], [11, 12]], [[3, 4], [13, 14]]],
id="The return_last not set on multiple queries not set",
diff --git a/tests/providers/databricks/operators/test_databricks_sql.py
b/tests/providers/databricks/operators/test_databricks_sql.py
index e247e4deca..e7885740cf 100644
--- a/tests/providers/databricks/operators/test_databricks_sql.py
+++ b/tests/providers/databricks/operators/test_databricks_sql.py
@@ -25,7 +25,6 @@ from databricks.sql.types import Row
from airflow.providers.common.sql.hooks.sql import fetch_all_handler
from airflow.providers.databricks.operators.databricks_sql import
DatabricksSqlOperator
-from airflow.serialization.serde import serialize
DATE = "2017-04-20"
TASK_ID = "databricks-sql-operator"
@@ -151,25 +150,6 @@ def test_exec_success(sql, return_last, split_statement,
hook_results, hook_desc
)
-def test_return_value_serialization():
- hook_descriptions = [[("id",), ("value",)]]
- hook_results = [Row(id=1, value="value1"), Row(id=2, value="value2")]
-
- with
patch("airflow.providers.databricks.operators.databricks_sql.DatabricksSqlHook")
as db_mock_class:
- op = DatabricksSqlOperator(
- task_id=TASK_ID,
- sql="select * from dummy2",
- do_xcom_push=True,
- return_last=True,
- )
- db_mock = db_mock_class.return_value
- db_mock.run.return_value = hook_results
- db_mock.descriptions = hook_descriptions
- result = op.execute({})
- serialized_result = serialize(result)
- assert serialized_result == serialize(([("id",), ("value",)], [(1,
"value1"), (2, "value2")]))
-
-
@pytest.mark.parametrize(
"return_last, split_statements, sql, descriptions, hook_results,
do_xcom_push",
[
diff --git a/tests/providers/odbc/hooks/test_odbc.py
b/tests/providers/odbc/hooks/test_odbc.py
index a7ea20a77d..ad763b934b 100644
--- a/tests/providers/odbc/hooks/test_odbc.py
+++ b/tests/providers/odbc/hooks/test_odbc.py
@@ -19,16 +19,46 @@ from __future__ import annotations
import json
import logging
+from dataclasses import dataclass
from unittest import mock
from unittest.mock import patch
from urllib.parse import quote_plus, urlsplit
import pyodbc
+import pytest
from airflow.models import Connection
from airflow.providers.odbc.hooks.odbc import OdbcHook
[email protected]
+def mock_row():
+ """
+ Mock a pyodbc.Row object - This is a C object that can only be created
from the C API of pyodbc.
+ This mock implements the two features used by the hook:
+ - cursor_description: which returns column names and types
+ - __iter__: which allows exploding a row instance (*row)
+ """
+
+ @dataclass
+ class Row:
+ key: int
+ column: str
+
+ def __iter__(self):
+ yield self.key
+ yield self.column
+
+ @property
+ def cursor_description(self):
+ return [
+ ("key", int, None, 11, 11, 0, None),
+ ("column", str, None, 256, 256, 0, None),
+ ]
+
+ return Row
+
+
class TestOdbcHook:
def get_hook(self=None, hook_params=None, conn_params=None):
hook_params = hook_params or {}
@@ -40,10 +70,22 @@ class TestOdbcHook:
}
)
- hook = OdbcHook(**hook_params)
- hook.get_connection = mock.Mock()
- hook.get_connection.return_value = connection
- return hook
+ cursor = mock.MagicMock(
+ rowcount=0, spec=["description", "rowcount", "execute",
"fetchall", "fetchone", "close"]
+ )
+ conn = mock.MagicMock()
+ conn.cursor.return_value = cursor
+
+ class UnitTestOdbcHook(OdbcHook):
+ conn_name_attr = "test_conn_id"
+
+ def get_connection(self, conn_id: str):
+ return connection
+
+ def get_conn(self):
+ return conn
+
+ return UnitTestOdbcHook(**hook_params)
def test_driver_in_extra_not_used(self):
conn_params = dict(extra=json.dumps(dict(Driver="Fake Driver",
Fake_Param="Fake Param")))
@@ -235,3 +277,28 @@ class TestOdbcHook:
hook =
self.get_hook(conn_params=dict(extra=json.dumps(dict(sqlalchemy_scheme="my-scheme"))))
uri = hook.get_uri()
assert urlsplit(uri).scheme == "my-scheme"
+
+ def test_pyodbc_mock(self):
+ """Ensure that pyodbc.Row object has a `cursor_description` method.
+
+ In subsequent tests, pyodbc.Row is replaced by pure Python mock
object, which implements the above
+ method. We want to detect any breaking change in the pyodbc object. If
it fails, the 'mock_row'
+ needs to be updated.
+ """
+ assert hasattr(pyodbc.Row, "cursor_description")
+
+ def test_query_return_serializable_result(self, mock_row):
+ pyodbc_result = [mock_row(key=1, column="value1"), mock_row(key=2,
column="value2")]
+ hook_result = [(1, "value1"), (2, "value2")]
+
+ def mock_handler(*_):
+ return pyodbc_result
+
+ hook = self.get_hook()
+ result = hook.run("SQL", handler=mock_handler)
+ assert hook_result == result
+
+ def test_query_no_handler_return_none(self):
+ hook = self.get_hook()
+ result = hook.run("SQL")
+ assert result is None