baolsen commented on a change in pull request #16904:
URL: https://github.com/apache/airflow/pull/16904#discussion_r676397422
##########
File path: airflow/providers/amazon/aws/sensors/sqs.py
##########
@@ -48,13 +68,26 @@ def __init__(
aws_conn_id: str = 'aws_default',
max_messages: int = 5,
wait_time_seconds: int = 1,
+ visibility_timeout: Optional[int] = None,
+ message_filtering: Optional[str] = None,
+ message_filtering_match_values: Optional[Any] = None,
+ message_filtering_config: Optional[Any] = None,
**kwargs,
):
super().__init__(**kwargs)
self.sqs_queue = sqs_queue
self.aws_conn_id = aws_conn_id
self.max_messages = max_messages
self.wait_time_seconds = wait_time_seconds
+ self.visibility_timeout = visibility_timeout
+
+ self.message_filtering = message_filtering
+ if message_filtering_match_values is not None:
+ if not isinstance(message_filtering_match_values, list):
+ message_filtering_match_values =
[message_filtering_match_values]
+ self.message_filtering_match_values = message_filtering_match_values
Review comment:
Thanks for the feedback, I did not think of that. Fixed
##########
File path: airflow/providers/amazon/aws/sensors/sqs.py
##########
@@ -102,3 +152,39 @@ def get_hook(self) -> SQSHook:
self.hook = SQSHook(aws_conn_id=self.aws_conn_id)
return self.hook
+
+ def filter_messages(self, messages):
+ if self.message_filtering == 'literal':
+ return self.filter_messages_literal(messages)
+ if self.message_filtering == 'jsonpath':
+ return self.filter_messages_jsonpath(messages)
+ else:
+ raise NotImplementedError('Override this method to define custom
filters')
+
+ def filter_messages_literal(self, messages):
+ filtered_messages = []
+ if self.message_filtering_match_values is None:
+ raise Exception('message_filtering_match_values must be specified
for literal matching')
Review comment:
Fixed, thanks for the feedback
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]