jgr-trackunit commented on issue #25640: URL: https://github.com/apache/airflow/issues/25640#issuecomment-1211875172
Hi! I've found an issue with the `databricks` provider; at first glance it looks related to https://github.com/apache/airflow/pull/25115. @alexott, I think this might be interesting for you. More info below:

```
[2022-08-11, 11:10:43 UTC] {{standard_task_runner.py:91}} ERROR - Failed to execute job 34817 for task xxxxx
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/task/task_runner/standard_task_runner.py", line 85, in _start_by_fork
    args.func(args, dag=self.dag)
  File "/usr/local/lib/python3.7/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/cli.py", line 92, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 292, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/usr/local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 107, in _run_task_by_selected_method
    _run_raw_task(args, ti)
  File "/usr/local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 184, in _run_raw_task
    error_file=args.error_file,
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
    self._execute_task_with_callbacks(context)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1458, in _execute_task_with_callbacks
    result = self._execute_task(context, self.task)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1514, in _execute_task
    result = execute_callable(context=context)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/providers/databricks/operators/databricks.py", line 374, in execute
    self.run_id = self._hook.submit_run(self.json)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/providers/databricks/hooks/databricks.py", line 152, in submit_run
    response = self._do_api_call(SUBMIT_RUN_ENDPOINT, json)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 493, in _do_api_call
    headers = {**self.user_agent_header, **aad_headers}
  File "/usr/local/lib/python3.7/site-packages/cached_property.py", line 36, in __get__
    value = obj.__dict__[self.func.__name__] = self.func(obj)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 136, in user_agent_header
    return {'user-agent': self.user_agent_value}
  File "/usr/local/lib/python3.7/site-packages/cached_property.py", line 36, in __get__
    value = obj.__dict__[self.func.__name__] = self.func(obj)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 144, in user_agent_value
    if provider.is_source:
AttributeError: 'ProviderInfo' object has no attribute 'is_source'
```

I ran it on `MWAA == 2.2.2` with the configuration below:

```
new_cluster = {
    "autoscale": {"min_workers": 1, "max_workers": 2},
    "cluster_name": "",
    "spark_version": get_spark_version(),
    "spark_conf": Variable.get("SPARK_CONF", deserialize_json=True, default_var="{}"),
    "aws_attributes": {
        "first_on_demand": 1,
        "availability": "SPOT_WITH_FALLBACK",
        "zone_id": "auto",
        "instance_profile_arn": Variable.get("E2_INSTANCE_PROFILE_ARN", default_var=""),
        "spot_bid_price_percent": 100,
    },
    "enable_elastic_disk": True,
    "node_type_id": "r5a.xlarge",
    "ssh_public_keys": [],
    "custom_tags": {"Application": "databricks", "env": env, "AnalyticsTask": "task name"},
    "spark_env_vars": {},
    "cluster_source": "JOB",
    "init_scripts": [],
}

with DAG(
    dag_id="dag id",
    description="desc",
    default_args=default_args,
    schedule_interval="0 2 * * *",  # Every night at 02:00
    catchup=False,
    max_active_runs=1,
    concurrency=1,
    is_paused_upon_creation=dag_is_paused_upon_creation,
) as dag:
    task = DatabricksSubmitRunOperator(
        task_id="task-name",
        databricks_conn_id="connection-name",
        new_cluster=new_cluster,
        notebook_task="notebook task",
        timeout_seconds=3600 * 4,  # 4 hours
        polling_period_seconds=30,
        retries=1,
    )
```

Tell me if you need more details.
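To illustrate what the traceback suggests is going on: the newer provider code reads `provider.is_source`, while the `ProviderInfo` shipped with an older Airflow core (such as MWAA's 2.2.2) apparently does not define that field. A minimal sketch of the failure shape and the kind of defensive check that would avoid the crash (the `ProviderInfo` stand-in and its field names here are hypothetical, just to reproduce the attribute error, not the actual Airflow definition):

```python
from collections import namedtuple

# Hypothetical stand-in for an old-core ProviderInfo that lacks `is_source`.
ProviderInfo = namedtuple("ProviderInfo", ["version", "provider_info"])
provider = ProviderInfo(version="3.1.0", provider_info={})

# Accessing the missing field directly raises, as in the traceback above.
try:
    provider.is_source
except AttributeError as exc:
    print(exc)  # 'ProviderInfo' object has no attribute 'is_source'

# getattr with a default degrades gracefully on cores that predate the field.
is_source = getattr(provider, "is_source", False)
assert is_source is False
```

This is only an assumption about a possible guard; the actual fix would belong in the provider's `user_agent_value` property or in pinning a provider version compatible with the installed core.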
