Raj Prakash Kante created COMMONSSITE-163:
---------------------------------------------

             Summary: Xcom not returned while using 'BeamRunJavaPipelineOperator'.
                 Key: COMMONSSITE-163
                 URL: https://issues.apache.org/jira/browse/COMMONSSITE-163
             Project: Apache Commons All
          Issue Type: Bug
            Reporter: Raj Prakash Kante


I am using 'BeamRunJavaPipelineOperator' in an Airflow DAG to run a Java JAR 
that ingests data from Google Cloud Storage into BigQuery using Dataflow. The 
Dataflow job is submitted successfully, but I want to wait until the job 
completes successfully in the background before moving on to the next task. I 
plan to handle this with 'DataflowJobStatusSensor', which checks the status of 
the job in the background. The sensor requires the ID of the job to check, 
which is supposed to be returned as an XCom by 'BeamRunJavaPipelineOperator', 
but the operator does not return the expected XCom.

 

 

 

start_java_pipeline = BeamRunJavaPipelineOperator(
    task_id="start_java_pipeline",
    runner='dataflow',
    jar="<path-to-java-jar>",
    pipeline_options={
        'airflowBucket': '<bucket-path>',
        'jobName': '<job-name>',
        'inputfileBucket': '<input-file-path>',
        'maxNumWorkers': '10',
        'targetTableProject': '<Project-name>',
        'datasetName': '<dataset-name>',
        'serviceAccount': '<service-account>',
        'runConfig': '<path-config-files>',
        'project': '<project-name>',
        'workerMachineType': 'n1-standard-2',
        'region': '<region>',
        'subnetwork': "<subnetwork>",
        'usePublicIps': 'false',
        'stagingLocation': '<Staging-location>',
        'tempLocation': '<temp-location>',
    },
    job_class='<class-name-in-jar>',
    do_xcom_push=True,
    dag=dag)

 

 

wait_for_done = DataflowJobStatusSensor(
    task_id="wait-for-java-dataflow",
    job_id="{{task_instance.xcom_pull('Get_job_id')}}",
    expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
    project_id="xxx-xx-xxx",
    gcp_conn_id='google_cloud_default',
    location='us-central1',
)

start_java_pipeline >> wait_for_done
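
One thing that may be worth ruling out in the snippet above: the sensor's 
template pulls from a task ID 'Get_job_id', while the operator's task_id is 
"start_java_pipeline". The sketch below is plain Python, not Airflow code. It 
uses a hypothetical in-memory dict as a stand-in for the XCom backend, and it 
assumes purely for illustration that the upstream task pushed a value (the 
"example-job-id" value and the "dataflow_job_id" key are placeholders). It 
only shows why a lookup under a mismatched task ID would come back empty.

```python
from typing import Any, Dict, Optional, Tuple

# Hypothetical stand-in for an XCom store: (task_id, key) -> value.
# This is NOT Airflow's API; it only mimics lookup-by-task-id semantics.
XComStore = Dict[Tuple[str, str], Any]


def xcom_push(store: XComStore, task_id: str, value: Any,
              key: str = "return_value") -> None:
    """Record a value under the pushing task's ID."""
    store[(task_id, key)] = value


def xcom_pull(store: XComStore, task_id: str,
              key: str = "return_value") -> Optional[Any]:
    """Return the value pushed by task_id, or None if nothing matches."""
    return store.get((task_id, key))


store: XComStore = {}

# Assumption for illustration: the upstream task pushed a job ID.
xcom_push(store, "start_java_pipeline", {"dataflow_job_id": "example-job-id"})

# Pulling under a task ID that never pushed anything finds no value.
missing = xcom_pull(store, "Get_job_id")

# Pulling under the ID of the task that pushed finds the value.
found = xcom_pull(store, "start_java_pipeline")
```

If the operator really pushes nothing, as described in this report, the 
second lookup would also be empty in a real DAG, so this does not by itself 
explain the behaviour; it only removes one possible cause.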

 


Using "DataFlowJavaOperator", I am able to push the job ID to XCom and fetch 
it with "DataflowJobStatusSensor" without any issues. However, that operator 
is deprecated.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
