This is an automated email from the ASF dual-hosted git repository.
turbaszek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new a7e144b Google Dataflow Hook to handle no Job Type (#14914)
a7e144b is described below
commit a7e144bec855f6ccf0fa5ae8447894195ffe170f
Author: Tobiasz Kędzierski <[email protected]>
AuthorDate: Tue Mar 23 19:48:42 2021 +0100
Google Dataflow Hook to handle no Job Type (#14914)
Co-authored-by: Tomek Urbaszek <[email protected]>
---
airflow/providers/google/cloud/hooks/dataflow.py | 2 +-
.../providers/google/cloud/hooks/test_dataflow.py | 28 ++++++++++++++++++++++
2 files changed, 29 insertions(+), 1 deletion(-)
diff --git a/airflow/providers/google/cloud/hooks/dataflow.py b/airflow/providers/google/cloud/hooks/dataflow.py
index f0986e6..7c53507 100644
--- a/airflow/providers/google/cloud/hooks/dataflow.py
+++ b/airflow/providers/google/cloud/hooks/dataflow.py
@@ -404,7 +404,7 @@ class _DataflowJobsController(LoggingMixin):
:raise: Exception
"""
if self._wait_until_finished is None:
- wait_for_running = job['type'] == DataflowJobType.JOB_TYPE_STREAMING
+ wait_for_running = job.get('type') == DataflowJobType.JOB_TYPE_STREAMING
else:
wait_for_running = not self._wait_until_finished
diff --git a/tests/providers/google/cloud/hooks/test_dataflow.py b/tests/providers/google/cloud/hooks/test_dataflow.py
index 7ceef1f..03d5ce3 100644
--- a/tests/providers/google/cloud/hooks/test_dataflow.py
+++ b/tests/providers/google/cloud/hooks/test_dataflow.py
@@ -1416,6 +1416,34 @@ class TestDataflowJob(unittest.TestCase):
# fmt: off
@parameterized.expand([
+ # RUNNING
+ (DataflowJobStatus.JOB_STATE_RUNNING, None, False),
+ (DataflowJobStatus.JOB_STATE_RUNNING, True, False),
+ (DataflowJobStatus.JOB_STATE_RUNNING, False, True),
+ # AWAITING STATE
+ (DataflowJobStatus.JOB_STATE_PENDING, None, False),
+ (DataflowJobStatus.JOB_STATE_PENDING, True, False),
+ (DataflowJobStatus.JOB_STATE_PENDING, False, True),
+ ])
+ # fmt: on
+ def test_check_dataflow_job_state_without_job_type(self, job_state, wait_until_finished, expected_result):
+ job = {"id": "id-2", "name": "name-2", "currentState": job_state}
+ dataflow_job = _DataflowJobsController(
+ dataflow=self.mock_dataflow,
+ project_number=TEST_PROJECT,
+ name="name-",
+ location=TEST_LOCATION,
+ poll_sleep=0,
+ job_id=None,
+ num_retries=20,
+ multiple_jobs=True,
+ wait_until_finished=wait_until_finished,
+ )
+ result = dataflow_job._check_dataflow_job_state(job)
+ assert result == expected_result
+
+ # fmt: off
+ @parameterized.expand([
(DataflowJobType.JOB_TYPE_BATCH, DataflowJobStatus.JOB_STATE_FAILED,
"Google Cloud Dataflow job name-2 has failed\\."),
(DataflowJobType.JOB_TYPE_STREAMING,
DataflowJobStatus.JOB_STATE_FAILED,