This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a343bba1e3 Fix templating fields and do_xcom_push in DatabricksSQLOperator (#27868)
a343bba1e3 is described below
commit a343bba1e39a1b28c469974fc87eb106c9f67db8
Author: Jarek Potiuk <[email protected]>
AuthorDate: Wed Nov 23 23:59:19 2022 +0100
Fix templating fields and do_xcom_push in DatabricksSQLOperator (#27868)
When SQLExecuteQueryOperator was introduced in #25717, it brought a
couple of errors into the Databricks SQL operator:

* The templated "schema" field was not stored as a field on the
  operator.
* The do_xcom_push parameter was ignored.

This PR fixes both by:

* storing schema as a field and accessing it via self
* removing the do_xcom_push parameter (BaseOperator's one is used instead)
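
For illustration (not part of the original commit message): a minimal
usage sketch of the fixed behavior. The connection id, endpoint name,
and DAG/task ids below are hypothetical.

    from __future__ import annotations

    import pendulum

    from airflow import DAG
    from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator

    with DAG(
        dag_id="example_databricks_sql",
        start_date=pendulum.datetime(2022, 11, 1, tz="UTC"),
        schedule=None,
    ) as dag:
        # "schema" is rendered as a template field, and do_xcom_push is
        # now inherited from BaseOperator, so the query result is pushed
        # to XCom unless explicitly disabled.
        DatabricksSqlOperator(
            task_id="select_data",
            databricks_conn_id="databricks_default",
            sql_endpoint_name="my_endpoint",
            schema="{{ var.value.databricks_schema }}",
            sql="SELECT * FROM my_table LIMIT 10",
            do_xcom_push=True,
        )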
---
.../databricks/operators/databricks_sql.py | 36 ++++++++++++----------
1 file changed, 19 insertions(+), 17 deletions(-)
diff --git a/airflow/providers/databricks/operators/databricks_sql.py b/airflow/providers/databricks/operators/databricks_sql.py
index e2b9101789..d808377cb1 100644
--- a/airflow/providers/databricks/operators/databricks_sql.py
+++ b/airflow/providers/databricks/operators/databricks_sql.py
@@ -63,7 +63,6 @@ class DatabricksSqlOperator(SQLExecuteQueryOperator):
     :param output_format: format of output data if ``output_path`` is specified.
         Possible values are ``csv``, ``json``, ``jsonl``. Default is ``csv``.
     :param csv_params: parameters that will be passed to the ``csv.DictWriter`` class used to write CSV data.
-    :param do_xcom_push: If True, then the result of SQL executed will be pushed to an XCom.
     """

     template_fields: Sequence[str] = (
@@ -87,7 +86,6 @@ class DatabricksSqlOperator(SQLExecuteQueryOperator):
         http_headers: list[tuple[str, str]] | None = None,
         catalog: str | None = None,
         schema: str | None = None,
-        do_xcom_push: bool = False,
         output_path: str | None = None,
         output_format: str = "csv",
         csv_params: dict[str, Any] | None = None,
@@ -99,24 +97,28 @@ class DatabricksSqlOperator(SQLExecuteQueryOperator):
         self._output_path = output_path
         self._output_format = output_format
         self._csv_params = csv_params
-        client_parameters = {} if client_parameters is None else client_parameters
-        hook_params = kwargs.pop("hook_params", {})
-
-        self.hook_params = {
-            "http_path": http_path,
-            "session_configuration": session_configuration,
-            "sql_endpoint_name": sql_endpoint_name,
-            "http_headers": http_headers,
-            "catalog": catalog,
-            "schema": schema,
+        self.http_path = http_path
+        self.sql_endpoint_name = sql_endpoint_name
+        self.session_configuration = session_configuration
+        self.client_parameters = {} if client_parameters is None else client_parameters
+        self.hook_params = kwargs.pop("hook_params", {})
+        self.http_headers = http_headers
+        self.catalog = catalog
+        self.schema = schema
+
+    def get_db_hook(self) -> DatabricksSqlHook:
+        hook_params = {
+            "http_path": self.http_path,
+            "session_configuration": self.session_configuration,
+            "sql_endpoint_name": self.sql_endpoint_name,
+            "http_headers": self.http_headers,
+            "catalog": self.catalog,
+            "schema": self.schema,
             "caller": "DatabricksSqlOperator",
-            **client_parameters,
-            **hook_params,
+            **self.client_parameters,
+            **self.hook_params,
         }
-
-    def get_db_hook(self) -> DatabricksSqlHook:
-        return DatabricksSqlHook(self.databricks_conn_id, **self.hook_params)
+        return DatabricksSqlHook(self.databricks_conn_id, **hook_params)

     def _process_output(self, schema, results):
         if not self._output_path:
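
For illustration (appended, not part of the commit): a short sketch of
why get_db_hook() now builds its parameters from instance attributes.
Template fields such as "schema" are rendered on the operator after
__init__, so a hook-params dict frozen at construction time would keep
the unrendered template string. The connection id and endpoint name are
hypothetical; constructing the hook does not contact Databricks.

    from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator

    op = DatabricksSqlOperator(
        task_id="check_schema",
        databricks_conn_id="databricks_default",  # hypothetical connection id
        sql_endpoint_name="my_endpoint",          # hypothetical endpoint name
        schema="{{ params.schema }}",
        sql="SELECT 1",
    )
    # Simulate what template rendering does at runtime: it mutates the
    # rendered instance attribute in place.
    op.schema = "analytics"
    # get_db_hook() reads self.schema, so the hook receives the rendered
    # value instead of a value captured in __init__ (this assumes
    # DatabricksSqlHook stores its "schema" argument as hook.schema).
    hook = op.get_db_hook()
    assert hook.schema == "analytics"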