Raj Prakash Kante created COMMONSSITE-163:
---------------------------------------------
Summary: XCom not returned while using 'BeamRunJavaPipelineOperator'.
Key: COMMONSSITE-163
URL: https://issues.apache.org/jira/browse/COMMONSSITE-163
Project: Apache Commons All
Issue Type: Bug
Reporter: Raj Prakash Kante
I was using 'BeamRunJavaPipelineOperator' in an Airflow DAG to run a Java jar
that uses Dataflow to ingest data from Google Cloud Storage into BigQuery. The
Dataflow job is submitted successfully, but I want to wait until the job has
finished successfully in the background before moving on to the next task. I
plan to handle this with 'DataflowJobStatusSensor', which checks the status of
the job in the background. The sensor needs the ID of the job to check, which
'BeamRunJavaPipelineOperator' is supposed to return as an XCom, but the
operator does not return the expected XCom.
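
from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator
from airflow.providers.google.cloud.hooks.dataflow import DataflowJobStatus
from airflow.providers.google.cloud.sensors.dataflow import DataflowJobStatusSensor

# `dag` is the DAG object defined elsewhere in the DAG file.

# Launch the jar on Dataflow; with do_xcom_push=True the job ID is expected in XCom.
start_java_pipeline = BeamRunJavaPipelineOperator(
    task_id="start_java_pipeline",
    runner='dataflow',
    jar="<path-to-java-jar>",
    pipeline_options={
        'airflowBucket': '<bucket-path>',
        'jobName': '<job-name>',
        'inputfileBucket': '<input-file-path>',
        'maxNumWorkers': '10',
        'targetTableProject': '<Project-name>',
        'datasetName': '<dataset-name>',
        'serviceAccount': '<service-account>',
        'runConfig': '<path-config-files>',
        'project': '<project-name>',
        'workerMachineType': 'n1-standard-2',
        'region': '<region>',
        'subnetwork': "<subnetwork>",
        'usePublicIps': 'false',
        'stagingLocation': '<Staging-location>',
        'tempLocation': '<temp-location>',
    },
    job_class='<class-name-in-jar>',
    do_xcom_push=True,
    dag=dag,
)

# Poll the Dataflow job until it reaches JOB_STATE_DONE. job_id is a Jinja
# template rendered at runtime from the XCom of the task 'Get_job_id'.
wait_for_done = DataflowJobStatusSensor(
    task_id="wait-for-java-dataflow",
    job_id="{{task_instance.xcom_pull('Get_job_id')}}",
    expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
    project_id="xxx-xx-xxx",
    gcp_conn_id='google_cloud_default',
    location='us-central1',
)

start_java_pipeline >> wait_for_done
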
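A minimal Python sketch of how the job ID is expected to reach XCom, under two
assumptions that are not confirmed in this report: that the operator only
enables its Dataflow handling (including job ID capture) when the runner name
matches "DataflowRunner" case-insensitively, and that the ID is parsed from a
"Submitted job: <job-id>" line in the Java launcher's output. The names
SUBMITTED_JOB_PATTERN, is_dataflow_runner and find_dataflow_job_id are
hypothetical and are not part of Airflow's API.

```python
import re
from typing import Iterable, Optional

# HYPOTHETICAL sketch, not Airflow's actual code. Assumption 1: the Dataflow
# Java runner logs a line of the form "Submitted job: <job-id>".
SUBMITTED_JOB_PATTERN = re.compile(r"Submitted job: (?P<job_id>\S+)")


def is_dataflow_runner(runner: str) -> bool:
    """Assumption 2: only a case-insensitive match on 'DataflowRunner' counts."""
    return runner.lower() == "dataflowrunner"


def find_dataflow_job_id(runner: str, log_lines: Iterable[str]) -> Optional[str]:
    """Return the job ID that would end up in XCom, or None if none is found."""
    if not is_dataflow_runner(runner):
        # Under assumption 2, log parsing never happens for other runner names.
        return None
    for line in log_lines:
        match = SUBMITTED_JOB_PATTERN.search(line)
        if match:
            return match.group("job_id")
    return None


if __name__ == "__main__":
    # Invented sample log lines; the job ID below is made up for illustration.
    sample_logs = [
        "INFO: Staging files to gs://<bucket-path>/staging",
        "INFO: Submitted job: 2023-01-01_00_00_00-1234567890",
    ]
    print(find_dataflow_job_id("DataflowRunner", sample_logs))  # 2023-01-01_00_00_00-1234567890
    print(find_dataflow_job_id("dataflow", sample_logs))        # None
```

If these assumptions hold for the installed provider version, a runner value
of 'dataflow' (as in the DAG above) would skip the ID capture even though the
job itself is submitted. This is only a hypothesis; comparing a run with
runner='DataflowRunner' would be one way to check it.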
Using "DataFlowJavaOperator", I am able to push the job ID to XCom and fetch it
with "DataflowJobStatusSensor" without any issues. However, that operator is
deprecated.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)