Repository: incubator-airflow
Updated Branches:
  refs/heads/master 147472b99 -> a2bb2d70a
[AIRFLOW-1996] Update DataflowHook waitfordone for Streaming type job[]

AIRFLOW-1996 Update DataflowHook waitfordone for Streaming type job

fix flake8

Closes #2938 from ivanwirawan/AIRFLOW-1996

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a2bb2d70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a2bb2d70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a2bb2d70

Branch: refs/heads/master
Commit: a2bb2d70afdaf6a016c60d62a1ddea6d4a442c61
Parents: 147472b
Author: Ivan Wirawan <[email protected]>
Authored: Fri Jan 12 15:55:14 2018 +0100
Committer: Fokko Driesprong <[email protected]>
Committed: Fri Jan 12 15:55:14 2018 +0100

----------------------------------------------------------------------
 airflow/contrib/hooks/gcp_dataflow_hook.py | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a2bb2d70/airflow/contrib/hooks/gcp_dataflow_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataflow_hook.py b/airflow/contrib/hooks/gcp_dataflow_hook.py
index f9970d9..d60b498 100644
--- a/airflow/contrib/hooks/gcp_dataflow_hook.py
+++ b/airflow/contrib/hooks/gcp_dataflow_hook.py
@@ -65,6 +65,9 @@ class _DataflowJob(LoggingMixin):
             if 'currentState' in self._job:
                 if 'JOB_STATE_DONE' == self._job['currentState']:
                     return True
+                elif 'JOB_STATE_RUNNING' == self._job['currentState'] and \
+                        'JOB_TYPE_STREAMING' == self._job['type']:
+                    return True
                 elif 'JOB_STATE_FAILED' == self._job['currentState']:
                     raise Exception("Google Cloud Dataflow job {} has failed.".format(
                         self._job['name']))
