jacobcbeaudin commented on code in PR #63470:
URL: https://github.com/apache/airflow/pull/63470#discussion_r3066945174
##########
providers/snowflake/src/airflow/providers/snowflake/operators/snowflake.py:
##########
@@ -526,3 +526,48 @@ def on_kill(self) -> None:
self.log.info("Cancelling the query ids %s", self.query_ids)
self._hook.cancel_queries(self.query_ids)
self.log.info("Query ids %s cancelled successfully", self.query_ids)
+
+
+class SnowflakeNotebookOperator(SnowflakeSqlApiOperator):
+ """
+ Execute a Snowflake Notebook via the Snowflake SQL API.
+
+ Builds an ``EXECUTE NOTEBOOK`` statement and delegates execution to
+    :class:`~airflow.providers.snowflake.operators.snowflake.SnowflakeSqlApiOperator`,
+ which handles query submission, polling, deferral, and cancellation.
+
+ .. seealso::
+ `Snowflake EXECUTE NOTEBOOK
+ <https://docs.snowflake.com/en/sql-reference/sql/execute-notebook>`_
+
+ :param notebook: Fully-qualified notebook name
+ (e.g. ``MY_DB.MY_SCHEMA.MY_NOTEBOOK``).
+ :param parameters: Optional list of parameter strings to pass to the
+ notebook. Only string values are supported by Snowflake; other
+ data types are interpreted as NULL. Parameters are accessible in
+ the notebook via ``sys.argv``.
+ """
+
+ template_fields: Sequence[str] = tuple(
+        set(SnowflakeSqlApiOperator.template_fields) | {"notebook", "parameters"}
+ )
+
+ def __init__(
+ self,
+ *,
+ notebook: str,
+ parameters: list[str] | None = None,
+ **kwargs: Any,
+ ) -> None:
+ self.notebook = notebook
+ self.parameters = parameters
+ sql = self._build_execute_notebook_query()
+ super().__init__(sql=sql, statement_count=1, **kwargs)
+
Review Comment:
Good catch. Fixed — added an `execute()` override that rebuilds `self.sql`
from the rendered `self.notebook` and `self.parameters` before calling
`super().execute()`. This ensures single-quote escaping applies to the actual
rendered values, not Jinja template strings.
##########
providers/snowflake/src/airflow/providers/snowflake/operators/snowflake.py:
##########
@@ -526,3 +526,48 @@ def on_kill(self) -> None:
self.log.info("Cancelling the query ids %s", self.query_ids)
self._hook.cancel_queries(self.query_ids)
self.log.info("Query ids %s cancelled successfully", self.query_ids)
+
+
+class SnowflakeNotebookOperator(SnowflakeSqlApiOperator):
+ """
+ Execute a Snowflake Notebook via the Snowflake SQL API.
+
+ Builds an ``EXECUTE NOTEBOOK`` statement and delegates execution to
+    :class:`~airflow.providers.snowflake.operators.snowflake.SnowflakeSqlApiOperator`,
+ which handles query submission, polling, deferral, and cancellation.
+
+ .. seealso::
+ `Snowflake EXECUTE NOTEBOOK
+ <https://docs.snowflake.com/en/sql-reference/sql/execute-notebook>`_
+
+ :param notebook: Fully-qualified notebook name
+ (e.g. ``MY_DB.MY_SCHEMA.MY_NOTEBOOK``).
+ :param parameters: Optional list of parameter strings to pass to the
+ notebook. Only string values are supported by Snowflake; other
+ data types are interpreted as NULL. Parameters are accessible in
+ the notebook via ``sys.argv``.
+ """
+
+ template_fields: Sequence[str] = tuple(
+        set(SnowflakeSqlApiOperator.template_fields) | {"notebook", "parameters"}
Review Comment:
The parent `SnowflakeSqlApiOperator` uses the same `set()` pattern, as do
`SnowflakeCheckOperator`, `GKEOperatorMixin`, and many other operators across
the repo. Airflow renders template fields independently regardless of order, so
this has no functional impact. Keeping it consistent with the existing pattern.
##########
providers/snowflake/src/airflow/providers/snowflake/operators/snowflake.py:
##########
@@ -526,3 +526,48 @@ def on_kill(self) -> None:
self.log.info("Cancelling the query ids %s", self.query_ids)
self._hook.cancel_queries(self.query_ids)
self.log.info("Query ids %s cancelled successfully", self.query_ids)
+
+
+class SnowflakeNotebookOperator(SnowflakeSqlApiOperator):
+ """
+ Execute a Snowflake Notebook via the Snowflake SQL API.
+
+ Builds an ``EXECUTE NOTEBOOK`` statement and delegates execution to
+    :class:`~airflow.providers.snowflake.operators.snowflake.SnowflakeSqlApiOperator`,
+ which handles query submission, polling, deferral, and cancellation.
+
+ .. seealso::
+ `Snowflake EXECUTE NOTEBOOK
+ <https://docs.snowflake.com/en/sql-reference/sql/execute-notebook>`_
+
+ :param notebook: Fully-qualified notebook name
+ (e.g. ``MY_DB.MY_SCHEMA.MY_NOTEBOOK``).
+ :param parameters: Optional list of parameter strings to pass to the
+ notebook. Only string values are supported by Snowflake; other
+ data types are interpreted as NULL. Parameters are accessible in
+ the notebook via ``sys.argv``.
Review Comment:
Updated the docstring to clarify that values must be strings, which the
`list[str]` type hint already enforces. The previous wording was describing
Snowflake's behavior but could be misread as implying the operator handles
non-string types.
##########
providers/snowflake/src/airflow/providers/snowflake/operators/snowflake.py:
##########
@@ -526,3 +526,48 @@ def on_kill(self) -> None:
self.log.info("Cancelling the query ids %s", self.query_ids)
self._hook.cancel_queries(self.query_ids)
self.log.info("Query ids %s cancelled successfully", self.query_ids)
+
+
+class SnowflakeNotebookOperator(SnowflakeSqlApiOperator):
+ """
+ Execute a Snowflake Notebook via the Snowflake SQL API.
+
+ Builds an ``EXECUTE NOTEBOOK`` statement and delegates execution to
+    :class:`~airflow.providers.snowflake.operators.snowflake.SnowflakeSqlApiOperator`,
+ which handles query submission, polling, deferral, and cancellation.
+
+ .. seealso::
+ `Snowflake EXECUTE NOTEBOOK
+ <https://docs.snowflake.com/en/sql-reference/sql/execute-notebook>`_
+
+ :param notebook: Fully-qualified notebook name
+ (e.g. ``MY_DB.MY_SCHEMA.MY_NOTEBOOK``).
+ :param parameters: Optional list of parameter strings to pass to the
+ notebook. Only string values are supported by Snowflake; other
+ data types are interpreted as NULL. Parameters are accessible in
+ the notebook via ``sys.argv``.
+ """
+
+ template_fields: Sequence[str] = tuple(
+        set(SnowflakeSqlApiOperator.template_fields) | {"notebook", "parameters"}
+ )
+
+ def __init__(
+ self,
+ *,
+ notebook: str,
+ parameters: list[str] | None = None,
+ **kwargs: Any,
+ ) -> None:
+ self.notebook = notebook
+ self.parameters = parameters
+ sql = self._build_execute_notebook_query()
+ super().__init__(sql=sql, statement_count=1, **kwargs)
+
+ def _build_execute_notebook_query(self) -> str:
+ """Build the ``EXECUTE NOTEBOOK`` SQL statement."""
+ params_clause = ""
+ if self.parameters:
+            sanitized = [p.replace("'", "''") for p in self.parameters]  # escape single quotes for SQL
Review Comment:
Same as above — addressed by clarifying the docstring.
##########
providers/snowflake/src/airflow/providers/snowflake/operators/snowflake.py:
##########
@@ -526,3 +526,48 @@ def on_kill(self) -> None:
self.log.info("Cancelling the query ids %s", self.query_ids)
self._hook.cancel_queries(self.query_ids)
self.log.info("Query ids %s cancelled successfully", self.query_ids)
+
+
+class SnowflakeNotebookOperator(SnowflakeSqlApiOperator):
+ """
+ Execute a Snowflake Notebook via the Snowflake SQL API.
+
+ Builds an ``EXECUTE NOTEBOOK`` statement and delegates execution to
+    :class:`~airflow.providers.snowflake.operators.snowflake.SnowflakeSqlApiOperator`,
+ which handles query submission, polling, deferral, and cancellation.
+
+ .. seealso::
+ `Snowflake EXECUTE NOTEBOOK
+ <https://docs.snowflake.com/en/sql-reference/sql/execute-notebook>`_
+
+ :param notebook: Fully-qualified notebook name
+ (e.g. ``MY_DB.MY_SCHEMA.MY_NOTEBOOK``).
+ :param parameters: Optional list of parameter strings to pass to the
+ notebook. Only string values are supported by Snowflake; other
+ data types are interpreted as NULL. Parameters are accessible in
+ the notebook via ``sys.argv``.
+ """
+
+ template_fields: Sequence[str] = tuple(
+        set(SnowflakeSqlApiOperator.template_fields) | {"notebook", "parameters"}
+ )
+
+ def __init__(
+ self,
+ *,
+ notebook: str,
+ parameters: list[str] | None = None,
+ **kwargs: Any,
+ ) -> None:
+ self.notebook = notebook
+ self.parameters = parameters
+ sql = self._build_execute_notebook_query()
+ super().__init__(sql=sql, statement_count=1, **kwargs)
+
+ def _build_execute_notebook_query(self) -> str:
+ """Build the ``EXECUTE NOTEBOOK`` SQL statement."""
+ params_clause = ""
+ if self.parameters:
+            sanitized = [p.replace("'", "''") for p in self.parameters]  # escape single quotes for SQL
+ params_clause = ", ".join(f"'{p}'" for p in sanitized)
+ return f"EXECUTE NOTEBOOK {self.notebook}({params_clause})"
Review Comment:
This is consistent with how all Airflow operators handle SQL object names.
`SQLIntervalCheckOperator`, `SQLTableCheckOperator`, `SQLColumnCheckOperator`,
and `CopyIntoSnowflakeOperator` all interpolate identifiers directly without
validation. Airflow's security model treats DAG authors as trusted code — they
already have full control over the SQL via the parent's `sql` parameter. Adding
validation here would be inconsistent with the rest of the codebase.
##########
providers/snowflake/tests/unit/snowflake/operators/test_snowflake.py:
##########
@@ -605,3 +609,265 @@ def test_snowflake_sql_api_on_kill_no_queries(self, mock_cancel_queries):
operator.on_kill()
mock_cancel_queries.assert_not_called()
+
+
+class TestSnowflakeNotebookOperatorSQL:
+ """Tests for SQL query building."""
+
+ def test_build_sql_no_params(self):
+ operator = SnowflakeNotebookOperator(
+ task_id=TASK_ID,
+ notebook=NOTEBOOK,
+ )
+ assert operator.sql == "EXECUTE NOTEBOOK MY_DB.MY_SCHEMA.MY_NOTEBOOK()"
+
+ def test_build_sql_with_params(self):
+ operator = SnowflakeNotebookOperator(
+ task_id=TASK_ID,
+ notebook=NOTEBOOK,
+ parameters=["param1", "target_db=PROD"],
+ )
+        assert operator.sql == "EXECUTE NOTEBOOK MY_DB.MY_SCHEMA.MY_NOTEBOOK('param1', 'target_db=PROD')"
+
+ def test_build_sql_escapes_single_quotes(self):
+ operator = SnowflakeNotebookOperator(
+ task_id=TASK_ID,
+ notebook=NOTEBOOK,
+ parameters=["O'Brien", "it's"],
+ )
+        assert operator.sql == "EXECUTE NOTEBOOK MY_DB.MY_SCHEMA.MY_NOTEBOOK('O''Brien', 'it''s')"
+
+ def test_build_sql_empty_params(self):
+ operator = SnowflakeNotebookOperator(
+ task_id=TASK_ID,
+ notebook=NOTEBOOK,
+ parameters=[],
+ )
+ assert operator.sql == "EXECUTE NOTEBOOK MY_DB.MY_SCHEMA.MY_NOTEBOOK()"
+
+ def test_template_fields(self):
+ assert "notebook" in SnowflakeNotebookOperator.template_fields
+ assert "parameters" in SnowflakeNotebookOperator.template_fields
+ assert "snowflake_conn_id" in SnowflakeNotebookOperator.template_fields
+
+ def test_statement_count_is_one(self):
+ operator = SnowflakeNotebookOperator(
+ task_id=TASK_ID,
+ notebook=NOTEBOOK,
+ )
+ assert operator.statement_count == 1
+
+ def test_is_subclass_of_snowflake_sql_api_operator(self):
+ assert issubclass(SnowflakeNotebookOperator, SnowflakeSqlApiOperator)
+
+
+@pytest.mark.db_test
+class TestSnowflakeNotebookOperator:
+ @pytest.fixture(autouse=True)
+ def setup_tests(self):
Review Comment:
This matches the existing `TestSnowflakeSqlApiOperator` class directly
above, which uses the same pattern: class-level `@pytest.mark.db_test` with
`setup_tests` clearing the DB before/after every test, even though most tests
use `context=None`. Narrowing the marker for just the notebook tests while
leaving the parent class untouched would be inconsistent within the same file.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]