aarshayj commented on pull request #13478:
URL: https://github.com/apache/airflow/pull/13478#issuecomment-787398700


   @TobKed 
   After following the code flow, I think I see what the major source of the issue is:
   - `DataflowStartFlexTemplateOperator`'s `__init__` takes `wait_until_finished: Optional[bool] = None` as an argument:
   ```python
       @apply_defaults
       def __init__(
           self,
           body: Dict,
           location: str,
           project_id: Optional[str] = None,
           gcp_conn_id: str = "google_cloud_default",
           delegate_to: Optional[str] = None,
           drain_pipeline: bool = False,
           cancel_timeout: Optional[int] = 10 * 60,
           wait_until_finished: Optional[bool] = None,
           *args,
           **kwargs,
       ) -> None:
           super().__init__(*args, **kwargs)
           self.body = body
           self.location = location
           self.project_id = project_id
           self.gcp_conn_id = gcp_conn_id
           self.delegate_to = delegate_to
           self.drain_pipeline = drain_pipeline
           self.cancel_timeout = cancel_timeout
           self.wait_until_finished = wait_until_finished
           self.job_id = None
           self.hook: Optional[DataflowHook] = None
   ```
   
   - The `execute` method of `DataflowStartFlexTemplateOperator` creates a `DataflowHook` and passes `wait_until_finished` through to it:
   ```python
       def execute(self, context):
           self.hook = DataflowHook(
               gcp_conn_id=self.gcp_conn_id,
               delegate_to=self.delegate_to,
               drain_pipeline=self.drain_pipeline,
               cancel_timeout=self.cancel_timeout,
               wait_until_finished=self.wait_until_finished,
           )
   ```
   - `DataflowHook`'s `__init__` stores the same value as `self.wait_until_finished`:
   ```python
       def __init__(
           self,
           gcp_conn_id: str = "google_cloud_default",
           delegate_to: Optional[str] = None,
           poll_sleep: int = 10,
           impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
           drain_pipeline: bool = False,
           cancel_timeout: Optional[int] = 5 * 60,
           wait_until_finished: Optional[bool] = None,
       ) -> None:
           self.poll_sleep = poll_sleep
           self.drain_pipeline = drain_pipeline
           self.cancel_timeout = cancel_timeout
           self.wait_until_finished = wait_until_finished
           super().__init__(
               gcp_conn_id=gcp_conn_id,
               delegate_to=delegate_to,
               impersonation_chain=impersonation_chain,
           )
   ```
   - Both `start_template_dataflow(..)` and `start_flex_template(..)` then construct a `_DataflowJobsController` like this:
   ```python
   jobs_controller = _DataflowJobsController(
       dataflow=self.get_conn(),
       project_number=project_id,
       job_id=job_id,
       location=location,
       poll_sleep=self.poll_sleep,
       num_retries=self.num_retries,
       cancel_timeout=self.cancel_timeout,
   )
   ```
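To make the chain above concrete, here is a minimal stand-in for the three layers, reduced to the one parameter that matters. The classes `FakeOperator`, `FakeHook` and `FakeJobsController` are hypothetical simplifications, not Airflow's classes. The only behaviour copied from the snippets is that the hook stores `wait_until_finished` but builds the controller without it. The controller's default of `None` is assumed from the `is None` check discussed below.

```python
from typing import Optional


class FakeJobsController:
    """Hypothetical stand-in for _DataflowJobsController."""

    def __init__(self, job_id: str, cancel_timeout: Optional[int] = None,
                 wait_until_finished: Optional[bool] = None) -> None:
        self.job_id = job_id
        self.cancel_timeout = cancel_timeout
        # Assumed default of None when the caller does not pass a value.
        self._wait_until_finished = wait_until_finished


class FakeHook:
    """Hypothetical stand-in for DataflowHook."""

    def __init__(self, cancel_timeout: Optional[int] = 5 * 60,
                 wait_until_finished: Optional[bool] = None) -> None:
        self.cancel_timeout = cancel_timeout
        self.wait_until_finished = wait_until_finished  # stored on the hook...

    def start_flex_template(self, job_id: str) -> FakeJobsController:
        # ...but, as in the snippet above, not forwarded to the controller.
        return FakeJobsController(job_id=job_id, cancel_timeout=self.cancel_timeout)


class FakeOperator:
    """Hypothetical stand-in for DataflowStartFlexTemplateOperator."""

    def __init__(self, wait_until_finished: Optional[bool] = None) -> None:
        self.wait_until_finished = wait_until_finished

    def execute(self) -> FakeJobsController:
        hook = FakeHook(cancel_timeout=10 * 60,
                        wait_until_finished=self.wait_until_finished)
        return hook.start_flex_template(job_id="example-job")


controller = FakeOperator(wait_until_finished=False).execute()
print(controller._wait_until_finished)  # None: the user's False was dropped
```

Whatever the user sets on the operator, the controller ends up with `None`.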
   The issue is that `wait_until_finished` is never passed to `_DataflowJobsController`. Because the value is not passed down, the controller's `_wait_until_finished` stays `None`. This `if` block inside the controller is therefore always taken, and whether to wait for the job to finish is decided by the job type alone rather than by the user's setting:
   ```python
   if self._wait_until_finished is None:
       wait_for_running = job.get('type', '') == DataflowJobType.JOB_TYPE_STREAMING
   ```
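The effect of the missing argument can be isolated in a small pure function. The `None` branch mirrors the `if` block above. The `else` branch is my reading of what the controller does once a value is actually passed: stop at `RUNNING` when `wait_until_finished` is `False`. Treat that branch as an assumption. The function name `should_wait_for_running_only` is hypothetical. The string constants stand in for `DataflowJobType.JOB_TYPE_STREAMING` and its batch counterpart.

```python
from typing import Any, Dict, Optional

# Stand-ins for the DataflowJobType constants.
JOB_TYPE_STREAMING = "JOB_TYPE_STREAMING"
JOB_TYPE_BATCH = "JOB_TYPE_BATCH"


def should_wait_for_running_only(job: Dict[str, Any],
                                 wait_until_finished: Optional[bool]) -> bool:
    """Return True if polling should stop once the job reaches RUNNING.

    Hypothetical sketch of the controller's decision; the else-branch is assumed.
    """
    if wait_until_finished is None:
        # Default heuristic from the snippet: only streaming jobs stop at RUNNING.
        return job.get("type", "") == JOB_TYPE_STREAMING
    # An explicit user setting overrides the job-type heuristic.
    return not wait_until_finished


batch_job = {"type": JOB_TYPE_BATCH}
# With the argument dropped, a batch job is always waited on until it finishes.
print(should_wait_for_running_only(batch_job, None))   # False
# If False actually reached the controller, it could stop at RUNNING.
print(should_wait_for_running_only(batch_job, False))  # True
```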
   
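   If we fix just this, by also passing `wait_until_finished=self.wait_until_finished` when constructing the controller in both methods, I think the problem should be solved. The controller would then receive the value the user set on the operator.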
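One way to apply the fix in both `start_template_dataflow(..)` and `start_flex_template(..)` is to build the controller in a single helper, so the two call sites cannot drift apart again. The helper `_make_jobs_controller` and the stand-in classes below are hypothetical and are not part of Airflow. The essential change is forwarding `wait_until_finished` alongside `cancel_timeout`.

```python
from typing import Optional


class FakeJobsController:
    """Hypothetical stand-in for _DataflowJobsController."""

    def __init__(self, job_id: str, cancel_timeout: Optional[int] = None,
                 wait_until_finished: Optional[bool] = None) -> None:
        self.job_id = job_id
        self.cancel_timeout = cancel_timeout
        self._wait_until_finished = wait_until_finished


class FakeHook:
    """Hypothetical stand-in for DataflowHook with the fix applied."""

    def __init__(self, cancel_timeout: Optional[int] = 5 * 60,
                 wait_until_finished: Optional[bool] = None) -> None:
        self.cancel_timeout = cancel_timeout
        self.wait_until_finished = wait_until_finished

    def _make_jobs_controller(self, job_id: str) -> FakeJobsController:
        # Single place where hook-level settings are handed to the controller.
        return FakeJobsController(
            job_id=job_id,
            cancel_timeout=self.cancel_timeout,
            wait_until_finished=self.wait_until_finished,  # previously missing
        )

    def start_template_dataflow(self, job_id: str) -> FakeJobsController:
        return self._make_jobs_controller(job_id)

    def start_flex_template(self, job_id: str) -> FakeJobsController:
        return self._make_jobs_controller(job_id)


hook = FakeHook(wait_until_finished=False)
print(hook.start_flex_template("a")._wait_until_finished)      # False
print(hook.start_template_dataflow("b")._wait_until_finished)  # False
```

Adding the keyword argument directly at each existing call site would work just as well. The helper only makes it harder to forget one of them.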
   
   Does this make sense?

