Taragolis commented on code in PR #38287: URL: https://github.com/apache/airflow/pull/38287#discussion_r1530439992
########## airflow/providers/amazon/aws/operators/neptune.py: ########## @@ -81,17 +85,86 @@ def __init__( self.delay = waiter_delay self.max_attempts = waiter_max_attempts - def execute(self, context: Context) -> dict[str, str]: + def execute(self, context: Context, event: dict[str, Any] | None = None, **kwargs) -> dict[str, str]: self.log.info("Starting Neptune cluster: %s", self.cluster_id) + if event: + # returning from a previous defer, need to restore properties + self.cluster_id = kwargs.get("cluster_id", self.cluster_id) + self.deferrable = kwargs.get("defer", self.deferrable) + self.delay = kwargs.get("waiter_delay", self.delay) + self.max_attempts = kwargs.get("waiter_max_attempts", self.max_attempts) + self.wait_for_completion = kwargs.get("wait_for_completion", self.wait_for_completion) + self.aws_conn_id = kwargs.get("aws_conn_id", self.aws_conn_id) + self.log.info("Restored properties from deferral") + # Check to make sure the cluster is not already available. status = self.hook.get_cluster_status(self.cluster_id) if status.lower() in NeptuneHook.AVAILABLE_STATES: self.log.info("Neptune cluster %s is already available.", self.cluster_id) return {"db_cluster_id": self.cluster_id} - - resp = self.hook.conn.start_db_cluster(DBClusterIdentifier=self.cluster_id) - status = resp.get("DBClusters", {}).get("Status", "Unknown") + elif status.lower() in NeptuneHook.ERROR_STATES: + # some states will not allow you to start the cluster + self.log.error( + "Neptune cluster %s is in error state %s and cannot be started", self.cluster_id, status + ) + raise AirflowException(f"Neptune cluster {self.cluster_id} is in error state {status}") + + """ + A cluster and its instances must be in a valid state to send the start request. + This loop covers the case where the cluster is not available and also the case where + the cluster is available, but one or more of the instances are in an invalid state. 
+ If either are in an invalid state, wait for the availability and retry. + Let the waiters handle retries and detecting the error states. + """ + try: + self.hook.conn.start_db_cluster(DBClusterIdentifier=self.cluster_id) + except ClientError as ex: + code = ex.response["Error"]["Code"] + self.log.warning("Received client error when attempting to start the cluster: %s", code) + + if code in ["InvalidDBInstanceStateFault", "InvalidClusterStateFault"]: + if self.deferrable: + # save the arguments to restore after defer + defer_args = { + "cluster_id": self.cluster_id, + "defer": self.deferrable, + "wait_for_completion": self.wait_for_completion, + "waiter_delay": self.delay, + "waiter_max_attempts": self.max_attempts, + "aws_conn_id": self.aws_conn_id, + } + if code == "InvalidDBInstanceStateFault": + # wait for all instances to become available + self.log.info("Deferring for instances to become available: %s", self.cluster_id) + self.defer( + trigger=NeptuneClusterInstancesAvailableTrigger( + aws_conn_id=self.aws_conn_id, + db_cluster_id=self.cluster_id, Review Comment: Missing common parameters ########## airflow/providers/amazon/aws/operators/neptune.py: ########## @@ -81,17 +85,86 @@ def __init__( self.delay = waiter_delay self.max_attempts = waiter_max_attempts - def execute(self, context: Context) -> dict[str, str]: + def execute(self, context: Context, event: dict[str, Any] | None = None, **kwargs) -> dict[str, str]: Review Comment: The `execute` method usually has strict rules for its parameters, and it is expected to accept only `context`.
It is also not common to defer to the `execute` method; I'm not sure whether any operators do that. Interesting that mypy does not complain about these changes. ########## airflow/providers/amazon/aws/operators/neptune.py: ########## @@ -81,17 +85,86 @@ def __init__( self.delay = waiter_delay self.max_attempts = waiter_max_attempts - def execute(self, context: Context) -> dict[str, str]: + def execute(self, context: Context, event: dict[str, Any] | None = None, **kwargs) -> dict[str, str]: self.log.info("Starting Neptune cluster: %s", self.cluster_id) + if event: + # returning from a previous defer, need to restore properties + self.cluster_id = kwargs.get("cluster_id", self.cluster_id) + self.deferrable = kwargs.get("defer", self.deferrable) + self.delay = kwargs.get("waiter_delay", self.delay) + self.max_attempts = kwargs.get("waiter_max_attempts", self.max_attempts) + self.wait_for_completion = kwargs.get("wait_for_completion", self.wait_for_completion) + self.aws_conn_id = kwargs.get("aws_conn_id", self.aws_conn_id) + self.log.info("Restored properties from deferral") + # Check to make sure the cluster is not already available. status = self.hook.get_cluster_status(self.cluster_id) if status.lower() in NeptuneHook.AVAILABLE_STATES: self.log.info("Neptune cluster %s is already available.", self.cluster_id) return {"db_cluster_id": self.cluster_id} - - resp = self.hook.conn.start_db_cluster(DBClusterIdentifier=self.cluster_id) - status = resp.get("DBClusters", {}).get("Status", "Unknown") + elif status.lower() in NeptuneHook.ERROR_STATES: + # some states will not allow you to start the cluster + self.log.error( + "Neptune cluster %s is in error state %s and cannot be started", self.cluster_id, status + ) + raise AirflowException(f"Neptune cluster {self.cluster_id} is in error state {status}") + + """ + A cluster and its instances must be in a valid state to send the start request. 
+ This loop covers the case where the cluster is not available and also the case where + the cluster is available, but one or more of the instances are in an invalid state. + If either are in an invalid state, wait for the availability and retry. + Let the waiters handle retries and detecting the error states. + """ + try: + self.hook.conn.start_db_cluster(DBClusterIdentifier=self.cluster_id) + except ClientError as ex: + code = ex.response["Error"]["Code"] + self.log.warning("Received client error when attempting to start the cluster: %s", code) + + if code in ["InvalidDBInstanceStateFault", "InvalidClusterStateFault"]: + if self.deferrable: + # save the arguments to restore after defer + defer_args = { + "cluster_id": self.cluster_id, + "defer": self.deferrable, + "wait_for_completion": self.wait_for_completion, + "waiter_delay": self.delay, + "waiter_max_attempts": self.max_attempts, + "aws_conn_id": self.aws_conn_id, + } + if code == "InvalidDBInstanceStateFault": + # wait for all instances to become available + self.log.info("Deferring for instances to become available: %s", self.cluster_id) + self.defer( + trigger=NeptuneClusterInstancesAvailableTrigger( + aws_conn_id=self.aws_conn_id, + db_cluster_id=self.cluster_id, + ), + method_name="execute", + kwargs=defer_args, + ) + elif code == "InvalidClusterStateFault": + self.log.info("Deferring for cluster to become available: %s", self.cluster_id) + self.defer( + trigger=NeptuneClusterAvailableTrigger( + aws_conn_id=self.aws_conn_id, Review Comment: Missing common parameters: `region_name`, `verify`, `botocore_config` ########## airflow/providers/amazon/aws/operators/neptune.py: ########## @@ -81,17 +85,86 @@ def __init__( self.delay = waiter_delay self.max_attempts = waiter_max_attempts - def execute(self, context: Context) -> dict[str, str]: + def execute(self, context: Context, event: dict[str, Any] | None = None, **kwargs) -> dict[str, str]: self.log.info("Starting Neptune cluster: %s", self.cluster_id) + if 
event: + # returning from a previous defer, need to restore properties + self.cluster_id = kwargs.get("cluster_id", self.cluster_id) + self.deferrable = kwargs.get("defer", self.deferrable) + self.delay = kwargs.get("waiter_delay", self.delay) + self.max_attempts = kwargs.get("waiter_max_attempts", self.max_attempts) + self.wait_for_completion = kwargs.get("wait_for_completion", self.wait_for_completion) + self.aws_conn_id = kwargs.get("aws_conn_id", self.aws_conn_id) + self.log.info("Restored properties from deferral") + # Check to make sure the cluster is not already available. status = self.hook.get_cluster_status(self.cluster_id) if status.lower() in NeptuneHook.AVAILABLE_STATES: self.log.info("Neptune cluster %s is already available.", self.cluster_id) return {"db_cluster_id": self.cluster_id} - - resp = self.hook.conn.start_db_cluster(DBClusterIdentifier=self.cluster_id) - status = resp.get("DBClusters", {}).get("Status", "Unknown") + elif status.lower() in NeptuneHook.ERROR_STATES: + # some states will not allow you to start the cluster + self.log.error( + "Neptune cluster %s is in error state %s and cannot be started", self.cluster_id, status + ) + raise AirflowException(f"Neptune cluster {self.cluster_id} is in error state {status}") + + """ + A cluster and its instances must be in a valid state to send the start request. + This loop covers the case where the cluster is not available and also the case where + the cluster is available, but one or more of the instances are in an invalid state. + If either are in an invalid state, wait for the availability and retry. + Let the waiters handle retries and detecting the error states. 
+ """ + try: + self.hook.conn.start_db_cluster(DBClusterIdentifier=self.cluster_id) + except ClientError as ex: + code = ex.response["Error"]["Code"] + self.log.warning("Received client error when attempting to start the cluster: %s", code) + + if code in ["InvalidDBInstanceStateFault", "InvalidClusterStateFault"]: + if self.deferrable: + # save the arguments to restore after defer + defer_args = { + "cluster_id": self.cluster_id, + "defer": self.deferrable, + "wait_for_completion": self.wait_for_completion, + "waiter_delay": self.delay, + "waiter_max_attempts": self.max_attempts, + "aws_conn_id": self.aws_conn_id, + } + if code == "InvalidDBInstanceStateFault": + # wait for all instances to become available + self.log.info("Deferring for instances to become available: %s", self.cluster_id) + self.defer( + trigger=NeptuneClusterInstancesAvailableTrigger( + aws_conn_id=self.aws_conn_id, + db_cluster_id=self.cluster_id, + ), + method_name="execute", + kwargs=defer_args, + ) + elif code == "InvalidClusterStateFault": + self.log.info("Deferring for cluster to become available: %s", self.cluster_id) + self.defer( + trigger=NeptuneClusterAvailableTrigger( + aws_conn_id=self.aws_conn_id, + db_cluster_id=self.cluster_id, + ), + method_name="execute", + kwargs=defer_args, + ) + + else: + self.log.info("Need to wait for cluster to become available: %s", self.cluster_id) + self.hook.wait_for_cluster_availability(self.cluster_id) + # make sure individual instances are available too. + self.log.info("Need to wait for instances to become available: %s", self.cluster_id) + self.hook.wait_for_cluster_instance_availability(cluster_id=self.cluster_id) + else: + # re raise for any other type of client error + raise ex Review Comment: ```suggestion raise ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org