jaketf opened a new issue #8673:
URL: https://github.com/apache/airflow/issues/8673


   (re-opening of #8672 with appropriate template)
   CC: @turbaszek 
   **Apache Airflow version**: master
   
   
   **Kubernetes version (if you are using kubernetes)** (use `kubectl version`):
   
   **Environment**: Composer (this is not unique to Composer; it is a code logic issue)
   
   - **Cloud provider or hardware configuration**: GCP
   - **OS** (e.g. from /etc/os-release):
   - **Kernel** (e.g. `uname -a`):
   - **Install tools**:
   - **Others**:
   
   **What happened**:
   
   [Here](https://github.com/apache/airflow/blob/4421f011eeec2d1022a39933e27f530fb9f9c1b1/airflow/providers/google/cloud/hooks/datafusion.py#L392) the `start_pipeline` method of the Data Fusion hook succeeds as soon as it gets a 200 from the CDAP API.
   This is a misleading success signal, as a 200 indicates at best that the pipeline entered the PENDING state. `start_pipeline` should not succeed until the pipeline has reached the RUNNING state.
   Note that the happy path is PENDING > STARTING > RUNNING ([ProgramStatus](https://github.com/cdapio/cdap/blob/1d62163faaecb5b888f4bccd0fcf4a8d27bbd549/cdap-proto/src/main/java/io/cdap/cdap/proto/ProgramRunStatus.java)). Many CDAP pipelines using the Dataproc provisioner spend a significant amount of time in the STARTING state because they must also tick through the various [ProgramRunClusterStatus](https://github.com/cdapio/cdap/blob/1d62163faaecb5b888f4bccd0fcf4a8d27bbd549/cdap-proto/src/main/java/io/cdap/cdap/proto/ProgramRunClusterStatus.java) states while the Dataproc cluster is provisioned.
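
   For illustration, here is a minimal sketch of what "do not succeed until RUNNING" could look like, polling the CDAP program runs endpoint directly with `requests` (the endpoint layout and the `DataPipelineWorkflow` program name are assumptions based on the public CDAP REST API; auth is omitted, and this ignores the run-id ambiguity discussed under "Anything else" below):

   ```python
   import time

   import requests

   # Sketch only: failure states taken from CDAP's ProgramRunStatus enum.
   FAILURE_STATES = {"FAILED", "KILLED", "REJECTED"}

   def wait_for_pipeline_running(instance_url, namespace, pipeline_name,
                                 timeout=600, poll_interval=10):
       # Batch pipeline runs are exposed under the pipeline's workflow.
       runs_url = (
           f"{instance_url}/v3/namespaces/{namespace}/apps/{pipeline_name}"
           f"/workflows/DataPipelineWorkflow/runs"
       )
       deadline = time.monotonic() + timeout
       while time.monotonic() < deadline:
           for run in requests.get(runs_url).json():
               status = run.get("status")
               if status == "RUNNING":
                   return run["runid"]
               if status in FAILURE_STATES:
                   raise RuntimeError(f"Pipeline run ended in state {status}")
           time.sleep(poll_interval)
       raise TimeoutError(f"Pipeline did not reach RUNNING within {timeout}s")
   ```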
   
   **What you expected to happen**:
   `start_pipeline` and the associated operator should not succeed until the pipeline has actually started (i.e. entered the RUNNING state).
   
   **How to reproduce it**:
   This is a code logic issue and will be reproduced by any use of this method. To demonstrate why it is problematic:
   - Run a DAG that uses [CloudDataFusionStartPipelineOperator](https://github.com/apache/airflow/blob/master/airflow/providers/google/cloud/operators/datafusion.py#L607) (a minimal example DAG is sketched after this list)
   - Do any of the following:
       - Hop over to the Data Fusion UI and manually stop the pipeline before it enters the RUNNING state
       - Manually delete the Dataproc cluster before it finishes provisioning
       - Use a Compute Profile that tries to provision an illegal Dataproc cluster (e.g. due to a permissions issue where the CDF service account lacks permission to create a cluster in another project)
   - Observe that the CloudDataFusionStartPipelineOperator task succeeds anyway.
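
   A minimal repro DAG (the pipeline name, instance name, and location are placeholders for an existing Data Fusion pipeline):

   ```python
   from airflow import DAG
   from airflow.providers.google.cloud.operators.datafusion import (
       CloudDataFusionStartPipelineOperator,
   )
   from airflow.utils.dates import days_ago

   # Placeholders: point these at a real Data Fusion instance and pipeline.
   with DAG(
       dag_id="datafusion_start_pipeline_repro",
       schedule_interval=None,
       start_date=days_ago(1),
   ) as dag:
       start_pipeline = CloudDataFusionStartPipelineOperator(
           task_id="start_pipeline",
           pipeline_name="example_pipeline",
           instance_name="example-instance",
           location="us-central1",
       )
   ```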
   
   **Anything else we need to know**:
   Unfortunately, the start call to the CDAP API does not return a run_id that could be polled for state.
   
   This hook could work around that by adding a special runtime arg, e.g. `__faux_airflow_id__`, which can be used to "look up" the real run id. The value of this runtime arg could be the dag_run_id or something similar. If this workaround is used, or if the CDAP API can return a run id, then a more useful operator than start pipeline would be one that actually waits until the job reaches a success state (much like the existing Dataflow and Dataproc operators). A sketch of the workaround follows.
   
   An example in Go, from the Terraform provider resource that manages a streaming pipeline:
   - [Creating with a faux id](https://github.com/GoogleCloudPlatform/terraform-provider-cdap/blob/master/cdap/resource_streaming_program_run.go#L108)
   - [Looking up the real CDAP run id by faux id](https://github.com/GoogleCloudPlatform/terraform-provider-cdap/blob/master/cdap/resource_streaming_program_run.go#L216)
   

