jaketf commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r418995083
##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -93,15 +94,30 @@ def inner_wrapper(self: "DataflowHook", *args, **kwargs) ->
RT:
class DataflowJobStatus:
"""
Helper class with Dataflow job statuses.
+ Reference:
https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState
"""
- JOB_STATE_DONE = "JOB_STATE_DONE"
+ JOB_STATE_UNKNOWN = "JOB_STATE_UNKNOWN"
Review comment:
nit: for var names is `JOB_STATE_` prefix really necessary on all these?
IMO slightly more readable to drop it and this causes "stutter"
`DataflowJobStatus.JOB_STATE_xxx`.
A forward looking thought (though not backwards compatible so not immediate
suggestion) in python 3.8+ this set could be more concise with walrus operator
e.g.
```python
FAILED_END_STATES = {
(FAILED := "JOB_STATE_FAILED"),
(CANCELLED := "JOB_STATE_CANCELLED"),
(STOPPED := "JOB_STATE_STOPPED")
}
```
##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -93,15 +94,30 @@ def inner_wrapper(self: "DataflowHook", *args, **kwargs) ->
RT:
class DataflowJobStatus:
"""
Helper class with Dataflow job statuses.
+ Reference:
https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState
"""
- JOB_STATE_DONE = "JOB_STATE_DONE"
+ JOB_STATE_UNKNOWN = "JOB_STATE_UNKNOWN"
+ JOB_STATE_STOPPED = "JOB_STATE_STOPPED"
JOB_STATE_RUNNING = "JOB_STATE_RUNNING"
+ JOB_STATE_DONE = "JOB_STATE_DONE"
JOB_STATE_FAILED = "JOB_STATE_FAILED"
JOB_STATE_CANCELLED = "JOB_STATE_CANCELLED"
+ JOB_STATE_UPDATED = "JOB_STATE_UPDATED"
+ JOB_STATE_DRAINING = "JOB_STATE_DRAINING"
+ JOB_STATE_DRAINED = "JOB_STATE_DRAINED"
JOB_STATE_PENDING = "JOB_STATE_PENDING"
- FAILED_END_STATES = {JOB_STATE_FAILED, JOB_STATE_CANCELLED}
- SUCCEEDED_END_STATES = {JOB_STATE_DONE}
- END_STATES = SUCCEEDED_END_STATES | FAILED_END_STATES
+ JOB_STATE_CANCELLING = "JOB_STATE_CANCELLING"
+ JOB_STATE_QUEUED = "JOB_STATE_QUEUED"
+ FAILED_END_STATES = {JOB_STATE_FAILED, JOB_STATE_CANCELLED,
JOB_STATE_STOPPED}
+ SUCCEEDED_END_STATES = {JOB_STATE_DONE, JOB_STATE_UPDATED,
JOB_STATE_DRAINED}
+ TERMINAL_STATES = SUCCEEDED_END_STATES | FAILED_END_STATES
Review comment:
I think these are reasonable defaults, but I agree it would be nice to
let the users set as parameters.
I could even see DRAINING as a failed state (if airflow never expects a
human to make manual intervention)
I could see wanting to fail earlier on CANCELLING (rather than waiting til
CANCELLED)
##########
File path: tests/providers/google/cloud/hooks/test_dataflow.py
##########
@@ -692,10 +710,84 @@ def test_cancel_job(self, mock_get_conn, jobs_controller):
location=TEST_LOCATION,
name=TEST_JOB_NAME,
poll_sleep=10,
- project_number=TEST_PROJECT
+ project_number=TEST_PROJECT,
+ num_retries=5,
)
jobs_controller.cancel()
+ @mock.patch(DATAFLOW_STRING.format('_DataflowJobsController'))
+
@mock.patch(DATAFLOW_STRING.format('DataflowHook.provide_authorized_gcloud'))
+ @mock.patch(DATAFLOW_STRING.format('DataflowHook.get_conn'))
+ @mock.patch(DATAFLOW_STRING.format('subprocess.run'))
+ def test_start_sql_job_failed_to_run(
+ self, mock_run, mock_get_conn, mock_provide_authorized_gcloud,
mock_controller
+ ):
+ test_job = {'id': "TEST_JOB_ID"}
+ mock_controller.return_value.get_jobs.return_value = [test_job]
+ mock_run.return_value = mock.MagicMock(
+ stdout=f"{TEST_JOB_ID}\n".encode(),
+ stderr=f"{TEST_JOB_ID}\n".encode(),
+ returncode=0
+ )
+ on_new_job_id_callback = mock.MagicMock()
+ result = self.dataflow_hook.start_sql_job(
+ job_name=TEST_SQL_JOB_NAME,
+ query=TEST_SQL_QUERY,
+ options=TEST_SQL_OPTIONS,
+ location=TEST_LOCATION,
+ project_id=TEST_PROJECT,
+ on_new_job_id_callback=on_new_job_id_callback
+ )
+ mock_run.assert_called_once_with(
+ [
+ 'gcloud',
+ 'beta',
Review comment:
beta not necessary
##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -431,13 +448,17 @@ def _start_dataflow(
variables: Dict,
name: str,
command_prefix: List[str],
- label_formatter: Callable[[Dict], List[str]],
project_id: str,
multiple_jobs: bool = False,
on_new_job_id_callback: Optional[Callable[[str], None]] = None,
location: str = DEFAULT_DATAFLOW_LOCATION
) -> None:
- cmd = command_prefix + self._build_cmd(variables, label_formatter,
project_id)
+ cmd = command_prefix + [
+ "--runner=DataflowRunner",
Review comment:
This makes me think (larger scope than just SQL operator) of should we
have Beam Operators that support other runners?
For example some users it does not make sense Dataflow for smaller/shorter
batch jobs say (because you have the overhead of waiting for workers to come
up) For a job < 30 mins worker spin up time can be 10% performance hit. But
they may still want to use Apache Beam (on say spark runner) that submits to
non-ephemeral cluster (dataproc, EMR, spark on k8s, on prem infra, etc).
Would this be easy enough to achieve on Dataproc / EMR / Spark Operators ?
##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -496,17 +517,15 @@ def start_java_dataflow(
variables['jobName'] = name
variables['region'] = location
- def label_formatter(labels_dict):
- return ['--labels={}'.format(
- json.dumps(labels_dict).replace(' ', ''))]
+ if 'labels' in variables:
+ variables['labels'] = json.dumps(variables['labels']).replace(' ',
'')
Review comment:
might be better to check that labels adhere to these regex (in API
[docs](https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job)):
```
Keys must conform to regexp: [\p{Ll}\p{Lo}][\p{Ll}\p{Lo}\p{N}_-]{0,62}
Values must conform to regexp: [\p{Ll}\p{Lo}\p{N}_-]{0,63}
```
and raise exception.
##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -694,29 +710,23 @@ def _build_dataflow_job_name(job_name: str,
append_job_name: bool = True) -> str
return safe_job_name
@staticmethod
- def _build_cmd(variables: Dict, label_formatter: Callable, project_id:
str) -> List[str]:
- command = [
- "--runner=DataflowRunner",
- "--project={}".format(project_id),
- ]
- if variables is None:
- return command
-
+ def _options_to_args(variables: Dict):
+ if not variables:
+ return []
# The logic of this method should be compatible with Apache Beam:
#
https://github.com/apache/beam/blob/b56740f0e8cd80c2873412847d0b336837429fb9/sdks/python/
# apache_beam/options/pipeline_options.py#L230-L251
+ args: List[str] = []
for attr, value in variables.items():
- if attr == 'labels':
- command += label_formatter(value)
- elif value is None:
- command.append(f"--{attr}")
+ if value is None:
Review comment:
nit: this if and the next elif take the same action and could be combined
```suggestion
if value is None or (isinstance(value, bool) and value):
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]