eladkal commented on code in PR #24554:
URL: https://github.com/apache/airflow/pull/24554#discussion_r908433592
##########
airflow/providers/amazon/aws/sensors/sqs.py:
##########
@@ -136,28 +142,47 @@ def poke(self, context: 'Context'):
messages = self.filter_messages(messages)
num_messages = len(messages)
self.log.info("There are %d messages left after filtering",
num_messages)
+ return messages
- if not num_messages:
- return False
+ def poke(self, context: 'Context'):
+ """
+ Check subscribed queue for messages and write them to xcom with the
``messages`` key.
- if not self.delete_message_on_reception:
- context['ti'].xcom_push(key='messages', value=messages)
- return True
+ :param context: the context object
+ :return: ``True`` if message is available or ``False``
+ """
+ sqs_conn = self.get_hook().get_conn()
- self.log.info("Deleting %d messages", num_messages)
+ message_batch: List[Any] = []
- entries = [
- {'Id': message['MessageId'], 'ReceiptHandle':
message['ReceiptHandle']} for message in messages
- ]
- response = sqs_conn.delete_message_batch(QueueUrl=self.sqs_queue,
Entries=entries)
+ # perform multiple SQS call to retrieve messages in series
+ for _ in range(self.num_batches):
+ messages = self.poll_sqs(sqs_conn=sqs_conn)
- if 'Successful' in response:
- context['ti'].xcom_push(key='messages', value=messages)
- return True
- else:
- raise AirflowException(
- 'Delete SQS Messages failed ' + str(response) + ' for messages
' + str(messages)
- )
+ if not len(messages):
+ continue
+
+ message_batch.extend(messages)
+
+ if self.delete_message_on_reception:
+
+ self.log.info("Deleting %d messages", len(messages))
+
+ entries = [
+ {'Id': message['MessageId'], 'ReceiptHandle':
message['ReceiptHandle']}
+ for message in messages
+ ]
+ response =
sqs_conn.delete_message_batch(QueueUrl=self.sqs_queue, Entries=entries)
Review Comment:
Just an observation: maybe we want to create a delete function in the `SqsHook`.
This can help to avoid cases of raising AirflowException on random HTTP
errors; we can use a retry like we do in
https://github.com/apache/airflow/blob/602abe8394fafe7de54df7e73af56de848cdf617/airflow/providers/google/cloud/hooks/dataprep.py#L61-L62
##########
airflow/providers/amazon/aws/sensors/sqs.py:
##########
@@ -136,28 +142,47 @@ def poke(self, context: 'Context'):
messages = self.filter_messages(messages)
num_messages = len(messages)
self.log.info("There are %d messages left after filtering",
num_messages)
+ return messages
- if not num_messages:
- return False
+ def poke(self, context: 'Context'):
+ """
+ Check subscribed queue for messages and write them to xcom with the
``messages`` key.
- if not self.delete_message_on_reception:
- context['ti'].xcom_push(key='messages', value=messages)
- return True
+ :param context: the context object
+ :return: ``True`` if message is available or ``False``
+ """
+ sqs_conn = self.get_hook().get_conn()
- self.log.info("Deleting %d messages", num_messages)
+ message_batch: List[Any] = []
- entries = [
- {'Id': message['MessageId'], 'ReceiptHandle':
message['ReceiptHandle']} for message in messages
- ]
- response = sqs_conn.delete_message_batch(QueueUrl=self.sqs_queue,
Entries=entries)
+ # perform multiple SQS call to retrieve messages in series
+ for _ in range(self.num_batches):
+ messages = self.poll_sqs(sqs_conn=sqs_conn)
- if 'Successful' in response:
- context['ti'].xcom_push(key='messages', value=messages)
- return True
- else:
- raise AirflowException(
- 'Delete SQS Messages failed ' + str(response) + ' for messages
' + str(messages)
- )
+ if not len(messages):
+ continue
+
+ message_batch.extend(messages)
+
+ if self.delete_message_on_reception:
+
+ self.log.info("Deleting %d messages", len(messages))
+
+ entries = [
+ {'Id': message['MessageId'], 'ReceiptHandle':
message['ReceiptHandle']}
+ for message in messages
+ ]
+ response =
sqs_conn.delete_message_batch(QueueUrl=self.sqs_queue, Entries=entries)
Review Comment:
Just an observation: maybe we want to create a delete function in `SqsHook`.
This can help to avoid cases of raising AirflowException on random HTTP
errors; we can use a retry like we do in
https://github.com/apache/airflow/blob/602abe8394fafe7de54df7e73af56de848cdf617/airflow/providers/google/cloud/hooks/dataprep.py#L61-L62
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]