reganbaum commented on code in PR #62240:
URL: https://github.com/apache/airflow/pull/62240#discussion_r2838626702


##########
providers/amazon/docs/operators/sagemakerunifiedstudio.rst:
##########
@@ -41,18 +42,46 @@ Operators
 
 .. _howto/operator:SageMakerNotebookOperator:
 
-Create an Amazon SageMaker Unified Studio Workflow
-==================================================
+Run Notebooks via the SageMaker Studio SDK
+==========================================
 
-To create an Amazon SageMaker Unified Studio workflow to orchestrate your notebook, querybook, and visual ETL runs you can use
-:class:`~airflow.providers.amazon.aws.operators.sagemaker_unified_studio.SageMakerNotebookOperator`.
+Use :class:`~airflow.providers.amazon.aws.operators.sagemaker_unified_studio.SageMakerNotebookOperator`
+to execute notebooks, SQL notebooks, and Visual ETL jobs through the SageMaker Studio SDK execution
+client. This operator relies on the ``sagemaker_studio`` Python library, which converts the notebook
+to a ``.ipynb`` file on S3, creates a SageMaker Training Job to run it, and captures outputs from S3.

Review Comment:
   `to execute Jupyter notebooks, querybooks, and visual ETL jobs.`
   
   I'm not sure we need the additional information after that; those are implementation details, while users just need to know when they should use this operator. We can say that it uses the sagemaker_studio Python lib, but we don't have to say what it does (if that implementation ever changes, this would become out of date).



##########
providers/amazon/docs/operators/sagemakerunifiedstudio.rst:
##########
@@ -41,18 +42,46 @@ Operators
 
 .. _howto/operator:SageMakerNotebookOperator:
 
-Create an Amazon SageMaker Unified Studio Workflow
-==================================================
+Run Notebooks via the SageMaker Studio SDK

Review Comment:
   We should clarify here: Jupyter notebooks, querybooks, and visual ETL jobs



##########
providers/amazon/docs/operators/sagemakerunifiedstudio.rst:
##########
@@ -41,18 +42,46 @@ Operators
 
 .. _howto/operator:SageMakerNotebookOperator:
 
-Create an Amazon SageMaker Unified Studio Workflow
-==================================================
+Run Notebooks via the SageMaker Studio SDK
+==========================================
 
-To create an Amazon SageMaker Unified Studio workflow to orchestrate your notebook, querybook, and visual ETL runs you can use
-:class:`~airflow.providers.amazon.aws.operators.sagemaker_unified_studio.SageMakerNotebookOperator`.
+Use :class:`~airflow.providers.amazon.aws.operators.sagemaker_unified_studio.SageMakerNotebookOperator`
+to execute notebooks, SQL notebooks, and Visual ETL jobs through the SageMaker Studio SDK execution
+client. This operator relies on the ``sagemaker_studio`` Python library, which converts the notebook
+to a ``.ipynb`` file on S3, creates a SageMaker Training Job to run it, and captures outputs from S3.
+
+The notebook is identified by its relative file path within the project (e.g. ``test_notebook.ipynb``).
 
 .. exampleinclude:: /../../amazon/tests/system/amazon/aws/example_sagemaker_unified_studio.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_sagemaker_unified_studio_notebook]
     :end-before: [END howto_operator_sagemaker_unified_studio_notebook]
 
+.. _howto/operator:SageMakerUnifiedStudioNotebookOperator:
+
+Run Notebooks via the DataZone NotebookRun API
+===============================================
+
+Use :class:`~airflow.providers.amazon.aws.operators.sagemaker_unified_studio_notebook.SageMakerUnifiedStudioNotebookOperator`
+to execute notebooks through the DataZone ``StartNotebookRun`` API.
+Instead of creating a training job, each run provisions its own dedicated compute instance, starts the
+notebook kernel server, and executes cells sequentially. The DataZone ``GetNotebookRun`` API is used to
+retrieve notebook run information.
+
+Importantly, this operator allows for creating and managing schedules for these notebook runs, with the ability to specify
+frequencies, compute environment settings, and notebook parameters. API-based execution or event-triggered capabilities outside
+of SageMaker Unified Studio are also possible with the documented DataZone APIs.

Review Comment:
   We can remove this



##########
providers/amazon/tests/unit/amazon/aws/hooks/test_sagemaker_unified_studio_notebook.py:
##########
@@ -0,0 +1,287 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from airflow.providers.amazon.aws.hooks.sagemaker_unified_studio_notebook import (
+    TWELVE_HOURS_IN_SECONDS,
+    SageMakerUnifiedStudioNotebookHook,
+)
+from airflow.providers.common.compat.sdk import AirflowException
+
+DOMAIN_ID = "dzd_example"
+PROJECT_ID = "proj_example"
+NOTEBOOK_ID = "notebook_123"
+NOTEBOOK_RUN_ID = "run_456"
+
+
+class TestSageMakerUnifiedStudioNotebookHook:
+    @pytest.fixture(autouse=True)
+    def setup(self):
+        with patch(
+            "airflow.providers.amazon.aws.hooks.sagemaker_unified_studio_notebook.boto3.client"
+        ) as mock_boto:
+            self.mock_client = MagicMock()
+            mock_boto.return_value = self.mock_client
+            self.hook = SageMakerUnifiedStudioNotebookHook(
+                domain_id=DOMAIN_ID,
+                project_id=PROJECT_ID,
+                waiter_delay=5,
+            )
+            _ = self.hook.client
+            yield
+
+    # --- __init__ tests ---
+
+    def test_default_timeout(self):
+        """Default waiter_max_attempts derived from 12-hour timeout."""
+        hook = SageMakerUnifiedStudioNotebookHook(domain_id=DOMAIN_ID, project_id=PROJECT_ID, waiter_delay=10)
+        assert hook.waiter_max_attempts == int(TWELVE_HOURS_IN_SECONDS / 10)
+
+    def test_custom_timeout_configuration(self):
+        """waiter_max_attempts derived from timeout_configuration."""
+        hook = SageMakerUnifiedStudioNotebookHook(
+            domain_id=DOMAIN_ID,
+            project_id=PROJECT_ID,
+            waiter_delay=10,
+            timeout_configuration={"run_timeout_in_minutes": 60},
+        )
+        assert hook.waiter_max_attempts == int(60 * 60 / 10)
+
+    def test_timeout_configuration_without_run_timeout(self):
+        """Empty timeout_configuration falls back to default 12-hour 
timeout."""
+        hook = SageMakerUnifiedStudioNotebookHook(
+            domain_id=DOMAIN_ID,
+            project_id=PROJECT_ID,
+            waiter_delay=10,
+            timeout_configuration={},
+        )
+        assert hook.waiter_max_attempts == int(TWELVE_HOURS_IN_SECONDS / 10)
+
+    # --- client property ---
+
+    def test_client_lazy_initialization(self):
+        """Client is created lazily on first access."""
+        with patch(
+            "airflow.providers.amazon.aws.hooks.sagemaker_unified_studio_notebook.boto3.client"
+        ) as mock_boto:
+            mock_boto.return_value = MagicMock()
+            hook = SageMakerUnifiedStudioNotebookHook(domain_id=DOMAIN_ID, project_id=PROJECT_ID)
+            assert hook._client is None
+            client = hook.client
+            mock_boto.assert_called_once_with("datazone")
+            assert client is not None
+            # Second access should reuse the same client
+            client2 = hook.client
+            assert client2 is client
+            mock_boto.assert_called_once()
+
+    # --- start_notebook_run ---
+
+    def test_start_notebook_run_minimal(self):
+        """Start run with only required params."""
+        self.mock_client.start_notebook_run.return_value = {"notebookRunId": NOTEBOOK_RUN_ID}
+
+        result = self.hook.start_notebook_run(notebook_id=NOTEBOOK_ID)
+
+        assert result == {"notebookRunId": NOTEBOOK_RUN_ID}
+        call_kwargs = self.mock_client.start_notebook_run.call_args[1]
+        assert call_kwargs["domain_id"] == DOMAIN_ID
+        assert call_kwargs["project_id"] == PROJECT_ID
+        assert call_kwargs["notebook_id"] == NOTEBOOK_ID
+        assert "client_token" in call_kwargs
+        # Optional params should not be present
+        assert "notebook_parameters" not in call_kwargs
+        assert "compute_configuration" not in call_kwargs
+        assert "timeout_configuration" not in call_kwargs
+        assert "trigger_source" not in call_kwargs
+
+    def test_start_notebook_run_all_params(self):
+        """Start run with all optional params provided."""
+        self.mock_client.start_notebook_run.return_value = {"notebookRunId": NOTEBOOK_RUN_ID}
+
+        result = self.hook.start_notebook_run(
+            notebook_id=NOTEBOOK_ID,
+            client_token="my-token",
+            notebook_parameters={"param1": "value1"},
+            compute_configuration={"instance_type": "ml.m5.large"},
+            timeout_configuration={"run_timeout_in_minutes": 120},
+            workflow_name="my_dag",
+        )
+
+        assert result == {"notebookRunId": NOTEBOOK_RUN_ID}
+        call_kwargs = self.mock_client.start_notebook_run.call_args[1]
+        assert call_kwargs["client_token"] == "my-token"
+        assert call_kwargs["notebook_parameters"] == {"param1": "value1"}
+        assert call_kwargs["compute_configuration"] == {"instance_type": 
"ml.m5.large"}
+        assert call_kwargs["timeout_configuration"] == 
{"run_timeout_in_minutes": 120}
+        assert call_kwargs["trigger_source"] == {"type": "workflow", 
"workflow_name": "my_dag"}
+
+    def test_start_notebook_run_auto_generates_client_token(self):
+        """client_token is auto-generated as a UUID when not provided."""
+        self.mock_client.start_notebook_run.return_value = {}
+
+        self.hook.start_notebook_run(notebook_id=NOTEBOOK_ID)
+
+        call_kwargs = self.mock_client.start_notebook_run.call_args[1]
+        token = call_kwargs["client_token"]
+        # UUID4 format: 8-4-4-4-12 hex chars
+        assert len(token) == 36
+        assert token.count("-") == 4
+
+    # --- get_notebook_run ---
+
+    def test_get_notebook_run(self):
+        """get_notebook_run passes correct params to the client."""
+        expected = {"status": "COMPLETED", "notebookRunId": NOTEBOOK_RUN_ID}

Review Comment:
   nit: can we just use statuses that the notebook has in the tests? e.g., succeeded
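For instance (a sketch only; the identifiers mirror the test under review), the mocked response could reuse one of the states the trigger module actually defines (``SUCCEEDED`` is in its ``FINISHED_STATES``) rather than the invented ``COMPLETED``:

```python
# Sketch of the nit applied: mock a status the NotebookRun API actually
# reports ("SUCCEEDED", per the trigger's FINISHED_STATES) instead of
# the invented "COMPLETED".
from unittest.mock import MagicMock

NOTEBOOK_RUN_ID = "run_456"

mock_client = MagicMock()
expected = {"status": "SUCCEEDED", "notebookRunId": NOTEBOOK_RUN_ID}
mock_client.get_notebook_run.return_value = expected

result = mock_client.get_notebook_run(notebook_run_id=NOTEBOOK_RUN_ID)
```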



##########
providers/amazon/docs/operators/sagemakerunifiedstudio.rst:
##########
@@ -41,18 +42,46 @@ Operators
 
 .. _howto/operator:SageMakerNotebookOperator:
 
-Create an Amazon SageMaker Unified Studio Workflow
-==================================================
+Run Notebooks via the SageMaker Studio SDK
+==========================================
 
-To create an Amazon SageMaker Unified Studio workflow to orchestrate your notebook, querybook, and visual ETL runs you can use
-:class:`~airflow.providers.amazon.aws.operators.sagemaker_unified_studio.SageMakerNotebookOperator`.
+Use :class:`~airflow.providers.amazon.aws.operators.sagemaker_unified_studio.SageMakerNotebookOperator`
+to execute notebooks, SQL notebooks, and Visual ETL jobs through the SageMaker Studio SDK execution
+client. This operator relies on the ``sagemaker_studio`` Python library, which converts the notebook
+to a ``.ipynb`` file on S3, creates a SageMaker Training Job to run it, and captures outputs from S3.
+
+The notebook is identified by its relative file path within the project (e.g. ``test_notebook.ipynb``).
 
 .. exampleinclude:: /../../amazon/tests/system/amazon/aws/example_sagemaker_unified_studio.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_sagemaker_unified_studio_notebook]
     :end-before: [END howto_operator_sagemaker_unified_studio_notebook]
 
+.. _howto/operator:SageMakerUnifiedStudioNotebookOperator:
+
+Run Notebooks via the DataZone NotebookRun API

Review Comment:
   Let's specify "SageMaker Unified Studio notebooks" here



##########
providers/amazon/docs/operators/sagemakerunifiedstudio.rst:
##########
@@ -41,18 +42,46 @@ Operators
 
 .. _howto/operator:SageMakerNotebookOperator:
 
-Create an Amazon SageMaker Unified Studio Workflow
-==================================================
+Run Notebooks via the SageMaker Studio SDK
+==========================================
 
-To create an Amazon SageMaker Unified Studio workflow to orchestrate your notebook, querybook, and visual ETL runs you can use
-:class:`~airflow.providers.amazon.aws.operators.sagemaker_unified_studio.SageMakerNotebookOperator`.
+Use :class:`~airflow.providers.amazon.aws.operators.sagemaker_unified_studio.SageMakerNotebookOperator`
+to execute notebooks, SQL notebooks, and Visual ETL jobs through the SageMaker Studio SDK execution
+client. This operator relies on the ``sagemaker_studio`` Python library, which converts the notebook
+to a ``.ipynb`` file on S3, creates a SageMaker Training Job to run it, and captures outputs from S3.
+
+The notebook is identified by its relative file path within the project (e.g. ``test_notebook.ipynb``).
 
 .. exampleinclude:: /../../amazon/tests/system/amazon/aws/example_sagemaker_unified_studio.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_sagemaker_unified_studio_notebook]
     :end-before: [END howto_operator_sagemaker_unified_studio_notebook]
 
+.. _howto/operator:SageMakerUnifiedStudioNotebookOperator:
+
+Run Notebooks via the DataZone NotebookRun API
+===============================================
+
+Use :class:`~airflow.providers.amazon.aws.operators.sagemaker_unified_studio_notebook.SageMakerUnifiedStudioNotebookOperator`
+to execute notebooks through the DataZone ``StartNotebookRun`` API.
+Instead of creating a training job, each run provisions its own dedicated compute instance, starts the
+notebook kernel server, and executes cells sequentially. The DataZone ``GetNotebookRun`` API is used to
+retrieve notebook run information.
+
+Importantly, this operator allows for creating and managing schedules for these notebook runs, with the ability to specify
+frequencies, compute environment settings, and notebook parameters. API-based execution or event-triggered capabilities outside
+of SageMaker Unified Studio are also possible with the documented DataZone APIs.
+
+The notebook is identified by its notebook asset ID (e.g. ``nb-1234567890``), along with the
+domain ID and project ID.

Review Comment:
   `The notebook is identified by its notebook ID, along with the domain ID and project ID where the notebook resides.`



##########
providers/amazon/src/airflow/providers/amazon/aws/operators/sagemaker_unified_studio.py:
##########
@@ -39,7 +39,7 @@
 
 class SageMakerNotebookOperator(BaseOperator):
     """
-    Provides Artifact execution functionality for Sagemaker Unified Studio Workflows.
+    Provides notebook artifact execution functionality for Sagemaker Unified Studio Workflows.

Review Comment:
   Jupyter notebook artifact



##########
providers/amazon/src/airflow/providers/amazon/aws/triggers/sagemaker_unified_studio_notebook.py:
##########
@@ -0,0 +1,153 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Trigger for monitoring SageMaker Unified Studio Notebook runs 
asynchronously."""
+
+from __future__ import annotations
+
+import asyncio
+from collections.abc import AsyncIterator
+from functools import partial
+from typing import Any
+
+import boto3
+
+from airflow.triggers.base import BaseTrigger, TriggerEvent
+
+IN_PROGRESS_STATES = {"QUEUED", "STARTING", "RUNNING", "STOPPING"}
+FINISHED_STATES = {"SUCCEEDED", "STOPPED"}
+FAILURE_STATES = {"FAILED"}
+
+TWELVE_HOURS_IN_SECONDS = 12 * 60 * 60
+
+
+class SageMakerUnifiedStudioNotebookTrigger(BaseTrigger):
+    """
+    Watches an asynchronous notebook job, triggering when it reaches a terminal state.
+
+    :param notebook_run_id: The ID of the notebook run to monitor.
+    :param domain_id: The ID of the DataZone domain.
+    :param project_id: The ID of the DataZone project.
+    :param waiter_delay: Interval in seconds between polls (default: 10).
+    :param timeout_configuration: Optional timeout settings. When provided, the maximum
+        number of poll attempts is derived from ``run_timeout_in_minutes * 60 / waiter_delay``.
+        Defaults to a 12-hour timeout when omitted.
+        Example: {"run_timeout_in_minutes": 720}
+    """
+
+    def __init__(
+        self,
+        notebook_run_id: str,
+        domain_id: str,
+        project_id: str,
+        waiter_delay: int = 10,
+        timeout_configuration: dict | None = None,
+    ):
+        super().__init__()
+        self.notebook_run_id = notebook_run_id
+        self.domain_id = domain_id
+        self.project_id = project_id
+        self.waiter_delay = waiter_delay
+        self.timeout_configuration = timeout_configuration
+        run_timeout = (timeout_configuration or {}).get("run_timeout_in_minutes")
+        if run_timeout:

Review Comment:
   Same comment here as above



##########
providers/amazon/docs/operators/sagemakerunifiedstudio.rst:
##########
@@ -41,18 +42,46 @@ Operators
 
 .. _howto/operator:SageMakerNotebookOperator:
 
-Create an Amazon SageMaker Unified Studio Workflow
-==================================================
+Run Notebooks via the SageMaker Studio SDK
+==========================================
 
-To create an Amazon SageMaker Unified Studio workflow to orchestrate your notebook, querybook, and visual ETL runs you can use
-:class:`~airflow.providers.amazon.aws.operators.sagemaker_unified_studio.SageMakerNotebookOperator`.
+Use :class:`~airflow.providers.amazon.aws.operators.sagemaker_unified_studio.SageMakerNotebookOperator`
+to execute notebooks, SQL notebooks, and Visual ETL jobs through the SageMaker Studio SDK execution
+client. This operator relies on the ``sagemaker_studio`` Python library, which converts the notebook
+to a ``.ipynb`` file on S3, creates a SageMaker Training Job to run it, and captures outputs from S3.
+
+The notebook is identified by its relative file path within the project (e.g. ``test_notebook.ipynb``).

Review Comment:
   `The artifact is...`



##########
providers/amazon/src/airflow/providers/amazon/aws/hooks/sagemaker_unified_studio_notebook.py:
##########
@@ -0,0 +1,220 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""This module contains the Amazon SageMaker Unified Studio Notebook Run 
hook."""
+
+from __future__ import annotations
+
+import time
+import uuid
+
+import boto3
+
+from airflow.providers.common.compat.sdk import AirflowException, BaseHook
+
+TWELVE_HOURS_IN_SECONDS = 12 * 60 * 60
+
+
+class SageMakerUnifiedStudioNotebookHook(BaseHook):
+    """
+    Interact with Sagemaker Unified Studio Workflows for asynchronous notebook execution.
+
+    This hook provides a wrapper around the DataZone StartNotebookRun / GetNotebookRun APIs.
+
+    Examples:
+     .. code-block:: python
+
+        from airflow.providers.amazon.aws.hooks.sagemaker_unified_studio_notebook import (
+            SageMakerUnifiedStudioNotebookHook,
+        )
+
+        hook = SageMakerUnifiedStudioNotebookHook(
+            domain_id="dzd_example",
+            project_id="proj_example",
+            waiter_delay=10,
+        )
+
+    :param domain_id: The ID of the DataZone domain containing the notebook.
+    :param project_id: The ID of the DataZone project containing the notebook.
+    :param client_token: Idempotency token. Auto-generated if not provided.
+    :param notebook_parameters: Parameters to pass to the notebook.
+        Example: {"param1": "value1", "param2": "value2"}
+    :param compute_configuration: Compute config to use for the notebook execution.
+        Example: {"instance_type": "ml.m5.large"}
+    :param waiter_delay: Interval in seconds to poll the notebook run status.
+    :param timeout_configuration: Timeout settings for the notebook execution.
+        When provided, the maximum number of poll attempts is derived from
+        ``run_timeout_in_minutes * 60 / waiter_delay``. Defaults to a 12-hour timeout when omitted.

Review Comment:
   Nit: can simplify to `Defaults to 12 hours.`



##########
providers/amazon/src/airflow/providers/amazon/aws/hooks/sagemaker_unified_studio_notebook.py:
##########
@@ -0,0 +1,220 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""This module contains the Amazon SageMaker Unified Studio Notebook Run 
hook."""
+
+from __future__ import annotations
+
+import time
+import uuid
+
+import boto3
+
+from airflow.providers.common.compat.sdk import AirflowException, BaseHook
+
+TWELVE_HOURS_IN_SECONDS = 12 * 60 * 60
+
+
+class SageMakerUnifiedStudioNotebookHook(BaseHook):
+    """
+    Interact with Sagemaker Unified Studio Workflows for asynchronous notebook execution.
+
+    This hook provides a wrapper around the DataZone StartNotebookRun / GetNotebookRun APIs.
+
+    Examples:
+     .. code-block:: python
+
+        from airflow.providers.amazon.aws.hooks.sagemaker_unified_studio_notebook import (
+            SageMakerUnifiedStudioNotebookHook,
+        )
+
+        hook = SageMakerUnifiedStudioNotebookHook(
+            domain_id="dzd_example",
+            project_id="proj_example",
+            waiter_delay=10,
+        )
+
+    :param domain_id: The ID of the DataZone domain containing the notebook.
+    :param project_id: The ID of the DataZone project containing the notebook.
+    :param client_token: Idempotency token. Auto-generated if not provided.
+    :param notebook_parameters: Parameters to pass to the notebook.
+        Example: {"param1": "value1", "param2": "value2"}
+    :param compute_configuration: Compute config to use for the notebook execution.
+        Example: {"instance_type": "ml.m5.large"}
+    :param waiter_delay: Interval in seconds to poll the notebook run status.
+    :param timeout_configuration: Timeout settings for the notebook execution.
+        When provided, the maximum number of poll attempts is derived from
+        ``run_timeout_in_minutes * 60 / waiter_delay``. Defaults to a 12-hour timeout when omitted.
+        Example: {"run_timeout_in_minutes": 720}
+    :param workflow_name: Name of the workflow (DAG) that triggered this run.
+    """
+
+    def __init__(
+        self,
+        domain_id: str,
+        project_id: str,
+        waiter_delay: int = 10,
+        timeout_configuration: dict | None = None,
+        *args,
+        **kwargs,
+    ):
+        super().__init__(*args, **kwargs)
+        self.domain_id = domain_id
+        self.project_id = project_id
+        self.waiter_delay = waiter_delay
+        self.timeout_configuration = timeout_configuration
+        run_timeout = (timeout_configuration or {}).get("run_timeout_in_minutes")
+        if run_timeout:
+            self.waiter_max_attempts = int(run_timeout * 60 / self.waiter_delay)
+        else:
+            self.waiter_max_attempts = int(
+                TWELVE_HOURS_IN_SECONDS / self.waiter_delay
+            )  # Default timeout is 12 hours

Review Comment:
   can remove this if/else by setting `TWELVE_HOURS_IN_SECONDS / 60` (or make it TWELVE_HOURS_IN_MINUTES) on L80 as the fallback
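A minimal sketch of the suggested collapse, under the assumption that a minutes-based constant replaces the seconds one so the fallback shares units with ``run_timeout_in_minutes`` (the helper name here is illustrative, not from the PR):

```python
from __future__ import annotations

# Keep the default in minutes so it can be passed straight to .get()
# as the fallback value, and the if/else disappears.
TWELVE_HOURS_IN_MINUTES = 12 * 60


def compute_waiter_max_attempts(timeout_configuration: dict | None, waiter_delay: int) -> int:
    # Note: unlike the original `if run_timeout:` truthiness check, .get()
    # keeps an explicit falsy value such as 0, so a caller passing 0 would
    # see different behavior.
    run_timeout = (timeout_configuration or {}).get(
        "run_timeout_in_minutes", TWELVE_HOURS_IN_MINUTES
    )
    return int(run_timeout * 60 / waiter_delay)
```

With the default, ``compute_waiter_max_attempts(None, 10)`` matches the current ``TWELVE_HOURS_IN_SECONDS / waiter_delay`` path.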



##########
providers/amazon/src/airflow/providers/amazon/aws/triggers/sagemaker_unified_studio_notebook.py:
##########
@@ -0,0 +1,153 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Trigger for monitoring SageMaker Unified Studio Notebook runs 
asynchronously."""
+
+from __future__ import annotations
+
+import asyncio
+from collections.abc import AsyncIterator
+from functools import partial
+from typing import Any
+
+import boto3
+
+from airflow.triggers.base import BaseTrigger, TriggerEvent
+
+IN_PROGRESS_STATES = {"QUEUED", "STARTING", "RUNNING", "STOPPING"}
+FINISHED_STATES = {"SUCCEEDED", "STOPPED"}
+FAILURE_STATES = {"FAILED"}
+
+TWELVE_HOURS_IN_SECONDS = 12 * 60 * 60
+
+
+class SageMakerUnifiedStudioNotebookTrigger(BaseTrigger):
+    """
+    Watches an asynchronous notebook job, triggering when it reaches a terminal state.

Review Comment:
   Let's use run, not job, to match the API language



##########
providers/amazon/src/airflow/providers/amazon/aws/hooks/sagemaker_unified_studio.py:
##########
@@ -30,7 +30,7 @@
 
 class SageMakerNotebookHook(BaseHook):
     """
-    Interact with Sagemaker Unified Studio Workflows.
+    Interact with Sagemaker Unified Studio Workflows for notebook execution.

Review Comment:
   `for Jupyter notebook execution`



##########
providers/amazon/src/airflow/providers/amazon/aws/triggers/sagemaker_unified_studio_notebook.py:
##########
@@ -0,0 +1,153 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Trigger for monitoring SageMaker Unified Studio Notebook runs 
asynchronously."""
+
+from __future__ import annotations
+
+import asyncio
+from collections.abc import AsyncIterator
+from functools import partial
+from typing import Any
+
+import boto3
+
+from airflow.triggers.base import BaseTrigger, TriggerEvent
+
+IN_PROGRESS_STATES = {"QUEUED", "STARTING", "RUNNING", "STOPPING"}
+FINISHED_STATES = {"SUCCEEDED", "STOPPED"}
+FAILURE_STATES = {"FAILED"}
+
+TWELVE_HOURS_IN_SECONDS = 12 * 60 * 60
+
+
+class SageMakerUnifiedStudioNotebookTrigger(BaseTrigger):
+    """
+    Watches an asynchronous notebook job, triggering when it reaches a terminal state.
+
+    :param notebook_run_id: The ID of the notebook run to monitor.
+    :param domain_id: The ID of the DataZone domain.
+    :param project_id: The ID of the DataZone project.
+    :param waiter_delay: Interval in seconds between polls (default: 10).
+    :param timeout_configuration: Optional timeout settings. When provided, the maximum
+        number of poll attempts is derived from ``run_timeout_in_minutes * 60 / waiter_delay``.
+        Defaults to a 12-hour timeout when omitted.
+        Example: {"run_timeout_in_minutes": 720}
+    """
+
+    def __init__(
+        self,
+        notebook_run_id: str,
+        domain_id: str,
+        project_id: str,
+        waiter_delay: int = 10,
+        timeout_configuration: dict | None = None,
+    ):
+        super().__init__()
+        self.notebook_run_id = notebook_run_id
+        self.domain_id = domain_id
+        self.project_id = project_id
+        self.waiter_delay = waiter_delay
+        self.timeout_configuration = timeout_configuration
+        run_timeout = (timeout_configuration or {}).get("run_timeout_in_minutes")
+        if run_timeout:
+            self.waiter_max_attempts = int(run_timeout * 60 / self.waiter_delay)
+        else:
+            self.waiter_max_attempts = int(TWELVE_HOURS_IN_SECONDS / self.waiter_delay)
+
+    def serialize(self) -> tuple[str, dict[str, Any]]:
+        return (
+            self.__class__.__module__ + "." + self.__class__.__qualname__,
+            {
+                "notebook_run_id": self.notebook_run_id,
+                "domain_id": self.domain_id,
+                "project_id": self.project_id,
+                "waiter_delay": self.waiter_delay,
+                "timeout_configuration": self.timeout_configuration,
+            },
+        )
+
+    async def run(self) -> AsyncIterator[TriggerEvent]:
+        client = boto3.client("datazone")
+        if not hasattr(client, "get_notebook_run"):
+            yield TriggerEvent(
+                {
+                    "status": "error",
+                    "notebook_run_id": self.notebook_run_id,
+                    "message": "The 'get_notebook_run' API is not available in the installed "
+                    "boto3/botocore version. Please upgrade boto3/botocore to a version "
+                    "that supports the DataZone NotebookRun APIs.",
+                }
+            )
+            return
+        try:
+            for _ in range(self.waiter_max_attempts):
+                loop = asyncio.get_running_loop()
+                response = await loop.run_in_executor(
+                    None,
+                    partial(
+                        client.get_notebook_run,
+                        domain_id=self.domain_id,
+                        notebook_run_id=self.notebook_run_id,
+                    ),
+                )
+                status = response.get("status", "")
+                error_message = response.get("errorMessage", "")
+
+                if status in FINISHED_STATES:
+                    yield TriggerEvent(
+                        {"status": "success", "notebook_run_id": self.notebook_run_id, "state": status}

Review Comment:
   I don't think we should mark stopped as a success; can we return a separate status for stopped runs? Also, why do we need to return both the `status` and `state` fields?
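A minimal sketch of one way to act on this suggestion, reusing the event payload keys from the diff above (`status`, `notebook_run_id`, `state`); the separate `"stopped"` status value is a proposed convention here, not an existing provider constant:

```python
# Sketch: give each terminal DataZone state its own event status so a
# STOPPED run is no longer reported as "success". The "stopped" value
# is a suggestion, not an established convention in the provider.
TERMINAL_EVENT_STATUS = {
    "SUCCEEDED": "success",
    "STOPPED": "stopped",
    "FAILED": "failed",
}


def build_terminal_event(notebook_run_id: str, state: str) -> dict:
    """Build the TriggerEvent payload for a terminal notebook-run state."""
    return {
        "status": TERMINAL_EVENT_STATUS.get(state, "error"),
        "notebook_run_id": notebook_run_id,
        "state": state,
    }
```

With this shape the operator's `execute_complete` can branch on `status` alone, while `state` keeps the raw API value for logging.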



##########
providers/amazon/docs/operators/sagemakerunifiedstudio.rst:
##########
@@ -41,18 +42,46 @@ Operators
 
 .. _howto/operator:SageMakerNotebookOperator:
 
-Create an Amazon SageMaker Unified Studio Workflow
-==================================================
+Run Notebooks via the SageMaker Studio SDK
+==========================================
 
-To create an Amazon SageMaker Unified Studio workflow to orchestrate your notebook, querybook, and visual ETL runs you can use
-:class:`~airflow.providers.amazon.aws.operators.sagemaker_unified_studio.SageMakerNotebookOperator`.
+Use :class:`~airflow.providers.amazon.aws.operators.sagemaker_unified_studio.SageMakerNotebookOperator`
+to execute notebooks, SQL notebooks, and Visual ETL jobs through the SageMaker Studio SDK execution
+client. This operator relies on the ``sagemaker_studio`` Python library, which converts the notebook
+to a ``.ipynb`` file on S3, creates a SageMaker Training Job to run it, and captures outputs from S3.
+
+The notebook is identified by its relative file path within the project (e.g. ``test_notebook.ipynb``).
 
 .. exampleinclude:: /../../amazon/tests/system/amazon/aws/example_sagemaker_unified_studio.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_sagemaker_unified_studio_notebook]
     :end-before: [END howto_operator_sagemaker_unified_studio_notebook]
 
+.. _howto/operator:SageMakerUnifiedStudioNotebookOperator:
+
+Run Notebooks via the DataZone NotebookRun API
+===============================================
+
+Use :class:`~airflow.providers.amazon.aws.operators.sagemaker_unified_studio_notebook.SageMakerUnifiedStudioNotebookOperator`
+to execute notebooks through the DataZone ``StartNotebookRun`` API.
+Instead of creating a training job, each run provisions its own dedicated compute instance, starts the
+notebook kernel server, and executes cells sequentially. The DataZone ``GetNotebookRun`` API is used to 

Review Comment:
   We don't need these implementation details. We can just say it runs notebooks with the `StartNotebookRun` API.



##########
providers/amazon/src/airflow/providers/amazon/aws/triggers/sagemaker_unified_studio_notebook.py:
##########
@@ -0,0 +1,153 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Trigger for monitoring SageMaker Unified Studio Notebook runs asynchronously."""
+
+from __future__ import annotations
+
+import asyncio
+from collections.abc import AsyncIterator
+from functools import partial
+from typing import Any
+
+import boto3
+
+from airflow.triggers.base import BaseTrigger, TriggerEvent
+
+IN_PROGRESS_STATES = {"QUEUED", "STARTING", "RUNNING", "STOPPING"}
+FINISHED_STATES = {"SUCCEEDED", "STOPPED"}
+FAILURE_STATES = {"FAILED"}
+
+TWELVE_HOURS_IN_SECONDS = 12 * 60 * 60
+
+
+class SageMakerUnifiedStudioNotebookTrigger(BaseTrigger):
+    """
+    Watches an asynchronous notebook job, triggering when it reaches a terminal state.
+
+    :param notebook_run_id: The ID of the notebook run to monitor.
+    :param domain_id: The ID of the DataZone domain.
+    :param project_id: The ID of the DataZone project.
+    :param waiter_delay: Interval in seconds between polls (default: 10).
+    :param timeout_configuration: Optional timeout settings. When provided, the maximum
+        number of poll attempts is derived from ``run_timeout_in_minutes * 60 / waiter_delay``.
+        Defaults to a 12-hour timeout when omitted.
+        Example: {"run_timeout_in_minutes": 720}
+    """
+
+    def __init__(
+        self,
+        notebook_run_id: str,
+        domain_id: str,
+        project_id: str,
+        waiter_delay: int = 10,
+        timeout_configuration: dict | None = None,
+    ):
+        super().__init__()
+        self.notebook_run_id = notebook_run_id
+        self.domain_id = domain_id
+        self.project_id = project_id
+        self.waiter_delay = waiter_delay
+        self.timeout_configuration = timeout_configuration
+        run_timeout = (timeout_configuration or {}).get("run_timeout_in_minutes")
+        if run_timeout:
+            self.waiter_max_attempts = int(run_timeout * 60 / self.waiter_delay)
+        else:
+            self.waiter_max_attempts = int(TWELVE_HOURS_IN_SECONDS / self.waiter_delay)
+
+    def serialize(self) -> tuple[str, dict[str, Any]]:
+        return (
+            self.__class__.__module__ + "." + self.__class__.__qualname__,
+            {
+                "notebook_run_id": self.notebook_run_id,
+                "domain_id": self.domain_id,
+                "project_id": self.project_id,
+                "waiter_delay": self.waiter_delay,
+                "timeout_configuration": self.timeout_configuration,
+            },
+        )
+
+    async def run(self) -> AsyncIterator[TriggerEvent]:
+        client = boto3.client("datazone")
+        if not hasattr(client, "get_notebook_run"):
+            yield TriggerEvent(
+                {
+                    "status": "error",
+                    "notebook_run_id": self.notebook_run_id,
+                    "message": "The 'get_notebook_run' API is not available in the installed "
+                    "boto3/botocore version. Please upgrade boto3/botocore to a version "
+                    "that supports the DataZone NotebookRun APIs.",
+                }
+            )
+            return
+        try:
+            for _ in range(self.waiter_max_attempts):
+                loop = asyncio.get_running_loop()
+                response = await loop.run_in_executor(
+                    None,
+                    partial(
+                        client.get_notebook_run,
+                        domain_id=self.domain_id,
+                        notebook_run_id=self.notebook_run_id,
+                    ),
+                )
+                status = response.get("status", "")
+                error_message = response.get("errorMessage", "")
+
+                if status in FINISHED_STATES:
+                    yield TriggerEvent(
+                        {"status": "success", "notebook_run_id": self.notebook_run_id, "state": status}
+                    )
+                    return
+
+                if status in FAILURE_STATES:
+                    yield TriggerEvent(
+                        {
+                            "status": "failed",
+                            "notebook_run_id": self.notebook_run_id,
+                            "message": error_message or f"Notebook run {self.notebook_run_id} failed",
+                        }
+                    )
+                    return
+
+                if status not in IN_PROGRESS_STATES:
+                    yield TriggerEvent(
+                        {
+                            "status": "error",
+                            "notebook_run_id": self.notebook_run_id,
+                            "message": f"Notebook run {self.notebook_run_id} reached unexpected state: {status}",
+                        }
+                    )
+                    return
+
+                self.log.info(
+                    "Notebook run %s is %s, checking again in %ss",

Review Comment:
   nit: I see that in some places we use f-strings and in some we don't. Can we use f-strings across the board, as they're generally more readable?
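For what it's worth, the `%s` form inside logger calls is often deliberate: `logging` interpolates the arguments lazily, only when a record is actually emitted, while an f-string is formatted eagerly on every call. A small comparison, with variable names mirroring the log line quoted above:

```python
import logging

log = logging.getLogger(__name__)
notebook_run_id, status, delay = "run-123", "RUNNING", 10

# f-string: the message is formatted eagerly, even when INFO records
# are filtered out by the logger's level
log.info(f"Notebook run {notebook_run_id} is {status}, checking again in {delay}s")

# %-style: logging defers formatting until the record is emitted, which
# is why linters such as pylint flag f-strings inside logger calls
log.info("Notebook run %s is %s, checking again in %ss", notebook_run_id, status, delay)
```

Switching to f-strings everywhere is fine for exception messages and event payloads; for the `self.log.info(...)` calls the lazy `%s` form matches common Python logging practice.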



##########
providers/amazon/tests/system/amazon/aws/example_sagemaker_unified_studio_notebook.py:
##########
@@ -0,0 +1,130 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import datetime
+
+from airflow.providers.amazon.aws.operators.sagemaker_unified_studio_notebook import (
+    SageMakerUnifiedStudioNotebookOperator,
+)
+from airflow.providers.amazon.aws.sensors.sagemaker_unified_studio_notebook import (
+    SageMakerUnifiedStudioNotebookSensor,
+)
+
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
+
+if AIRFLOW_V_3_0_PLUS:
+    from airflow.sdk import DAG, chain
+else:
+    from airflow.models.baseoperator import chain  # type: ignore[attr-defined,no-redef]
+    from airflow.models.dag import DAG  # type: ignore[attr-defined,no-redef,assignment]
+
+from system.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
+
+"""
+Prerequisites: The account which runs this test must have the following:
+1. A SageMaker Unified Studio Domain (with default VPC and roles)
+2. A project within the SageMaker Unified Studio Domain
+3. A notebook asset registered in the project with a known notebook_id
+
+This test calls the DataZone StartNotebookRun / GetNotebookRun APIs directly
+via boto3 using standard IAM credentials. No MWAA environment emulation is performed.
+"""
+
+DAG_ID = "example_sagemaker_unified_studio_notebook"
+
+# Externally fetched variables:
+DOMAIN_ID_KEY = "DOMAIN_ID"
+PROJECT_ID_KEY = "PROJECT_ID"
+NOTEBOOK_ID_KEY = "NOTEBOOK_ID"
+
+sys_test_context_task = (
+    SystemTestContextBuilder()
+    .add_variable(DOMAIN_ID_KEY)
+    .add_variable(PROJECT_ID_KEY)
+    .add_variable(NOTEBOOK_ID_KEY)
+    .build()
+)
+
+with DAG(
+    DAG_ID,
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+) as dag:
+    test_context = sys_test_context_task()
+
+    test_env_id = test_context[ENV_ID_KEY]
+    domain_id = test_context[DOMAIN_ID_KEY]
+    project_id = test_context[PROJECT_ID_KEY]
+    notebook_id = test_context[NOTEBOOK_ID_KEY]
+
+    # [START howto_operator_sagemaker_unified_studio_notebook]
+    run_notebook = SageMakerUnifiedStudioNotebookOperator(
+        task_id="notebook-task",
+        notebook_id=notebook_id,  # This should be the notebook asset identifier from within the SageMaker Unified Studio domain
+        domain_id=domain_id,
+        project_id=project_id,
+        client_token="unique-idempotency-token",  # optional

Review Comment:
   By hardcoding this, are we sure we won't ever have overlapping test runs? If runs could overlap, this would become a problem. It might be better to use a timestamp or omit it for testing purposes.
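A minimal sketch of a per-run token along those lines, using a hypothetical `make_client_token` helper (not part of the provider) that combines the system-test env ID with a timestamp and a random suffix:

```python
import uuid
from datetime import datetime, timezone


def make_client_token(env_id: str) -> str:
    """Build an idempotency token that is unique per test run."""
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S")
    return f"{env_id}-{timestamp}-{uuid.uuid4().hex[:8]}"


# e.g. client_token=make_client_token(test_env_id) in the operator call above
```

Since the token changes on every invocation, concurrent system-test runs would no longer share an idempotency key.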



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
