ferruzzi commented on code in PR #32738:
URL: https://github.com/apache/airflow/pull/32738#discussion_r1272512406


##########
BREEZE.rst:
##########
@@ -462,6 +462,51 @@ Those are all available flags of ``build-docs`` command:
   :width: 100%

Review Comment:
   The changes in this file don't feel related to me.  Am I missing something, or should this be a separate PR?



##########
BREEZE.rst:
##########
@@ -462,6 +462,51 @@ Those are all available flags of ``build-docs`` command:
   :width: 100%
   :alt: Breeze build documentation
 
+Publishing the documentation
+--------------------------

Review Comment:
   This is at least one of the reasons the build-docs is failing; the line needs to be the same length as the heading.
   
   ```suggestion
   Publishing the documentation
   ----------------------------
   ```



##########
airflow/providers/amazon/aws/hooks/neptune.py:
##########
@@ -0,0 +1,113 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Interact with AWS Neptune."""
+from __future__ import annotations
+
+import time
+from typing import Callable
+
+from airflow.exceptions import AirflowException, AirflowNotFoundException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class NeptuneHook(AwsBaseHook):
+    """
+    Interact with AWS Neptune using proper client from the boto3 library.
+
+    Hook attribute `conn` has all methods that listed in documentation
+
+    .. seealso::
+        - https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/neptune.html
+        - https://docs.aws.amazon.com/neptune/index.html
+
+    Additional arguments (such as ``aws_conn_id`` or ``region_name``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsGenericHook`
+
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+    """
+
+    def __init__(self, *args, **kwargs) -> None:
+        kwargs["client_type"] = "neptune"
+        super().__init__(*args, **kwargs)
+
+    def get_db_cluster_state(self, db_cluster_id: str) -> str:
+        """
+        Get the current state of a DB cluster.
+
+        :param db_cluster_id: The ID of the target DB cluster.
+    :return: Returns the status of the DB cluster as a string (eg. "available")
+        :rtype: str
+        :raises AirflowNotFoundException: If the DB cluster does not exist.

Review Comment:
   ```suggestion
   
   ```
   
   We don't list rtypes anymore and I haven't seen anywhere else that lists the :raises:, though I'm not necessarily against the idea on that one.



##########
airflow/providers/amazon/aws/operators/neptune.py:
##########
@@ -0,0 +1,162 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import json
+from functools import cached_property
+from typing import TYPE_CHECKING
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.neptune import NeptuneHook
+from airflow.providers.amazon.aws.utils.neptune import NeptuneDbType
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class NeptuneStartDbOperator(BaseOperator):
+    """
+    Starts a Neptune DB cluster
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:NeptuneStartDbOperator`
+
+    :param db_identifier: The AWS identifier of the DB to start
+    :param db_type: Type of the DB - either "instance" or "cluster" (default: "cluster")
+    :param aws_conn_id: The Airflow connection used for AWS credentials. (default: "aws_default")
+    :param wait_for_completion:  If True, waits for DB to start. (default: True)
+
+    Note: In boto3 supports starting db operator only for cluster and not for instance db_type.
+        So, default is maintained as Cluster, however it can be extended once instance db_type is available,
+        similar to RDS database implementation
+    """
+
+    template_fields = ("db_identifier", "db_type")
+    STATES_FOR_STARTING = ["available", "starting"]
+
+    def __init__(
+        self,
+        *,
+        db_identifier: str,
+        region_name: str | None = None,
+        db_type: NeptuneDbType | str = NeptuneDbType.CLUSTER,
+        aws_conn_id: str = "aws_default",
+        wait_for_completion: bool = True,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.db_identifier = db_identifier
+        self.region_name = region_name
+        self.db_type = db_type
+        self.aws_conn_id = aws_conn_id
+        self.wait_for_completion = wait_for_completion
+
+    @cached_property
+    def hook(self) -> NeptuneHook:
+        """Create and return a NeptuneHook."""
+        return NeptuneHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name)
+
+    def execute(self, context: Context) -> str:
+        self.db_type = NeptuneDbType(self.db_type)
+        start_db_response = None
+        if (
+            self.hook.get_db_cluster_state(self.db_identifier)
+            not in NeptuneStartDbOperator.STATES_FOR_STARTING
+        ):
+            self._start_db()
+
+        if self.wait_for_completion:
+            self._wait_until_db_available()
+        return json.dumps(start_db_response, default=str)
+
+    def _start_db(self):
+        self.log.info("Starting DB %s '%s'", self.db_type.value, self.db_identifier)
+        self.hook.conn.start_db_cluster(DBClusterIdentifier=self.db_identifier)
+
+    def _wait_until_db_available(self):
+        self.log.info("Waiting for DB %s to reach 'available' state", self.db_type.value)
+        self.hook.wait_for_db_cluster_state(self.db_identifier, target_state="available")
+
+
+class NeptuneStopDbOperator(BaseOperator):
+    """
+    Stops a Neptune DB cluster

Review Comment:
   D205 nitpick.
   
   ```suggestion
       Stops a Neptune DB cluster.
   ```



##########
airflow/providers/amazon/aws/operators/neptune.py:
##########
@@ -0,0 +1,162 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import json
+from functools import cached_property
+from typing import TYPE_CHECKING
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.neptune import NeptuneHook
+from airflow.providers.amazon.aws.utils.neptune import NeptuneDbType
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class NeptuneStartDbOperator(BaseOperator):
+    """
+    Starts a Neptune DB cluster
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:NeptuneStartDbOperator`
+
+    :param db_identifier: The AWS identifier of the DB to start
+    :param db_type: Type of the DB - either "instance" or "cluster" (default: "cluster")
+    :param aws_conn_id: The Airflow connection used for AWS credentials. (default: "aws_default")
+    :param wait_for_completion:  If True, waits for DB to start. (default: True)
+
+    Note: In boto3 supports starting db operator only for cluster and not for instance db_type.
+        So, default is maintained as Cluster, however it can be extended once instance db_type is available,
+        similar to RDS database implementation
+    """
+
+    template_fields = ("db_identifier", "db_type")
+    STATES_FOR_STARTING = ["available", "starting"]
+
+    def __init__(
+        self,
+        *,
+        db_identifier: str,
+        region_name: str | None = None,
+        db_type: NeptuneDbType | str = NeptuneDbType.CLUSTER,
+        aws_conn_id: str = "aws_default",
+        wait_for_completion: bool = True,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.db_identifier = db_identifier
+        self.region_name = region_name
+        self.db_type = db_type
+        self.aws_conn_id = aws_conn_id
+        self.wait_for_completion = wait_for_completion
+
+    @cached_property
+    def hook(self) -> NeptuneHook:
+        """Create and return a NeptuneHook."""
+        return NeptuneHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name)
+
+    def execute(self, context: Context) -> str:
+        self.db_type = NeptuneDbType(self.db_type)
+        start_db_response = None
+        if (
+            self.hook.get_db_cluster_state(self.db_identifier)
+            not in NeptuneStartDbOperator.STATES_FOR_STARTING
+        ):
+            self._start_db()
+
+        if self.wait_for_completion:
+            self._wait_until_db_available()
+        return json.dumps(start_db_response, default=str)

Review Comment:
   What am I missing here?   It looks like `start_db_response` is initialized with a value of None on L77 then never assigned a value?  Pretty sure your intent was to assign it on L82?  `start_db_response = self._start_db()`
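   A minimal sketch of the fix described above. The response shape is illustrative only; the real `hook.conn.start_db_cluster()` call returns a much larger boto3 payload:

   ```python
   import json

   # Hypothetical stand-in for hook.conn.start_db_cluster(); the real boto3
   # response contains many more fields, including datetime objects.
   def _start_db():
       return {"DBCluster": {"DBClusterIdentifier": "demo-cluster", "Status": "starting"}}

   # The fix: capture the return value instead of leaving the variable as None,
   # so the operator's return value actually carries the API response.
   start_db_response = _start_db()
   print(json.dumps(start_db_response, default=str))
   ```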



##########
airflow/providers/amazon/aws/operators/neptune.py:
##########
@@ -0,0 +1,162 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import json
+from functools import cached_property
+from typing import TYPE_CHECKING
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.neptune import NeptuneHook
+from airflow.providers.amazon.aws.utils.neptune import NeptuneDbType
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class NeptuneStartDbOperator(BaseOperator):
+    """
+    Starts a Neptune DB cluster
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:NeptuneStartDbOperator`
+
+    :param db_identifier: The AWS identifier of the DB to start
+    :param db_type: Type of the DB - either "instance" or "cluster" (default: "cluster")
+    :param aws_conn_id: The Airflow connection used for AWS credentials. (default: "aws_default")
+    :param wait_for_completion:  If True, waits for DB to start. (default: True)
+
+    Note: In boto3 supports starting db operator only for cluster and not for instance db_type.
+        So, default is maintained as Cluster, however it can be extended once instance db_type is available,
+        similar to RDS database implementation
+    """
+
+    template_fields = ("db_identifier", "db_type")
+    STATES_FOR_STARTING = ["available", "starting"]
+
+    def __init__(
+        self,
+        *,
+        db_identifier: str,
+        region_name: str | None = None,
+        db_type: NeptuneDbType | str = NeptuneDbType.CLUSTER,
+        aws_conn_id: str = "aws_default",
+        wait_for_completion: bool = True,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.db_identifier = db_identifier
+        self.region_name = region_name
+        self.db_type = db_type
+        self.aws_conn_id = aws_conn_id
+        self.wait_for_completion = wait_for_completion
+
+    @cached_property
+    def hook(self) -> NeptuneHook:
+        """Create and return a NeptuneHook."""
+        return NeptuneHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name)
+
+    def execute(self, context: Context) -> str:
+        self.db_type = NeptuneDbType(self.db_type)
+        start_db_response = None
+        if (
+            self.hook.get_db_cluster_state(self.db_identifier)
+            not in NeptuneStartDbOperator.STATES_FOR_STARTING
+        ):
+            self._start_db()
+
+        if self.wait_for_completion:
+            self._wait_until_db_available()
+        return json.dumps(start_db_response, default=str)
+
+    def _start_db(self):
+        self.log.info("Starting DB %s '%s'", self.db_type.value, self.db_identifier)
+        self.hook.conn.start_db_cluster(DBClusterIdentifier=self.db_identifier)
+
+    def _wait_until_db_available(self):
+        self.log.info("Waiting for DB %s to reach 'available' state", self.db_type.value)
+        self.hook.wait_for_db_cluster_state(self.db_identifier, target_state="available")
+
+
+class NeptuneStopDbOperator(BaseOperator):
+    """
+    Stops a Neptune DB cluster
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:NeptuneStopDbOperator`
+
+    :param db_identifier: The AWS identifier of the DB to start
+    :param db_type: Type of the DB - either "instance" or "cluster" (default: "cluster")
+    :param aws_conn_id: The Airflow connection used for AWS credentials. (default: "aws_default")
+    :param wait_for_completion:  If True, waits for DB to start. (default: True)
+
+    Note: In boto3 supports starting db operator only for cluster and not for instance db_type.
+        So, default is maintained as Cluster, however it can be extended once instance db_type is available,
+        similar to RDS database implementation
+    """
+
+    template_fields = ("db_identifier", "db_type")
+    STATES_FOR_STOPPING = ["stopped", "stopping"]
+
+    def __init__(
+        self,
+        *,
+        db_identifier: str,
+        region_name: str | None = None,
+        db_type: NeptuneDbType | str = NeptuneDbType.INSTANCE,
+        aws_conn_id: str = "aws_default",
+        wait_for_completion: bool = True,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.db_identifier = db_identifier
+        self.region_name = region_name
+        self.db_type = db_type
+        self.aws_conn_id = aws_conn_id
+        self.wait_for_completion = wait_for_completion
+
+    @cached_property
+    def hook(self) -> NeptuneHook:
+        """Create and return a NeptuneHook."""
+        return NeptuneHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name)
+
+    def execute(self, context: Context) -> str:
+        self.db_type = NeptuneDbType(self.db_type)
+        stop_db_response = None
+        if (
+            self.hook.get_db_cluster_state(self.db_identifier)
+            not in NeptuneStopDbOperator.STATES_FOR_STOPPING
+        ):
+            stop_db_response = self._stop_db()
+        if self.wait_for_completion:
+            self._wait_until_db_stopped()
+        return json.dumps(stop_db_response, default=str)

Review Comment:
   Here and above:   Please verify this manually and let me know if this is working as you expect.  I've seen json.dumps fail in the past when the json includes datetimes, and these responses do.
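   A quick sketch of the failure mode the comment describes, and why `default=str` avoids it (the response dict is illustrative, not the real boto3 payload):

   ```python
   import json
   from datetime import datetime

   # boto3 describe/start responses carry datetime fields like this one.
   response = {"DBCluster": {"ClusterCreateTime": datetime(2023, 7, 24, 12, 0)}}

   # Plain json.dumps raises TypeError on datetime objects...
   failed = False
   try:
       json.dumps(response)
   except TypeError:
       failed = True

   # ...while default=str falls back to str() for anything not natively serializable.
   serialized = json.dumps(response, default=str)
   print(failed, serialized)
   ```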



##########
airflow/providers/amazon/aws/utils/neptune.py:
##########
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from enum import Enum
+
+
+class NeptuneDbType(Enum):
+    """Only available types for the AWS Neptune DB"""

Review Comment:
   D205 nitpick
   
   ```suggestion
       """Only available types for the AWS Neptune DB."""
   ```



##########
airflow/providers/amazon/aws/operators/neptune.py:
##########
@@ -0,0 +1,162 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import json
+from functools import cached_property
+from typing import TYPE_CHECKING
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.neptune import NeptuneHook
+from airflow.providers.amazon.aws.utils.neptune import NeptuneDbType
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class NeptuneStartDbOperator(BaseOperator):
+    """
+    Starts a Neptune DB cluster

Review Comment:
   ```suggestion
       Starts a Neptune DB cluster.
   ```



##########
BREEZE.rst:
##########
@@ -462,6 +462,51 @@ Those are all available flags of ``build-docs`` command:
   :width: 100%
   :alt: Breeze build documentation
 
+Publishing the documentation
+--------------------------
+
+To publish the documentation generated by ``build-docs`` in Breeze to ``airflow-site``,
+use the ``release-management publish-docs`` command:
+
+.. code-block:: bash
+
+     breeze release-management publish-docs
+
+The publishing documentation consists  steps:

Review Comment:
   ```suggestion
   Publishing documentation consists of the following steps:
   ```



##########
airflow/providers/amazon/aws/hooks/neptune.py:
##########
@@ -0,0 +1,113 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Interact with AWS Neptune."""
+from __future__ import annotations
+
+import time
+from typing import Callable
+
+from airflow.exceptions import AirflowException, AirflowNotFoundException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class NeptuneHook(AwsBaseHook):
+    """
+    Interact with AWS Neptune using proper client from the boto3 library.
+
+    Hook attribute `conn` has all methods that listed in documentation

Review Comment:
   ```suggestion
       Hook attribute `conn` has all methods listed in the documentation
   ```



##########
airflow/providers/amazon/aws/operators/neptune.py:
##########
@@ -0,0 +1,162 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import json
+from functools import cached_property
+from typing import TYPE_CHECKING
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.neptune import NeptuneHook
+from airflow.providers.amazon.aws.utils.neptune import NeptuneDbType
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class NeptuneStartDbOperator(BaseOperator):
+    """
+    Starts a Neptune DB cluster
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:NeptuneStartDbOperator`
+
+    :param db_identifier: The AWS identifier of the DB to start
+    :param db_type: Type of the DB - either "instance" or "cluster" (default: "cluster")
+    :param aws_conn_id: The Airflow connection used for AWS credentials. (default: "aws_default")
+    :param wait_for_completion:  If True, waits for DB to start. (default: True)
+
+    Note: In boto3 supports starting db operator only for cluster and not for instance db_type.
+        So, default is maintained as Cluster, however it can be extended once instance db_type is available,
+        similar to RDS database implementation
+    """
+
+    template_fields = ("db_identifier", "db_type")
+    STATES_FOR_STARTING = ["available", "starting"]

Review Comment:
   Can you verify these are lowercase?  I'm pretty sure most other AWS services use all caps on their status names.
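   If the casing does turn out to vary, a small normalization keeps the comparison safe either way (a sketch; note the hook's `get_db_cluster_state()` already lowercases its return value):

   ```python
   # Defensive sketch: normalize casing before comparing, so the check works
   # whether the service reports "available" or "AVAILABLE".
   STATES_FOR_STARTING = {"available", "starting"}

   def needs_start(status: str) -> bool:
       # Returns True only when the cluster is neither running nor already starting.
       return status.lower() not in STATES_FOR_STARTING
   ```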



##########
tests/system/providers/amazon/aws/example_eks_with_fargate_in_one_step.py:
##########
@@ -81,6 +81,9 @@
         # Opting to use the same ARN for the cluster and the pod here,
         # but a different ARN could be configured and passed if desired.
         fargate_pod_execution_role_arn=fargate_pod_role_arn,
+        deferrable=True,
+        waiter_delay=30,
+        wait_for_completion=399,

Review Comment:
   This doesn't appear related to the rest of the PR.
   
   Also: System tests don't currently support deferrable operators, and wait_for_completion is a bool.



##########
airflow/providers/amazon/aws/hooks/neptune.py:
##########
@@ -0,0 +1,113 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Interact with AWS Neptune."""
+from __future__ import annotations
+
+import time
+from typing import Callable
+
+from airflow.exceptions import AirflowException, AirflowNotFoundException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class NeptuneHook(AwsBaseHook):
+    """
+    Interact with AWS Neptune using proper client from the boto3 library.
+
+    Hook attribute `conn` has all methods that listed in documentation
+
+    .. seealso::
+        - https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/neptune.html
+        - https://docs.aws.amazon.com/neptune/index.html
+
+    Additional arguments (such as ``aws_conn_id`` or ``region_name``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsGenericHook`
+
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+    """
+
+    def __init__(self, *args, **kwargs) -> None:
+        kwargs["client_type"] = "neptune"
+        super().__init__(*args, **kwargs)
+
+    def get_db_cluster_state(self, db_cluster_id: str) -> str:
+        """
+        Get the current state of a DB cluster.
+
+        :param db_cluster_id: The ID of the target DB cluster.
+    :return: Returns the status of the DB cluster as a string (eg. "available")
+        :rtype: str
+        :raises AirflowNotFoundException: If the DB cluster does not exist.
+        """
+        try:
+            response = self.conn.describe_db_clusters(DBClusterIdentifier=db_cluster_id)
+        except self.conn.exceptions.ClientError as e:
+            if e.response["Error"]["Code"] == "DBClusterNotFoundFault":
+                raise AirflowNotFoundException(e)
+            raise e
+        return response["DBClusters"][0]["Status"].lower()
+
+    def wait_for_db_cluster_state(
+        self, db_cluster_id: str, target_state: str, check_interval: int = 30, max_attempts: int = 40
+    ) -> None:
+        """
+        Polls until the target state is reached.
+        An error is raised after a max number of attempts.

Review Comment:
   I know this one is very nitpicky, but we're working on enabling D205 style checking which will enforce it soon.
   
   ```suggestion
           Polls until the target state is reached.
           
           An error is raised after a max number of attempts.
   ```



##########
airflow/providers/amazon/aws/hooks/neptune.py:
##########
@@ -0,0 +1,113 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Interact with AWS Neptune."""
+from __future__ import annotations
+
+import time
+from typing import Callable
+
+from airflow.exceptions import AirflowException, AirflowNotFoundException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class NeptuneHook(AwsBaseHook):
+    """
+    Interact with AWS Neptune using proper client from the boto3 library.
+
+    Hook attribute `conn` has all methods that listed in documentation
+
+    .. seealso::
+        - https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/neptune.html
+        - https://docs.aws.amazon.com/neptune/index.html
+
+    Additional arguments (such as ``aws_conn_id`` or ``region_name``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsGenericHook`
+
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+    """
+
+    def __init__(self, *args, **kwargs) -> None:
+        kwargs["client_type"] = "neptune"
+        super().__init__(*args, **kwargs)
+
+    def get_db_cluster_state(self, db_cluster_id: str) -> str:
+        """
+        Get the current state of a DB cluster.
+
+        :param db_cluster_id: The ID of the target DB cluster.
+    :return: Returns the status of the DB cluster as a string (eg. "available")
+        :rtype: str
+        :raises AirflowNotFoundException: If the DB cluster does not exist.
+        """
+        try:
+            response = self.conn.describe_db_clusters(DBClusterIdentifier=db_cluster_id)
+        except self.conn.exceptions.ClientError as e:
+            if e.response["Error"]["Code"] == "DBClusterNotFoundFault":
+                raise AirflowNotFoundException(e)
+            raise e
+        return response["DBClusters"][0]["Status"].lower()
+
+    def wait_for_db_cluster_state(

Review Comment:
   It feels like this should be a Sensor, but either way it should likely be implemented as a [custom waiter](https://github.com/apache/airflow/tree/main/airflow/providers/amazon/aws/waiters)
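   For reference, the custom waiters in that directory are boto3 waiter-model JSON files. A hypothetical `neptune.json` sketch is below; the operation name, JMESPath arguments, and state values would all need to be checked against the real Neptune API before use:

   ```json
   {
       "version": 2,
       "waiters": {
           "db_cluster_available": {
               "operation": "DescribeDBClusters",
               "delay": 30,
               "maxAttempts": 40,
               "acceptors": [
                   {
                       "matcher": "path",
                       "argument": "DBClusters[0].Status",
                       "expected": "available",
                       "state": "success"
                   },
                   {
                       "matcher": "path",
                       "argument": "DBClusters[0].Status",
                       "expected": "stopped",
                       "state": "failure"
                   }
               ]
           }
       }
   }
   ```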



##########
airflow/providers/amazon/aws/operators/neptune.py:
##########
@@ -0,0 +1,162 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import json
+from functools import cached_property
+from typing import TYPE_CHECKING
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.neptune import NeptuneHook
+from airflow.providers.amazon.aws.utils.neptune import NeptuneDbType
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class NeptuneStartDbOperator(BaseOperator):
+    """
+    Starts a Neptune DB cluster.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:NeptuneStartDbOperator`
+
+    :param db_identifier: The AWS identifier of the DB to start
+    :param db_type: Type of the DB - either "instance" or "cluster" (default: "cluster")
+    :param region_name: AWS region name. If not specified, the default boto3 behaviour is used. (optional)
+    :param aws_conn_id: The Airflow connection used for AWS credentials. (default: "aws_default")
+    :param wait_for_completion: If True, waits for the DB to start. (default: True)
+
+    Note: boto3 currently supports starting only DB clusters, not DB instances,
+        so the default db_type is "cluster". This can be extended once instance
+        support is available, similar to the RDS implementation.
+    """
+
+    template_fields = ("db_identifier", "db_type")
+    STATES_FOR_STARTING = ["available", "starting"]
+
+    def __init__(
+        self,
+        *,
+        db_identifier: str,
+        region_name: str | None = None,
+        db_type: NeptuneDbType | str = NeptuneDbType.CLUSTER,
+        aws_conn_id: str = "aws_default",
+        wait_for_completion: bool = True,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.db_identifier = db_identifier
+        self.region_name = region_name
+        self.db_type = db_type
+        self.aws_conn_id = aws_conn_id
+        self.wait_for_completion = wait_for_completion
+
+    @cached_property
+    def hook(self) -> NeptuneHook:
+        """Create and return a NeptuneHook."""
+        return NeptuneHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name)
+
+    def execute(self, context: Context) -> str:
+        self.db_type = NeptuneDbType(self.db_type)
+        start_db_response = None
+        if (
+            self.hook.get_db_cluster_state(self.db_identifier)
+            not in NeptuneStartDbOperator.STATES_FOR_STARTING
+        ):
+            start_db_response = self._start_db()
+
+        if self.wait_for_completion:
+            self._wait_until_db_available()
+        return json.dumps(start_db_response, default=str)
+
+    def _start_db(self):
+        self.log.info("Starting DB %s '%s'", self.db_type.value, self.db_identifier)
+        return self.hook.conn.start_db_cluster(DBClusterIdentifier=self.db_identifier)
+
+    def _wait_until_db_available(self):
+        self.log.info("Waiting for DB %s to reach 'available' state", self.db_type.value)
+        self.hook.wait_for_db_cluster_state(self.db_identifier, target_state="available")
+
+
+class NeptuneStopDbOperator(BaseOperator):
+    """
+    Stops a Neptune DB cluster.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:NeptuneStopDbOperator`
+
+    :param db_identifier: The AWS identifier of the DB to stop
+    :param db_type: Type of the DB - either "instance" or "cluster" (default: "cluster")
+    :param region_name: AWS region name. If not specified, the default boto3 behaviour is used. (optional)
+    :param aws_conn_id: The Airflow connection used for AWS credentials. (default: "aws_default")
+    :param wait_for_completion: If True, waits for the DB to stop. (default: True)
+
+    Note: boto3 currently supports stopping only DB clusters, not DB instances,
+        so the default db_type is "cluster". This can be extended once instance
+        support is available, similar to the RDS implementation.
+    """
+
+    template_fields = ("db_identifier", "db_type")
+    STATES_FOR_STOPPING = ["stopped", "stopping"]
+
+    def __init__(
+        self,
+        *,
+        db_identifier: str,
+        region_name: str | None = None,
+        db_type: NeptuneDbType | str = NeptuneDbType.CLUSTER,
+        aws_conn_id: str = "aws_default",
+        wait_for_completion: bool = True,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.db_identifier = db_identifier
+        self.region_name = region_name
+        self.db_type = db_type
+        self.aws_conn_id = aws_conn_id
+        self.wait_for_completion = wait_for_completion
+
+    @cached_property
+    def hook(self) -> NeptuneHook:
+        """Create and return a NeptuneHook."""
+        return NeptuneHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name)
+
+    def execute(self, context: Context) -> str:
+        self.db_type = NeptuneDbType(self.db_type)
+        stop_db_response = None
+        if (
+            self.hook.get_db_cluster_state(self.db_identifier)
+            not in NeptuneStopDbOperator.STATES_FOR_STOPPING
+        ):
+            stop_db_response = self._stop_db()
+        if self.wait_for_completion:
+            self._wait_until_db_stopped()
+        return json.dumps(stop_db_response, default=str)
+
+    def _stop_db(self):
+        self.log.info("Stopping DB %s '%s'", self.db_type.value, self.db_identifier)
+        return self.hook.conn.stop_db_cluster(DBClusterIdentifier=self.db_identifier)
+
+    def _wait_until_db_stopped(self):
+        self.log.info("Waiting for DB %s to reach 'stopped' state", self.db_type.value)
+        self.hook.wait_for_db_cluster_state(self.db_identifier, target_state="stopped")
+
+
+__all__ = ["NeptuneStartDbOperator", "NeptuneStopDbOperator"]

Review Comment:
   Pretty sure you can drop this?
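
   As an aside on the execute logic above: both operators guard the start/stop call with a state check so that re-running the task against an already-transitioning cluster is a no-op. Stripped of the hook, that guard reduces to the following (an illustrative simplification, not code from the PR):

   ```python
   def needs_transition(current_state: str, settled_states: list[str]) -> bool:
       """Return True if the cluster still needs a start/stop call.

       `settled_states` lists the states in which no call is required,
       e.g. ["available", "starting"] for start, or ["stopped", "stopping"]
       for stop, matching STATES_FOR_STARTING / STATES_FOR_STOPPING above.
       """
       # get_db_cluster_state already lowercases, but normalise defensively.
       return current_state.lower() not in settled_states
   ```

   A cluster that is already "starting" therefore needs no further action from the start operator, while a "stopped" one does.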



##########
tests/system/providers/amazon/aws/example_neptune_cluster.py:
##########
@@ -0,0 +1,75 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import datetime
+
+from airflow import DAG
+from airflow.models.baseoperator import chain
+from airflow.providers.amazon.aws.operators.neptune import NeptuneStartDbOperator, NeptuneStopDbOperator
+from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
+
+sys_test_context_task = SystemTestContextBuilder().build()
+
+DAG_ID = "example_neptune_cluster"
+
+with DAG(
+    dag_id=DAG_ID,
+    schedule="@once",
+    start_date=datetime(2023, 1, 1),
+    tags=["example"],
+    catchup=False,
+) as dag:
+    test_context = sys_test_context_task()
+

Review Comment:
   Unless there is a good reason otherwise, system tests should be self-contained, so the DB should be created and destroyed within the test.  I know there aren't operators for that yet, but you can use the boto API in a @task to do it, as seen in other tests.  Something like:
   
   ```python
   @task
   def create_db():
       client = NeptuneHook().conn
       
       client.create_db_cluster(foo)
       client.create_db_instance(bar)
   ```
   
   Then a teardown task at the end to undo it.
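
   To sketch the teardown half of that suggestion: a helper that deletes the resources in reverse order of creation could look like the following. The helper itself, the identifier values, and the Mock-based exercise are hypothetical, not part of this PR; the `delete_db_instance` / `delete_db_cluster` calls and their `SkipFinalSnapshot` parameter mirror the boto3 Neptune API.

   ```python
   from unittest.mock import Mock


   def teardown_neptune(client, cluster_id: str, instance_id: str) -> None:
       """Delete the test DB, reversing creation order: instances first, then the cluster."""
       client.delete_db_instance(DBInstanceIdentifier=instance_id, SkipFinalSnapshot=True)
       client.delete_db_cluster(DBClusterIdentifier=cluster_id, SkipFinalSnapshot=True)


   # Exercised here against a Mock client; in the system test the client would
   # be NeptuneHook().conn inside a teardown @task.
   client = Mock()
   teardown_neptune(client, "example-cluster", "example-instance")
   client.delete_db_cluster.assert_called_once_with(
       DBClusterIdentifier="example-cluster", SkipFinalSnapshot=True
   )
   ```

   Running the teardown with `trigger_rule=TriggerRule.ALL_DONE` ensures it fires even when an upstream task fails, which is the usual pattern in the other AWS system tests.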



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
