potiuk commented on a change in pull request #17787:
URL: https://github.com/apache/airflow/pull/17787#discussion_r694691387
##########
File path: airflow/providers/google/cloud/example_dags/example_datafusion.py
##########
@@ -234,6 +254,14 @@
create_instance >> get_instance >> restart_instance >> update_instance >> sleep
sleep >> create_pipeline >> list_pipelines >> start_pipeline >> stop_pipeline >> delete_pipeline
+ (
Review comment:
There should be no repeated create/start/stop/delete tasks here with
dependencies between them.
It is quite confusing what the example is trying to do, how it will behave,
and what the DAG will look like.
The DAG structure below seems to repeat the same dependencies again for
sleep --> create and stop --> delete, which is totally needless. Also, I am
not sure what happens if the async and sync runs execute in parallel for the
same pipeline at the same time (which looks like what is going to happen here).
It sounds like a recipe for disaster to run both the sync and the async run
for the same pipeline at the same time (but maybe it will work, who knows - I
do not know DataFusion that well :D).
In order to avoid confusion about the DAG definition, this should look more
like:
```
sleep >> create_pipeline >> list_pipelines >> start_pipeline >> stop_pipeline >> start_pipeline_async, start_pipeline_sensor >> delete_pipeline
```
This shows the intent much better, and allows running the synchronous
version of the pipeline first and then the asynchronous one.
Or maybe you should even separate it out into a dedicated
"example_datafusion_async.py" - I am not sure whether the same pipeline can be
run twice in succession (probably yes, and if that's the case then a single
example with both sync and async versions is fine).
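To make the proposed ordering concrete, here is a minimal stand-alone sketch. It uses a hypothetical `Task` stub that mimics Airflow's `>>` bitshift composition, so the resulting graph can be inspected without an Airflow installation; the task names match the example DAG, but the stub itself is purely illustrative.

```python
class Task:
    """Stub standing in for an Airflow operator, supporting `a >> b` chaining."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []  # task_ids that run after this task

    def __rshift__(self, other):
        # Record the dependency and return the right-hand task so
        # chains like a >> b >> c keep working, as in Airflow.
        self.downstream.append(other.task_id)
        return other

sleep = Task("sleep")
create_pipeline = Task("create_pipeline")
list_pipelines = Task("list_pipelines")
start_pipeline = Task("start_pipeline")
stop_pipeline = Task("stop_pipeline")
start_pipeline_async = Task("start_pipeline_async")
start_pipeline_sensor = Task("pipeline_state_sensor")
delete_pipeline = Task("delete_pipeline")

# Run the synchronous pipeline first, then the async start plus its
# sensor, and only then delete the pipeline - no repeated dependencies.
sleep >> create_pipeline >> list_pipelines >> start_pipeline >> stop_pipeline
stop_pipeline >> start_pipeline_async >> start_pipeline_sensor >> delete_pipeline

print(stop_pipeline.downstream)  # the async start follows the sync stop
```

The key point of the structure is that the sync and async runs are strictly sequential, so the concern about both hitting the same pipeline concurrently does not arise.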
##########
File path: airflow/providers/google/cloud/example_dags/example_datafusion.py
##########
@@ -205,6 +206,25 @@
)
# [END howto_cloud_data_fusion_start_pipeline]
+ # [START howto_cloud_data_fusion_start_pipeline_async]
+ start_pipeline_async = CloudDataFusionStartPipelineOperator(
+ location=LOCATION,
+ pipeline_name=PIPELINE_NAME,
+ instance_name=INSTANCE_NAME,
+ asynchronous=True,
+ task_id="start_pipeline_async",
+ )
+
+ start_pipeline_sensor = DatafusionPipelineStateSensor(
+ task_id="pipeline_state_sensor",
+ pipeline_name=PIPELINE_NAME,
+ pipeline_id=start_pipeline_async.output,
+ expected_statuses=["COMPLETED"],
+ instance_name=INSTANCE_NAME,
+ location=LOCATION,
+ )
+ # [END howto_cloud_data_fusion_start_pipeline_async]
Review comment:
Those START/END markers are here for a reason - they are used as entries
in
https://github.com/apache/airflow/blob/main/docs/apache-airflow-providers-google/operators/cloud/datafusion.rst
to automatically extract pieces of the example DAG into the "Howto"
documentation. The Howto documentation now needs separate sync/async
sections.
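For illustration, the docs reference the markers roughly like this (a sketch of the `exampleinclude` directive the Airflow docs use; the exact path and dedent are assumptions to be checked against the existing datafusion.rst entries):

```rst
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datafusion.py
    :language: python
    :dedent: 4
    :start-after: [START howto_cloud_data_fusion_start_pipeline_async]
    :end-before: [END howto_cloud_data_fusion_start_pipeline_async]
```

Adding a section like this for the async variant (and keeping the existing one for the sync variant) would give the Howto page the separate sync/async entries.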
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]