aarshayj commented on pull request #13478:
URL: https://github.com/apache/airflow/pull/13478#issuecomment-787398700
@TobKed
After following the code flow, I think I've found the main source of the issue:
- `DataflowStartFlexTemplateOperator.__init__` takes `wait_until_finished: Optional[bool] = None` as an argument and stores it on the operator:
```python
@apply_defaults
def __init__(
    self,
    body: Dict,
    location: str,
    project_id: Optional[str] = None,
    gcp_conn_id: str = "google_cloud_default",
    delegate_to: Optional[str] = None,
    drain_pipeline: bool = False,
    cancel_timeout: Optional[int] = 10 * 60,
    wait_until_finished: Optional[bool] = None,
    *args,
    **kwargs,
) -> None:
    super().__init__(*args, **kwargs)
    self.body = body
    self.location = location
    self.project_id = project_id
    self.gcp_conn_id = gcp_conn_id
    self.delegate_to = delegate_to
    self.drain_pipeline = drain_pipeline
    self.cancel_timeout = cancel_timeout
    self.wait_until_finished = wait_until_finished
    self.job_id = None
    self.hook: Optional[DataflowHook] = None
```
- `DataflowStartFlexTemplateOperator.execute` creates a `DataflowHook` and passes `wait_until_finished` to it:
```python
def execute(self, context):
    self.hook = DataflowHook(
        gcp_conn_id=self.gcp_conn_id,
        delegate_to=self.delegate_to,
        drain_pipeline=self.drain_pipeline,
        cancel_timeout=self.cancel_timeout,
        wait_until_finished=self.wait_until_finished,
    )
```
- `DataflowHook.__init__` stores the value in an attribute of the same name:
```python
def __init__(
    self,
    gcp_conn_id: str = "google_cloud_default",
    delegate_to: Optional[str] = None,
    poll_sleep: int = 10,
    impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
    drain_pipeline: bool = False,
    cancel_timeout: Optional[int] = 5 * 60,
    wait_until_finished: Optional[bool] = None,
) -> None:
    self.poll_sleep = poll_sleep
    self.drain_pipeline = drain_pipeline
    self.cancel_timeout = cancel_timeout
    self.wait_until_finished = wait_until_finished
    super().__init__(
        gcp_conn_id=gcp_conn_id,
        delegate_to=delegate_to,
        impersonation_chain=impersonation_chain,
    )
```
- Both `DataflowHook.start_template_dataflow(...)` and `DataflowHook.start_flex_template(...)` instantiate `_DataflowJobsController` as follows:
```python
jobs_controller = _DataflowJobsController(
    dataflow=self.get_conn(),
    project_number=project_id,
    job_id=job_id,
    location=location,
    poll_sleep=self.poll_sleep,
    num_retries=self.num_retries,
    cancel_timeout=self.cancel_timeout,
)
```
The issue is that `wait_until_finished` is never passed to `_DataflowJobsController`. Because the value is not passed down, the controller's `_wait_until_finished` stays `None`, so this condition inside the controller code is always True:
```python
if self._wait_until_finished is None:
    wait_for_running = job.get('type', '') == DataflowJobType.JOB_TYPE_STREAMING
```
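To make the fallback concrete, here is a minimal standalone sketch of the decision the controller makes. It is not Airflow's code: `should_wait_for_running`, `JOB_TYPE_BATCH` and `JOB_TYPE_STREAMING` are hypothetical stand-ins (the real constant is `DataflowJobType.JOB_TYPE_STREAMING`), and the `else` branch, which inverts an explicit `wait_until_finished`, is an assumption because only the `if` branch is quoted above.

```python
from typing import Optional

# Hypothetical stand-ins for Dataflow job-type strings (assumption; Airflow
# keeps the real values on DataflowJobType).
JOB_TYPE_BATCH = "JOB_TYPE_BATCH"
JOB_TYPE_STREAMING = "JOB_TYPE_STREAMING"


def should_wait_for_running(job: dict, wait_until_finished: Optional[bool]) -> bool:
    """Return True if waiting may stop once the job reaches a running state."""
    if wait_until_finished is None:
        # No explicit choice: infer it from the job type, mirroring the quoted
        # branch. Streaming jobs do not finish on their own.
        return job.get("type", "") == JOB_TYPE_STREAMING
    # Explicit choice (assumed behaviour): waiting until finished means
    # NOT stopping as soon as the job is running.
    return not wait_until_finished


print(should_wait_for_running({"type": JOB_TYPE_STREAMING}, None))  # True
print(should_wait_for_running({"type": JOB_TYPE_STREAMING}, True))  # False
```

Since the value is never forwarded, only the `None` path of this logic is ever reached: streaming jobs are treated as done once running and batch jobs are waited on until they finish, whatever the user set on the operator.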
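If we fix just this, i.e. forward `wait_until_finished` from the hook to `_DataflowJobsController`, I think the problem should be solved, because the controller would then receive the value the user passed to the operator.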
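A minimal sketch of the proposed change, using hypothetical stand-in classes (`Hook`, `JobsController`, `start_job_buggy`, `start_job_fixed`) rather than the real Airflow classes. In the actual code the fix would presumably amount to adding `wait_until_finished=self.wait_until_finished` to the `_DataflowJobsController(...)` calls in `start_template_dataflow` and `start_flex_template`; that assumes the controller's constructor accepts a `wait_until_finished` keyword, which the quoted `self._wait_until_finished` suggests but does not confirm.

```python
from typing import Optional


class JobsController:
    """Hypothetical stand-in for _DataflowJobsController (stores state only)."""

    def __init__(self, job_id: str, wait_until_finished: Optional[bool] = None) -> None:
        self._job_id = job_id
        self._wait_until_finished = wait_until_finished


class Hook:
    """Hypothetical stand-in for DataflowHook."""

    def __init__(self, wait_until_finished: Optional[bool] = None) -> None:
        self.wait_until_finished = wait_until_finished

    def start_job_buggy(self, job_id: str) -> JobsController:
        # Current behaviour: the hook's setting is not forwarded, so the
        # controller always falls back to its None default.
        return JobsController(job_id=job_id)

    def start_job_fixed(self, job_id: str) -> JobsController:
        # Proposed fix: forward the value the operator handed to the hook.
        return JobsController(job_id=job_id, wait_until_finished=self.wait_until_finished)


hook = Hook(wait_until_finished=True)
print(hook.start_job_buggy("job-1")._wait_until_finished)  # None
print(hook.start_job_fixed("job-1")._wait_until_finished)  # True
```

Keeping `None` as the default on both sides means callers that never set the flag still get the job-type-based fallback.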
Does this make sense?