prakash260 opened a new issue #10210:
URL: https://github.com/apache/airflow/issues/10210
I have created a DAG using SQSSensor to trigger whenever there is new
message in SQS.
It is not triggering automatically whenever there is new message.
Below is my code:
''''
from __future__ import print_function
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.sensors.aws_sqs_sensor import SQSSensor
default_args = {
'owner': 'Airflow',
'start_date': days_ago(1),
'provide_context': True,
}
def pull_from_xcom(**context):
val = context['ti'].xcom_pull(task_ids='sqs_get', key='messages')
print(val)
dag = DAG('sqs_test', default_args=default_args, schedule_interval='@daily')
t1 = SQSSensor(
dag=dag,
task_id='sqs_get',
sqs_queue='https://sqs.ap-southeast-2.amazonaws.com/accountid/test',
aws_conn_id='aws_default',
max_message=1,
wait_time_seconds = 1
)
t2 = PythonOperator(
task_id='xcom_pull',
python_callable=pull_from_xcom,
depends_on_past=False,
dag=dag)
t1 >> t2
''''
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]