super017 opened a new issue #14713:
URL: https://github.com/apache/airflow/issues/14713
The version of airflow I deployed is 1.10.14.A master and three workers.I
want to complete several Workers to complete a DAG.
my code :
from datetime import timedelta,datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from airflow.operators.bash_operator import BashOperator
import time
import math
default_args = {
'owner':'airflow',
'retries':3,
'depends_on_past':False,
'start_date': days_ago(2),
'provide_context':True,
}
dag = DAG(
'xhcyeh',
default_args=default_args,
schedule_interval=timedelta(minutes=5)
)
def function1(**kwargs):
sum =2
#x = kwargs['dag_run'].conf['x']
#x = kwargs.get("dag_run").conf.get("x")
x = 5
for i in range(x):
sum +=i
print(sum)
f = open('/home/lis/airflow/dags/x.txt','w')
f.write(str(sum))
f.close()
kwargs['task_instance'].xcom_push(key='sea1',value=str(sum))
return sum
def function2(**kwargs):
sum = kwargs['task_instance'].xcom_pull(key='sea1',task_ids='function1')
f = open('/home/ls/airflow/dags/y.txt','w')
f.write(str(math.pow(int(sum),2)))
f.close()
return math.pow(int(sum),2)
run_this = PythonOperator(
task_id='function1',
python_callable=function1,
queue='cdh6-2-node93',
dag=dag,
)
t2 = PythonOperator(
task_id='function2',
python_callable= function2,
queue='cdh6-2-node94',
dag=dag,
)
run_this >> t2
I want to test two workers working together to complete a simple
calculation.What happens is that the first function is finished and the second
function stays in the scheduled state.What happens is that the first function
is finished and the second function stays in the scheduled state.The flower log
shows celery failed. There is also no record of parameter passing in the XCOM
table of the background database.Is there something wrong with my code?What
changes should I make
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]