This is an automated email from the ASF dual-hosted git repository.

pankajkoti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
     new 8b21b4e2126 Revert "Add support for serverless job in Databricks operators (#45188)" (#46724)
8b21b4e2126 is described below

commit 8b21b4e2126a1a3a514407b29003b7349ab8cf22
Author: Mikhail Kulikov <mikhail.kuli...@databricks.com>
AuthorDate: Thu Feb 13 16:26:25 2025 +0100

    Revert "Add support for serverless job in Databricks operators (#45188)" (#46724)

    This reverts commit 0631ba76e0bdc7a52e873f3ec85787cbaf0e0dec.

    Related #46569, #45138
---
 .../databricks/docs/operators/jobs_create.rst      |   1 -
 providers/databricks/docs/operators/submit_run.rst |   1 -
 providers/databricks/docs/operators/task.rst       |   7 --
 .../providers/databricks/operators/databricks.py   |  23 +----
 .../databricks/operators/databricks_workflow.py    |  10 --
 .../databricks/operators/test_databricks.py        | 102 +--------------------
 .../operators/test_databricks_workflow.py          |  12 ---
 .../tests/system/databricks/example_databricks.py  |  23 -----
 8 files changed, 3 insertions(+), 176 deletions(-)

diff --git a/providers/databricks/docs/operators/jobs_create.rst b/providers/databricks/docs/operators/jobs_create.rst
index ad4525196ef..cd0961d305f 100644
--- a/providers/databricks/docs/operators/jobs_create.rst
+++ b/providers/databricks/docs/operators/jobs_create.rst
@@ -56,7 +56,6 @@ Currently the named parameters that ``DatabricksCreateJobsOperator`` supports ar
 - ``max_concurrent_runs``
 - ``git_source``
 - ``access_control_list``
-- ``environments``
 
 
 Examples
diff --git a/providers/databricks/docs/operators/submit_run.rst b/providers/databricks/docs/operators/submit_run.rst
index 813af046127..373000c1855 100644
--- a/providers/databricks/docs/operators/submit_run.rst
+++ b/providers/databricks/docs/operators/submit_run.rst
@@ -80,7 +80,6 @@ Currently the named parameters that ``DatabricksSubmitRunOperator`` supports are
 - ``libraries``
 - ``run_name``
 - ``timeout_seconds``
-- ``environments``
 
 .. code-block:: python
diff --git a/providers/databricks/docs/operators/task.rst b/providers/databricks/docs/operators/task.rst
index a1b83b3bb93..f2a675b676d 100644
--- a/providers/databricks/docs/operators/task.rst
+++ b/providers/databricks/docs/operators/task.rst
@@ -44,10 +44,3 @@ Running a SQL query in Databricks using DatabricksTaskOperator
     :language: python
     :start-after: [START howto_operator_databricks_task_sql]
     :end-before: [END howto_operator_databricks_task_sql]
-
-Running a python file in Databricks in using DatabricksTaskOperator
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-.. exampleinclude:: /../../providers/databricks/tests/system/databricks/example_databricks.py
-    :language: python
-    :start-after: [START howto_operator_databricks_task_python]
-    :end-before: [END howto_operator_databricks_task_python]
diff --git a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
index 75035b13fc3..4303e32cb56 100644
--- a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
+++ b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
@@ -293,8 +293,6 @@ class DatabricksCreateJobsOperator(BaseOperator):
     :param databricks_retry_delay: Number of seconds to wait between retries (it
         might be a floating point number).
     :param databricks_retry_args: An optional dictionary with arguments passed to ``tenacity.Retrying`` class.
-    :param environments: An optional list of task execution environment specifications
-        that can be referenced by serverless tasks of this job.
     """
 
@@ -326,7 +324,6 @@ class DatabricksCreateJobsOperator(BaseOperator):
         databricks_retry_limit: int = 3,
         databricks_retry_delay: int = 1,
         databricks_retry_args: dict[Any, Any] | None = None,
-        environments: list[dict] | None = None,
         **kwargs,
     ) -> None:
         """Create a new ``DatabricksCreateJobsOperator``."""
@@ -363,8 +360,6 @@ class DatabricksCreateJobsOperator(BaseOperator):
             self.json["git_source"] = git_source
         if access_control_list is not None:
             self.json["access_control_list"] = access_control_list
-        if environments is not None:
-            self.json["environments"] = environments
 
         if self.json:
             self.json = normalise_json_content(self.json)
@@ -508,8 +503,6 @@ class DatabricksSubmitRunOperator(BaseOperator):
     :param git_source: Optional specification of a remote git repository from which
         supported task types are retrieved.
     :param deferrable: Run operator in the deferrable mode.
-    :param environments: An optional list of task execution environment specifications
-        that can be referenced by serverless tasks of this job.
 
     .. seealso::
         https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunsSubmit
@@ -550,7 +543,6 @@ class DatabricksSubmitRunOperator(BaseOperator):
         wait_for_termination: bool = True,
         git_source: dict[str, str] | None = None,
         deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
-        environments: list[dict] | None = None,
         **kwargs,
     ) -> None:
         """Create a new ``DatabricksSubmitRunOperator``."""
@@ -595,8 +587,6 @@ class DatabricksSubmitRunOperator(BaseOperator):
             self.json["access_control_list"] = access_control_list
         if git_source is not None:
             self.json["git_source"] = git_source
-        if environments is not None:
-            self.json["environments"] = environments
 
         if "dbt_task" in self.json and "git_source" not in self.json:
             raise AirflowException("git_source is required for dbt_task")
@@ -993,8 +983,6 @@ class DatabricksTaskBaseOperator(BaseOperator, ABC):
     :param wait_for_termination: if we should wait for termination of the job run. ``True`` by default.
     :param workflow_run_metadata: Metadata for the workflow run. This is used when the operator is used within
         a workflow. It is expected to be a dictionary containing the run_id and conn_id for the workflow.
-    :param environments: An optional list of task execution environment specifications
-        that can be referenced by serverless tasks of this job.
""" def __init__( @@ -1012,7 +1000,6 @@ class DatabricksTaskBaseOperator(BaseOperator, ABC): polling_period_seconds: int = 5, wait_for_termination: bool = True, workflow_run_metadata: dict[str, Any] | None = None, - environments: list[dict] | None = None, **kwargs: Any, ): self.caller = caller @@ -1028,7 +1015,7 @@ class DatabricksTaskBaseOperator(BaseOperator, ABC): self.polling_period_seconds = polling_period_seconds self.wait_for_termination = wait_for_termination self.workflow_run_metadata = workflow_run_metadata - self.environments = environments + self.databricks_run_id: int | None = None super().__init__(**kwargs) @@ -1108,10 +1095,8 @@ class DatabricksTaskBaseOperator(BaseOperator, ABC): run_json["new_cluster"] = self.new_cluster elif self.existing_cluster_id: run_json["existing_cluster_id"] = self.existing_cluster_id - elif self.environments: - run_json["environments"] = self.environments else: - raise ValueError("Must specify either existing_cluster_id, new_cluster or environments.") + raise ValueError("Must specify either existing_cluster_id or new_cluster.") return run_json def _launch_job(self, context: Context | None = None) -> int | None: @@ -1415,8 +1400,6 @@ class DatabricksTaskOperator(DatabricksTaskBaseOperator): :param new_cluster: Specs for a new cluster on which this task will be run. :param polling_period_seconds: Controls the rate which we poll for the result of this notebook job run. :param wait_for_termination: if we should wait for termination of the job run. ``True`` by default. - :param environments: An optional list of task execution environment specifications - that can be referenced by serverless tasks of this job """ CALLER = "DatabricksTaskOperator" @@ -1436,7 +1419,6 @@ class DatabricksTaskOperator(DatabricksTaskBaseOperator): polling_period_seconds: int = 5, wait_for_termination: bool = True, workflow_run_metadata: dict | None = None, - environments: list[dict] | None = None, **kwargs, ): self.task_config = task_config @@ -1454,7 +1436,6 @@ class DatabricksTaskOperator(DatabricksTaskBaseOperator): polling_period_seconds=polling_period_seconds, wait_for_termination=wait_for_termination, workflow_run_metadata=workflow_run_metadata, - environments=environments, **kwargs, ) diff --git a/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py b/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py index 12ea0792f00..f9f91adcb56 100644 --- a/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py +++ b/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py @@ -90,8 +90,6 @@ class _CreateDatabricksWorkflowOperator(BaseOperator): will be passed to all notebooks in the workflow. :param tasks_to_convert: A list of tasks to convert to a Databricks workflow. This list can also be populated after instantiation using the `add_task` method. - :param environments: An optional list of task execution environment specifications - that can be referenced by serverless tasks of this job. 
""" operator_extra_links = (WorkflowJobRunLink(), WorkflowJobRepairAllFailedLink()) @@ -108,7 +106,6 @@ class _CreateDatabricksWorkflowOperator(BaseOperator): max_concurrent_runs: int = 1, notebook_params: dict | None = None, tasks_to_convert: list[BaseOperator] | None = None, - environments: list[dict] | None = None, **kwargs, ): self.databricks_conn_id = databricks_conn_id @@ -120,7 +117,6 @@ class _CreateDatabricksWorkflowOperator(BaseOperator): self.tasks_to_convert = tasks_to_convert or [] self.relevant_upstreams = [task_id] self.workflow_run_metadata: WorkflowRunMetadata | None = None - self.environments = environments super().__init__(task_id=task_id, **kwargs) def _get_hook(self, caller: str) -> DatabricksHook: @@ -160,7 +156,6 @@ class _CreateDatabricksWorkflowOperator(BaseOperator): "format": "MULTI_TASK", "job_clusters": self.job_clusters, "max_concurrent_runs": self.max_concurrent_runs, - "environments": self.environments, } return merge(default_json, self.extra_job_params) @@ -279,8 +274,6 @@ class DatabricksWorkflowTaskGroup(TaskGroup): all python tasks in the workflow. :param spark_submit_params: A list of spark submit parameters to pass to the workflow. These parameters will be passed to all spark submit tasks. - :param environments: An optional list of task execution environment specifications - that can be referenced by serverless tasks of this job. """ is_databricks = True @@ -297,7 +290,6 @@ class DatabricksWorkflowTaskGroup(TaskGroup): notebook_params: dict | None = None, python_params: list | None = None, spark_submit_params: list | None = None, - environments: list[dict] | None = None, **kwargs, ): self.databricks_conn_id = databricks_conn_id @@ -310,7 +302,6 @@ class DatabricksWorkflowTaskGroup(TaskGroup): self.notebook_params = notebook_params or {} self.python_params = python_params or [] self.spark_submit_params = spark_submit_params or [] - self.environments = environments or [] super().__init__(**kwargs) def __exit__( @@ -330,7 +321,6 @@ class DatabricksWorkflowTaskGroup(TaskGroup): job_clusters=self.job_clusters, max_concurrent_runs=self.max_concurrent_runs, notebook_params=self.notebook_params, - environments=self.environments, ) for task in tasks: diff --git a/providers/databricks/tests/provider_tests/databricks/operators/test_databricks.py b/providers/databricks/tests/provider_tests/databricks/operators/test_databricks.py index 5ff686ea0aa..51e7a765998 100644 --- a/providers/databricks/tests/provider_tests/databricks/operators/test_databricks.py +++ b/providers/databricks/tests/provider_tests/databricks/operators/test_databricks.py @@ -265,15 +265,6 @@ ACCESS_CONTROL_LIST = [ "permission_level": "CAN_MANAGE", } ] -ENVIRONMENTS = [ - { - "environment_key": "default_environment", - "spec": { - "client": "1", - "dependencies": ["library1"], - }, - } -] def mock_dict(d: dict): @@ -316,7 +307,6 @@ class TestDatabricksCreateJobsOperator: max_concurrent_runs=MAX_CONCURRENT_RUNS, git_source=GIT_SOURCE, access_control_list=ACCESS_CONTROL_LIST, - environments=ENVIRONMENTS, ) expected = utils.normalise_json_content( { @@ -331,7 +321,6 @@ class TestDatabricksCreateJobsOperator: "max_concurrent_runs": MAX_CONCURRENT_RUNS, "git_source": GIT_SOURCE, "access_control_list": ACCESS_CONTROL_LIST, - "environments": ENVIRONMENTS, } ) @@ -353,7 +342,6 @@ class TestDatabricksCreateJobsOperator: "max_concurrent_runs": MAX_CONCURRENT_RUNS, "git_source": GIT_SOURCE, "access_control_list": ACCESS_CONTROL_LIST, - "environments": ENVIRONMENTS, } op = 
DatabricksCreateJobsOperator(task_id=TASK_ID, json=json) @@ -370,7 +358,6 @@ class TestDatabricksCreateJobsOperator: "max_concurrent_runs": MAX_CONCURRENT_RUNS, "git_source": GIT_SOURCE, "access_control_list": ACCESS_CONTROL_LIST, - "environments": ENVIRONMENTS, } ) @@ -393,7 +380,6 @@ class TestDatabricksCreateJobsOperator: override_max_concurrent_runs = 0 override_git_source = {} override_access_control_list = [] - override_environments = [] json = { "name": JOB_NAME, "tags": TAGS, @@ -406,7 +392,6 @@ class TestDatabricksCreateJobsOperator: "max_concurrent_runs": MAX_CONCURRENT_RUNS, "git_source": GIT_SOURCE, "access_control_list": ACCESS_CONTROL_LIST, - "environments": ENVIRONMENTS, } op = DatabricksCreateJobsOperator( @@ -423,7 +408,6 @@ class TestDatabricksCreateJobsOperator: max_concurrent_runs=override_max_concurrent_runs, git_source=override_git_source, access_control_list=override_access_control_list, - environments=override_environments, ) expected = utils.normalise_json_content( @@ -439,7 +423,6 @@ class TestDatabricksCreateJobsOperator: "max_concurrent_runs": override_max_concurrent_runs, "git_source": override_git_source, "access_control_list": override_access_control_list, - "environments": override_environments, } ) @@ -483,7 +466,6 @@ class TestDatabricksCreateJobsOperator: "max_concurrent_runs": MAX_CONCURRENT_RUNS, "git_source": GIT_SOURCE, "access_control_list": ACCESS_CONTROL_LIST, - "environments": ENVIRONMENTS, } op = DatabricksCreateJobsOperator(task_id=TASK_ID, json=json) db_mock = db_mock_class.return_value @@ -508,7 +490,6 @@ class TestDatabricksCreateJobsOperator: "max_concurrent_runs": MAX_CONCURRENT_RUNS, "git_source": GIT_SOURCE, "access_control_list": ACCESS_CONTROL_LIST, - "environments": ENVIRONMENTS, } ) db_mock_class.assert_called_once_with( @@ -541,7 +522,6 @@ class TestDatabricksCreateJobsOperator: "max_concurrent_runs": MAX_CONCURRENT_RUNS, "git_source": GIT_SOURCE, "access_control_list": ACCESS_CONTROL_LIST, - "environments": ENVIRONMENTS, } op = DatabricksCreateJobsOperator(task_id=TASK_ID, json=json) db_mock = db_mock_class.return_value @@ -564,7 +544,6 @@ class TestDatabricksCreateJobsOperator: "max_concurrent_runs": MAX_CONCURRENT_RUNS, "git_source": GIT_SOURCE, "access_control_list": ACCESS_CONTROL_LIST, - "environments": ENVIRONMENTS, } ) db_mock_class.assert_called_once_with( @@ -675,72 +654,6 @@ class TestDatabricksSubmitRunOperator: assert expected == utils.normalise_json_content(op.json) - def test_init_with_serverless_spark_python_task_named_parameters(self): - """ - Test the initializer with the named parameters. 
- """ - python_tasks = [ - { - "task_key": "pythong_task_1", - "new_cluster": { - "spark_version": "7.3.x-scala2.12", - "node_type_id": "i3.xlarge", - "spark_conf": { - "spark.speculation": True, - }, - "aws_attributes": { - "availability": "SPOT", - "zone_id": "us-west-2a", - }, - "autoscale": { - "min_workers": 2, - "max_workers": 16, - }, - }, - "spark_python_task": {"python_file": "/Users/jsm...@example.com/example_file.py"}, - "timeout_seconds": 86400, - "max_retries": 3, - "min_retry_interval_millis": 2000, - "retry_on_timeout": False, - "environment_key": "default_environment", - }, - ] - json = { - "name": JOB_NAME, - "tags": TAGS, - "tasks": python_tasks, - "job_clusters": JOB_CLUSTERS, - "email_notifications": EMAIL_NOTIFICATIONS, - "webhook_notifications": WEBHOOK_NOTIFICATIONS, - "timeout_seconds": TIMEOUT_SECONDS, - "schedule": SCHEDULE, - "max_concurrent_runs": MAX_CONCURRENT_RUNS, - "git_source": GIT_SOURCE, - } - op = DatabricksSubmitRunOperator( - task_id=TASK_ID, - json=json, - environments=ENVIRONMENTS, - ) - expected = utils.normalise_json_content( - { - "name": JOB_NAME, - "tags": TAGS, - "tasks": python_tasks, - "job_clusters": JOB_CLUSTERS, - "email_notifications": EMAIL_NOTIFICATIONS, - "webhook_notifications": WEBHOOK_NOTIFICATIONS, - "timeout_seconds": TIMEOUT_SECONDS, - "schedule": SCHEDULE, - "max_concurrent_runs": MAX_CONCURRENT_RUNS, - "git_source": GIT_SOURCE, - "environments": ENVIRONMENTS, - "run_name": TASK_ID, - } - ) - - assert expected == utils.normalise_json_content(op.json) - def test_init_with_pipeline_name_task_named_parameters(self): """ Test the initializer with the named parameters. @@ -2217,7 +2130,7 @@ class TestDatabricksNotebookOperator: ) with pytest.raises(ValueError) as exc_info: operator._get_run_json() - exception_message = "Must specify either existing_cluster_id, new_cluster or environments." + exception_message = "Must specify either existing_cluster_id or new_cluster." 
         assert str(exc_info.value) == exception_message
 
     def test_job_runs_forever_by_default(self):
@@ -2430,16 +2343,3 @@ class TestDatabricksTaskOperator:
 
         expected_task_key = "test_task_key"
         assert expected_task_key == operator.databricks_task_key
-
-    def test_get_task_base_json_serverless(self):
-        task_config = SPARK_PYTHON_TASK
-        operator = DatabricksTaskOperator(
-            task_id="test_task",
-            databricks_conn_id="test_conn_id",
-            task_config=task_config,
-            environments=ENVIRONMENTS,
-        )
-        task_base_json = operator._get_task_base_json()
-
-        assert operator.task_config == task_config
-        assert task_base_json == task_config
diff --git a/providers/databricks/tests/provider_tests/databricks/operators/test_databricks_workflow.py b/providers/databricks/tests/provider_tests/databricks/operators/test_databricks_workflow.py
index 463b90cc055..e7b940549cf 100644
--- a/providers/databricks/tests/provider_tests/databricks/operators/test_databricks_workflow.py
+++ b/providers/databricks/tests/provider_tests/databricks/operators/test_databricks_workflow.py
@@ -77,19 +77,9 @@ def test_flatten_node():
 
 def test_create_workflow_json(mock_databricks_hook, context, mock_task_group):
     """Test that _CreateDatabricksWorkflowOperator.create_workflow_json returns the expected JSON."""
-    environments = [
-        {
-            "environment_key": "default_environment",
-            "spec": {
-                "client": "1",
-                "dependencies": ["library1"],
-            },
-        }
-    ]
     operator = _CreateDatabricksWorkflowOperator(
         task_id="test_task",
         databricks_conn_id="databricks_default",
-        environments=environments,
     )
     operator.task_group = mock_task_group
 
@@ -106,7 +96,6 @@ def test_create_workflow_json(mock_databricks_hook, context, mock_task_group):
     assert workflow_json["job_clusters"] == []
     assert workflow_json["max_concurrent_runs"] == 1
     assert workflow_json["timeout_seconds"] == 0
-    assert workflow_json["environments"] == environments
 
 
 def test_create_or_reset_job_existing(mock_databricks_hook, context, mock_task_group):
@@ -227,7 +216,6 @@ def test_task_group_exit_creates_operator(mock_databricks_workflow_operator):
         task_group=task_group,
         task_id="launch",
         databricks_conn_id="databricks_conn",
-        environments=[],
         existing_clusters=[],
         extra_job_params={},
         job_clusters=[],
diff --git a/providers/databricks/tests/system/databricks/example_databricks.py b/providers/databricks/tests/system/databricks/example_databricks.py
index 360645cc30d..999cebb6742 100644
--- a/providers/databricks/tests/system/databricks/example_databricks.py
+++ b/providers/databricks/tests/system/databricks/example_databricks.py
@@ -238,29 +238,6 @@ with DAG(
     )
     # [END howto_operator_databricks_task_sql]
 
-    # [START howto_operator_databricks_task_python]
-    environments = [
-        {
-            "environment_key": "default_environment",
-            "spec": {
-                "client": "1",
-                "dependencies": ["library1"],
-            },
-        }
-    ]
-    task_operator_python_query = DatabricksTaskOperator(
-        task_id="python_task",
-        databricks_conn_id="databricks_conn",
-        task_config={
-            "spark_python_task": {
-                "python_file": "/Users/jsm...@example.com/example_file.py",
-            },
-            "environment_key": "default_environment",
-        },
-        environments=environments,
-    )
-    # [END howto_operator_databricks_task_python]
-
     from tests_common.test_utils.watcher import watcher
 
     # This test needs watcher in order to properly mark success/failure
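Note: with the serverless ``environments`` path reverted, a task launched through ``DatabricksTaskOperator`` outside a workflow must again target a cluster explicitly, as the restored error message above states. A minimal sketch of the remaining usage, assuming a hypothetical connection id, cluster id and notebook path (none of these values come from this commit):

    from airflow.providers.databricks.operators.databricks import DatabricksTaskOperator

    # Point the task at an existing cluster ...
    task = DatabricksTaskOperator(
        task_id="notebook_task",
        databricks_conn_id="databricks_conn",  # assumed connection id
        existing_cluster_id="1234-567890-abcde123",  # placeholder cluster id
        task_config={
            "notebook_task": {"notebook_path": "/Shared/example_notebook"},
        },
    )
    # ... or pass new_cluster=... instead; omitting both now raises
    # ValueError("Must specify either existing_cluster_id or new_cluster.")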