MikhailKulikov-db commented on code in PR #46635:
URL: https://github.com/apache/airflow/pull/46635#discussion_r1953549978
##########
providers/databricks/src/airflow/providers/databricks/utils/databricks.py:
##########
@@ -64,3 +64,22 @@ def validate_trigger_event(event: dict):
RunState.from_json(event["run_state"])
except Exception:
raise AirflowException(f'Run state returned by the Trigger is
incorrect: {event["run_state"]}')
+
+
+def validate_serverless_notebook_settings(content) -> bool:
+ """
+ Validate correctness of the serverless task submitted.
+ If the task is notebook_task and job_cluster_key or existing_cluster_id or
new_cluster
+ is not supplied and environments is supplied, checks if environment_key is
also populated else raises
Review Comment:
```suggestion
is not supplied, checks if environment_key is also populated else raises
```
##########
providers/databricks/src/airflow/providers/databricks/utils/databricks.py:
##########
@@ -64,3 +64,22 @@ def validate_trigger_event(event: dict):
RunState.from_json(event["run_state"])
except Exception:
raise AirflowException(f'Run state returned by the Trigger is
incorrect: {event["run_state"]}')
+
+
+def validate_serverless_notebook_settings(content) -> bool:
+ """
+ Validate correctness of the serverless task submitted.
+ If the task is notebook_task and job_cluster_key or existing_cluster_id or
new_cluster
+ is not supplied and environments is supplied, checks if environment_key is
also populated else raises
+ a Value exception
+ """
+ if "tasks" in content:
+ for task in content["tasks"]:
+ if (
+ "notebook_task" in task
+ and not {"job_cluster_key", "existing_cluster_id",
"new_cluster"}.intersection(task)
+ and "environments" in content
+ and "environment_key" not in task
+ ):
+ raise ValueError("environment_key is required for serverless
notebook task")
+ return True
Review Comment:
```suggestion
```
##########
providers/databricks/src/airflow/providers/databricks/utils/databricks.py:
##########
@@ -64,3 +64,22 @@ def validate_trigger_event(event: dict):
RunState.from_json(event["run_state"])
except Exception:
raise AirflowException(f'Run state returned by the Trigger is
incorrect: {event["run_state"]}')
+
+
+def validate_serverless_notebook_settings(content) -> bool:
+ """
+ Validate correctness of the serverless task submitted.
+ If the task is notebook_task and job_cluster_key or existing_cluster_id or
new_cluster
+ is not supplied and environments is supplied, checks if environment_key is
also populated else raises
+ a Value exception
+ """
+ if "tasks" in content:
+ for task in content["tasks"]:
+ if (
+ "notebook_task" in task
+ and not {"job_cluster_key", "existing_cluster_id",
"new_cluster"}.intersection(task)
+ and "environments" in content
+ and "environment_key" not in task
+ ):
+ raise ValueError("environment_key is required for serverless
notebook task")
+ return True
Review Comment:
nit: we do not need to return True
##########
providers/databricks/tests/provider_tests/databricks/operators/test_databricks.py:
##########
@@ -741,6 +741,41 @@ def
test_init_with_serverless_spark_python_task_named_parameters(self):
assert expected == utils.normalise_json_content(op.json)
+ def
test_notebook_task_named_parameters_serverless_no_environment_key_raises_error(self):
+ """
+ Test the initializer with the named parameters.
+ """
+ notebook_tasks = [
+ {
+ "task_key": "pythong_task_1",
+ "notebook_task": NOTEBOOK_TASK,
+ "timeout_seconds": 86400,
+ "max_retries": 3,
+ "min_retry_interval_millis": 2000,
+ "retry_on_timeout": False,
+ },
+ ]
+ json = {
+ "name": JOB_NAME,
+ "tags": TAGS,
+ "tasks": notebook_tasks,
+ "job_clusters": JOB_CLUSTERS,
+ "email_notifications": EMAIL_NOTIFICATIONS,
+ "webhook_notifications": WEBHOOK_NOTIFICATIONS,
+ "timeout_seconds": TIMEOUT_SECONDS,
+ "schedule": SCHEDULE,
+ "max_concurrent_runs": MAX_CONCURRENT_RUNS,
+ "git_source": GIT_SOURCE,
+ }
+ op = DatabricksSubmitRunOperator(
+ task_id=TASK_ID,
+ json=json,
+ environments=ENVIRONMENTS,
+ )
+
+ with pytest.raises(ValueError):
Review Comment:
nit: it would be good to check that the exception message contains
`environment_key is required for serverless notebook task`
##########
providers/databricks/src/airflow/providers/databricks/operators/databricks.py:
##########
@@ -1117,6 +1122,7 @@ def _get_run_json(self) -> dict[str, Any]:
def _launch_job(self, context: Context | None = None) -> int | None:
Review Comment:
Let's just remove the environments field from DatabricksTaskBaseOperator and
DatabricksTaskOperator for now.
We will add `environment_key` in the future PRs.
##########
providers/databricks/src/airflow/providers/databricks/operators/databricks.py:
##########
@@ -630,6 +634,7 @@ def execute(self, context: Context):
pipeline_name = self.json["pipeline_task"]["pipeline_name"]
self.json["pipeline_task"]["pipeline_id"] =
self._hook.find_pipeline_id_by_name(pipeline_name)
del self.json["pipeline_task"]["pipeline_name"]
+ validate_serverless_notebook_settings(self.json)
Review Comment:
We also need this validation for `DatabricksCreateJobsOperator`
##########
providers/databricks/src/airflow/providers/databricks/utils/databricks.py:
##########
@@ -64,3 +64,22 @@ def validate_trigger_event(event: dict):
RunState.from_json(event["run_state"])
except Exception:
raise AirflowException(f'Run state returned by the Trigger is
incorrect: {event["run_state"]}')
+
+
+def validate_serverless_notebook_settings(content) -> bool:
+ """
+ Validate correctness of the serverless task submitted.
+ If the task is notebook_task and job_cluster_key or existing_cluster_id or
new_cluster
+ is not supplied and environments is supplied, checks if environment_key is
also populated else raises
+ a Value exception
+ """
+ if "tasks" in content:
+ for task in content["tasks"]:
+ if (
+ "notebook_task" in task
+ and not {"job_cluster_key", "existing_cluster_id",
"new_cluster"}.intersection(task)
+ and "environments" in content
Review Comment:
```suggestion
```
Let's not check "environments" here. It might be empty for a job with a
serverless notebook task.
Checking `not {"job_cluster_key", "existing_cluster_id",
"new_cluster"}.intersection(task)` should be enough.
##########
providers/databricks/tests/provider_tests/databricks/operators/test_databricks.py:
##########
@@ -741,6 +741,41 @@ def
test_init_with_serverless_spark_python_task_named_parameters(self):
assert expected == utils.normalise_json_content(op.json)
+ def
test_notebook_task_named_parameters_serverless_no_environment_key_raises_error(self):
+ """
+ Test the initializer with the named parameters.
+ """
+ notebook_tasks = [
+ {
+ "task_key": "pythong_task_1",
+ "notebook_task": NOTEBOOK_TASK,
+ "timeout_seconds": 86400,
+ "max_retries": 3,
+ "min_retry_interval_millis": 2000,
+ "retry_on_timeout": False,
+ },
+ ]
+ json = {
+ "name": JOB_NAME,
+ "tags": TAGS,
+ "tasks": notebook_tasks,
+ "job_clusters": JOB_CLUSTERS,
+ "email_notifications": EMAIL_NOTIFICATIONS,
+ "webhook_notifications": WEBHOOK_NOTIFICATIONS,
+ "timeout_seconds": TIMEOUT_SECONDS,
+ "schedule": SCHEDULE,
+ "max_concurrent_runs": MAX_CONCURRENT_RUNS,
+ "git_source": GIT_SOURCE,
+ }
+ op = DatabricksSubmitRunOperator(
Review Comment:
Let's add test for DatabricksCreateJobsOperator as well
##########
providers/databricks/src/airflow/providers/databricks/utils/databricks.py:
##########
@@ -64,3 +64,22 @@ def validate_trigger_event(event: dict):
RunState.from_json(event["run_state"])
except Exception:
raise AirflowException(f'Run state returned by the Trigger is
incorrect: {event["run_state"]}')
+
+
+def validate_serverless_notebook_settings(content) -> bool:
Review Comment:
nit: it is better not to return anything, since we fail validation by
throwing an exception
```suggestion
def validate_serverless_notebook_settings(content):
```
##########
providers/databricks/src/airflow/providers/databricks/operators/databricks.py:
##########
@@ -1117,6 +1122,7 @@ def _get_run_json(self) -> dict[str, Any]:
def _launch_job(self, context: Context | None = None) -> int | None:
Review Comment:
The issue regarding the implementation in the previous PR:
We added the `environments` parameter to DatabricksTaskBaseOperator and
DatabricksTaskOperator, but it should not be there since `environments` is a
job-level field. We should add `environment_key` instead.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]