MikhailKulikov-db commented on code in PR #46635:
URL: https://github.com/apache/airflow/pull/46635#discussion_r1950019285


##########
providers/databricks/src/airflow/providers/databricks/utils/databricks.py:
##########
@@ -64,3 +64,19 @@ def validate_trigger_event(event: dict):
         RunState.from_json(event["run_state"])
     except Exception:
         raise AirflowException(f'Run state returned by the Trigger is incorrect: {event["run_state"]}')
+
+
+def validate_serverless_job_settings(content) -> bool:
+    """
+    Validate correctness of the serverless task submitted.
+    If the task is either spark_python_task, python_wheel_task or dbt_task and job_cluster_key or
+    existing_cluster_id is not supplied and environments is supplied, checks if environment_key is
+    also populated else raises a Value exception
+    """
+    valid_tasks = {"spark_python_task", "python_wheel_task", "dbt_task"}
+    for task in content["tasks"]:
+        if valid_tasks.intersection(task):
+            if not {"job_cluster_key", "existing_cluster_id"}.intersection(task):
+                if "environments" in content and "environment_key" not in task:
+                    raise ValueError("environment_key not set for serverless task")

Review Comment:
   ```suggestion
                    raise ValueError("environment_key is required for serverless notebook task")
   ```



##########
providers/databricks/src/airflow/providers/databricks/utils/databricks.py:
##########
@@ -64,3 +64,19 @@ def validate_trigger_event(event: dict):
         RunState.from_json(event["run_state"])
     except Exception:
         raise AirflowException(f'Run state returned by the Trigger is incorrect: {event["run_state"]}')
+
+
+def validate_serverless_job_settings(content) -> bool:
+    """
+    Validate correctness of the serverless task submitted.
+    If the task is either spark_python_task, python_wheel_task or dbt_task and job_cluster_key or
+    existing_cluster_id is not supplied and environments is supplied, checks if environment_key is
+    also populated else raises a Value exception
+    """
+    valid_tasks = {"spark_python_task", "python_wheel_task", "dbt_task"}
+    for task in content["tasks"]:
+        if valid_tasks.intersection(task):
+            if not {"job_cluster_key", "existing_cluster_id"}.intersection(task):

Review Comment:
   I think we also need to check "new_cluster" here
   
   ```suggestion
            if not {"job_cluster_key", "existing_cluster_id", "new_cluster"}.intersection(task):
   ```
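   For context on why the suggested guard works: `set.intersection` accepts any iterable, and iterating a dict yields its keys, so the check tests whether the task dict declares any cluster reference. A minimal sketch of the guard with `new_cluster` included (the task dicts below are illustrative examples, not taken from the PR):
   
   ```python
   # Keys on a task dict that mean "this task runs on a cluster, not serverless".
   CLUSTER_KEYS = {"job_cluster_key", "existing_cluster_id", "new_cluster"}
   
   
   def has_cluster_reference(task: dict) -> bool:
       # set.intersection(dict) compares against the dict's keys.
       return bool(CLUSTER_KEYS.intersection(task))
   
   
   # Hypothetical task payloads for illustration only:
   serverless_task = {"task_key": "t1", "spark_python_task": {"python_file": "main.py"}}
   clustered_task = {"task_key": "t2", "notebook_task": {}, "new_cluster": {"num_workers": 2}}
   ```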



##########
providers/databricks/src/airflow/providers/databricks/utils/databricks.py:
##########
@@ -64,3 +64,19 @@ def validate_trigger_event(event: dict):
         RunState.from_json(event["run_state"])
     except Exception:
         raise AirflowException(f'Run state returned by the Trigger is incorrect: {event["run_state"]}')
+
+
+def validate_serverless_job_settings(content) -> bool:
+    """
+    Validate correctness of the serverless task submitted.
+    If the task is either spark_python_task, python_wheel_task or dbt_task and job_cluster_key or
+    existing_cluster_id is not supplied and environments is supplied, checks if environment_key is
+    also populated else raises a Value exception
+    """
+    valid_tasks = {"spark_python_task", "python_wheel_task", "dbt_task"}

Review Comment:
   Validation for "spark_python_task," "python_wheel_task," and "dbt_task" is 
already implemented in the Jobs API, so we don't need to add them separately in 
the Airflow provider.
   
   The only validation we want to introduce here is for the notebook task. 
Since adding this validation at the API level would be a breaking change, we 
are handling it in Airflow instead. Historically, users couldn't use job 
environments with notebook tasks. Now, the API will support it as an optional 
field, but in Airflow, we want to make it required.
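   
   A hedged sketch of what a notebook-only validation along these lines might look like (the function name and structure are my assumption, not the final PR code):
   
   ```python
   def validate_serverless_notebook_settings(content: dict) -> None:
       """Require environment_key on serverless notebook tasks.
   
       Historically, users couldn't use job environments with notebook tasks.
       The Jobs API now accepts environment_key as an optional field, so the
       provider enforces it here rather than via a breaking API-level change.
       """
       for task in content.get("tasks", []):
           if "notebook_task" not in task:
               continue  # only notebook tasks need this provider-side check
           # A task with any cluster reference is not serverless.
           if {"job_cluster_key", "existing_cluster_id", "new_cluster"}.intersection(task):
               continue
           if "environments" in content and "environment_key" not in task:
               raise ValueError("environment_key is required for serverless notebook task")
   ```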



##########
providers/databricks/src/airflow/providers/databricks/utils/databricks.py:
##########
@@ -64,3 +64,19 @@ def validate_trigger_event(event: dict):
         RunState.from_json(event["run_state"])
     except Exception:
         raise AirflowException(f'Run state returned by the Trigger is incorrect: {event["run_state"]}')
+
+
+def validate_serverless_job_settings(content) -> bool:
+    """
+    Validate correctness of the serverless task submitted.
+    If the task is either spark_python_task, python_wheel_task or dbt_task and job_cluster_key or
+    existing_cluster_id is not supplied and environments is supplied, checks if environment_key is
+    also populated else raises a Value exception
+    """
+    valid_tasks = {"spark_python_task", "python_wheel_task", "dbt_task"}

Review Comment:
   The second paragraph in the comment above could also serve as a comment for 
this function to clarify the reasoning behind this validation.


