ferruzzi commented on code in PR #39245:
URL: https://github.com/apache/airflow/pull/39245#discussion_r1581460944
##########
airflow/providers/amazon/aws/sensors/bedrock.py:
##########
@@ -68,7 +70,56 @@ def __init__(
         super().__init__(**kwargs)
         self.deferrable = deferrable
-    def poke(self, context: Context) -> bool:
+    def poke(self, context: Context, **kwargs) -> bool:
+        state = self.get_state()
+        if state in self.FAILURE_STATES:
+            # TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
+            if self.soft_fail:
+                raise AirflowSkipException(self.FAILURE_MESSAGE)
+            raise AirflowException(self.FAILURE_MESSAGE)
+
+        return state not in self.INTERMEDIATE_STATES
+
+    @abc.abstractmethod
+    def get_state(self) -> str:
+        """Implement in subclasses."""
+
+
Review Comment:
@Taragolis - These two base sensors are identical apart from the
`aws_hook_class` (`BedrockHook` vs `BedrockAgentHook`) and the hook name in the
parent class. I'm not really familiar with how you did the
`AwsBaseSensor[BedrockAgentHook]` thing. Is it possible to combine these into
one `BaseBedrockSensor`? What would that look like?
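
For discussion, here is a rough sketch of what a combined base might look like. It is not taken from the PR and does not import Airflow: `BedrockHook`, `BedrockAgentHook`, `SensorFailure` and `SensorSkipped` below are stand-ins for the real hooks and for `AirflowException` / `AirflowSkipException`, and `BedrockBaseSensor`, `DemoAgentSensor` and the state names are hypothetical. It assumes the `AwsBaseSensor[...]` subscript behaves like an ordinary `typing.Generic` parameter, so the shared base stays generic and each concrete sensor picks its hook.

```python
from __future__ import annotations

import abc
from typing import Generic, TypeVar


class BedrockHook:
    """Stand-in for the real BedrockHook (assumption, not the Airflow class)."""


class BedrockAgentHook:
    """Stand-in for the real BedrockAgentHook (assumption, not the Airflow class)."""


class SensorFailure(Exception):
    """Stand-in for AirflowException."""


class SensorSkipped(Exception):
    """Stand-in for AirflowSkipException."""


HookT = TypeVar("HookT")


class BedrockBaseSensor(abc.ABC, Generic[HookT]):
    """Hypothetical single base: generic over the hook type."""

    INTERMEDIATE_STATES: tuple[str, ...] = ()
    FAILURE_STATES: tuple[str, ...] = ()
    FAILURE_MESSAGE: str = ""
    aws_hook_class: type[HookT]  # each concrete sensor sets this

    def __init__(self, soft_fail: bool = False) -> None:
        self.soft_fail = soft_fail

    @property
    def hook(self) -> HookT:
        # The real mixin builds the hook with connection arguments;
        # here it is simply instantiated.
        return self.aws_hook_class()

    def poke(self, context=None, **kwargs) -> bool:
        # Same logic as the poke() in the diff, written once.
        state = self.get_state()
        if state in self.FAILURE_STATES:
            if self.soft_fail:
                raise SensorSkipped(self.FAILURE_MESSAGE)
            raise SensorFailure(self.FAILURE_MESSAGE)
        return state not in self.INTERMEDIATE_STATES

    @abc.abstractmethod
    def get_state(self) -> str:
        """Implement in subclasses."""


class DemoAgentSensor(BedrockBaseSensor[BedrockAgentHook]):
    """Hypothetical concrete sensor that returns a canned state."""

    INTERMEDIATE_STATES = ("CREATING",)
    FAILURE_STATES = ("FAILED",)
    FAILURE_MESSAGE = "Demo sensor failed."
    aws_hook_class = BedrockAgentHook

    def __init__(self, state: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self._state = state

    def get_state(self) -> str:
        return self._state
```

With this shape, a sensor backed by `BedrockHook` would subclass `BedrockBaseSensor[BedrockHook]` and set `aws_hook_class = BedrockHook`, so the `poke()` logic is not duplicated. Whether the real `AwsBaseSensor` mixin allows the type parameter to be deferred like this is something I have not verified.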