phi-friday opened a new pull request, #35293: URL: https://github.com/apache/airflow/pull/35293
related: https://github.com/apache/airflow/discussions/35285

Changes the existing `pickling_library` attribute into a property and stores a `use_dill` attribute instead: the module object that `_DockerDecoratedOperator` previously kept as an instance attribute is not serializable, which causes a `TypeError: cannot pickle 'module' object` in the scheduler (see the `origin` log below).
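A minimal sketch of the pattern, with simplified names and signatures (the real operator takes many more parameters): only a plain, picklable boolean lives on the instance, and the module is resolved lazily on access.

```python
import copy
import pickle
from types import ModuleType

import dill


class _DockerDecoratedOperator:  # simplified stand-in, not the provider code
    def __init__(self, use_dill: bool = False) -> None:
        # only plain, picklable state is stored on the instance
        self.use_dill = use_dill

    @property
    def pickling_library(self) -> ModuleType:
        # the module is resolved on access, so deepcopy/pickle of the
        # operator never has to serialize a module object
        return dill if self.use_dill else pickle


# the scheduler deep-copies tasks (dag.partial_subset); this now succeeds
clone = copy.deepcopy(_DockerDecoratedOperator(use_dill=True))
assert clone.pickling_library is dill
```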
### origin

```log
 ____________       _____________
____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2023-10-31T10:21:35.996+0000] {executor_loader.py:117} INFO - Loaded executor: LocalExecutor
[2023-10-31 10:21:36 +0000] [140] [INFO] Starting gunicorn 21.2.0
[2023-10-31 10:21:36 +0000] [140] [INFO] Listening at: http://[::]:8793 (140)
[2023-10-31 10:21:36 +0000] [140] [INFO] Using worker: sync
[2023-10-31T10:21:36.038+0000] {scheduler_job_runner.py:803} INFO - Starting the scheduler
[2023-10-31T10:21:36.039+0000] {scheduler_job_runner.py:810} INFO - Processing each file at most -1 times
[2023-10-31 10:21:36 +0000] [141] [INFO] Booting worker with pid: 141
[2023-10-31 10:21:36 +0000] [148] [INFO] Booting worker with pid: 148
[2023-10-31T10:21:36.112+0000] {manager.py:169} INFO - Launched DagFileProcessorManager with pid: 193
[2023-10-31T10:21:36.124+0000] {scheduler_job_runner.py:1611} INFO - Adopting or resetting orphaned tasks for active dag runs
[2023-10-31T10:21:36.203+0000] {settings.py:61} INFO - Configured default timezone Timezone('UTC')
[2023-10-31T10:21:51.192+0000] {scheduler_job_runner.py:419} INFO - 1 tasks up for execution: <TaskInstance: test_docker_task_error.no_error manual__2023-10-31T10:21:51.102283+00:00 [scheduled]>
[2023-10-31T10:21:51.192+0000] {scheduler_job_runner.py:482} INFO - DAG test_docker_task_error has 0/16 running and queued tasks
[2023-10-31T10:21:51.192+0000] {scheduler_job_runner.py:598} INFO - Setting the following tasks to queued state: <TaskInstance: test_docker_task_error.no_error manual__2023-10-31T10:21:51.102283+00:00 [scheduled]>
[2023-10-31T10:21:51.193+0000] {taskinstance.py:2177} WARNING - cannot record scheduled_duration for task no_error because previous state change time has not been saved
[2023-10-31T10:21:51.194+0000] {scheduler_job_runner.py:641} INFO - Sending TaskInstanceKey(dag_id='test_docker_task_error', task_id='no_error', run_id='manual__2023-10-31T10:21:51.102283+00:00', try_number=1, map_index=-1) to executor with priority 2 and queue default
[2023-10-31T10:21:51.194+0000] {base_executor.py:146} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'test_docker_task_error', 'no_error', 'manual__2023-10-31T10:21:51.102283+00:00', '--local', '--subdir', 'DAGS_FOLDER/test_dag.py']
[2023-10-31T10:21:51.195+0000] {local_executor.py:89} INFO - QueuedLocalWorker running ['airflow', 'tasks', 'run', 'test_docker_task_error', 'no_error', 'manual__2023-10-31T10:21:51.102283+00:00', '--local', '--subdir', 'DAGS_FOLDER/test_dag.py']
[2023-10-31T10:21:51.229+0000] {dagbag.py:538} INFO - Filling up the DagBag from /files/dags/test_dag.py
Changing /root/airflow/logs/dag_id=test_docker_task_error/run_id=manual__2023-10-31T10:21:51.102283+00:00/task_id=no_error permission to 509
[2023-10-31T10:21:51.466+0000] {task_command.py:421} INFO - Running <TaskInstance: test_docker_task_error.no_error manual__2023-10-31T10:21:51.102283+00:00 [queued]> on host ef12b15f26fd
[2023-10-31T10:21:51.694+0000] {local_executor.py:138} ERROR - Failed to execute task cannot pickle 'module' object.
Traceback (most recent call last):
  File "/opt/airflow/airflow/executors/local_executor.py", line 134, in _execute_work_in_fork
    args.func(args)
  File "/opt/airflow/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/utils/cli.py", line 114, in wrapper
    return f(*args, **kwargs)
  File "/opt/airflow/airflow/cli/commands/task_command.py", line 436, in task_run
    task_return_code = _run_task_by_selected_method(args, _dag, ti)
  File "/opt/airflow/airflow/cli/commands/task_command.py", line 214, in _run_task_by_selected_method
    return _run_task_by_local_task_job(args, ti)
  File "/opt/airflow/airflow/cli/commands/task_command.py", line 276, in _run_task_by_local_task_job
    ret = run_job(job=job_runner.job, execute_callable=job_runner._execute)
  File "/opt/airflow/airflow/utils/session.py", line 79, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/airflow/airflow/jobs/job.py", line 393, in run_job
    return execute_job(job, execute_callable=execute_callable)
  File "/opt/airflow/airflow/jobs/job.py", line 422, in execute_job
    ret = execute_callable()
  File "/opt/airflow/airflow/jobs/local_task_job_runner.py", line 197, in _execute
    self.handle_task_exit(return_code)
  File "/opt/airflow/airflow/jobs/local_task_job_runner.py", line 237, in handle_task_exit
    self.task_instance.schedule_downstream_tasks(max_tis_per_query=self.job.max_tis_per_query)
  File "/opt/airflow/airflow/utils/session.py", line 79, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/airflow/airflow/models/taskinstance.py", line 3162, in schedule_downstream_tasks
    partial_dag = task.dag.partial_subset(
  File "/opt/airflow/airflow/models/dag.py", line 2476, in partial_subset
    dag.task_dict = {
  File "/opt/airflow/airflow/models/dag.py", line 2477, in <dictcomp>
    t.task_id: _deepcopy_task(t)
  File "/opt/airflow/airflow/models/dag.py", line 2474, in _deepcopy_task
    return copy.deepcopy(t, memo)
  File "/usr/local/lib/python3.8/copy.py", line 153, in deepcopy
    y = copier(memo)
  File "/opt/airflow/airflow/models/baseoperator.py", line 1215, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
  File "/usr/local/lib/python3.8/copy.py", line 161, in deepcopy
    rv = reductor(4)
TypeError: cannot pickle 'module' object
[2023-10-31T10:21:52.290+0000] {scheduler_job_runner.py:419} INFO - 1 tasks up for execution: <TaskInstance: test_docker_task_error.pickle_error manual__2023-10-31T10:21:51.102283+00:00 [scheduled]>
[2023-10-31T10:21:52.290+0000] {scheduler_job_runner.py:482} INFO - DAG test_docker_task_error has 0/16 running and queued tasks
[2023-10-31T10:21:52.290+0000] {scheduler_job_runner.py:598} INFO - Setting the following tasks to queued state: <TaskInstance: test_docker_task_error.pickle_error manual__2023-10-31T10:21:51.102283+00:00 [scheduled]>
[2023-10-31T10:21:52.292+0000] {taskinstance.py:2177} WARNING - cannot record scheduled_duration for task pickle_error because previous state change time has not been saved
[2023-10-31T10:21:52.292+0000] {scheduler_job_runner.py:641} INFO - Sending TaskInstanceKey(dag_id='test_docker_task_error', task_id='pickle_error', run_id='manual__2023-10-31T10:21:51.102283+00:00', try_number=1, map_index=-1) to executor with priority 1 and queue default
[2023-10-31T10:21:52.293+0000] {base_executor.py:146} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'test_docker_task_error', 'pickle_error', 'manual__2023-10-31T10:21:51.102283+00:00', '--local', '--subdir', 'DAGS_FOLDER/test_dag.py']
[2023-10-31T10:21:52.295+0000] {local_executor.py:89} INFO - QueuedLocalWorker running ['airflow', 'tasks', 'run', 'test_docker_task_error', 'pickle_error', 'manual__2023-10-31T10:21:51.102283+00:00', '--local', '--subdir', 'DAGS_FOLDER/test_dag.py']
[2023-10-31T10:21:52.298+0000] {scheduler_job_runner.py:691} INFO - Received executor event with state failed for task instance TaskInstanceKey(dag_id='test_docker_task_error', task_id='no_error', run_id='manual__2023-10-31T10:21:51.102283+00:00', try_number=1, map_index=-1)
[2023-10-31T10:21:52.305+0000] {scheduler_job_runner.py:728} INFO - TaskInstance Finished: dag_id=test_docker_task_error, task_id=no_error, run_id=manual__2023-10-31T10:21:51.102283+00:00, map_index=-1, run_start_date=2023-10-31 10:21:51.573970+00:00, run_end_date=2023-10-31 10:21:51.655716+00:00, run_duration=0.081746, state=success, executor_state=failed, try_number=1, max_tries=0, job_id=27, pool=default_pool, queue=default, priority_weight=2, operator=_PythonDecoratedOperator, queued_dttm=2023-10-31 10:21:51.193015+00:00, queued_by_job_id=26, pid=300
[2023-10-31T10:21:52.330+0000] {dagbag.py:538} INFO - Filling up the DagBag from /files/dags/test_dag.py
Changing /root/airflow/logs/dag_id=test_docker_task_error/run_id=manual__2023-10-31T10:21:51.102283+00:00/task_id=pickle_error permission to 509
[2023-10-31T10:21:52.452+0000] {task_command.py:421} INFO - Running <TaskInstance: test_docker_task_error.pickle_error manual__2023-10-31T10:21:51.102283+00:00 [queued]> on host ef12b15f26fd
[2023-10-31T10:21:53.357+0000] {dagrun.py:729} INFO - Marking run <DagRun test_docker_task_error @ 2023-10-31 10:21:51.102283+00:00: manual__2023-10-31T10:21:51.102283+00:00, state:running, queued_at: 2023-10-31 10:21:51.110909+00:00. externally triggered: True> successful
[2023-10-31T10:21:53.357+0000] {dagrun.py:780} INFO - DagRun Finished: dag_id=test_docker_task_error, execution_date=2023-10-31 10:21:51.102283+00:00, run_id=manual__2023-10-31T10:21:51.102283+00:00, run_start_date=2023-10-31 10:21:51.176241+00:00, run_end_date=2023-10-31 10:21:53.357481+00:00, run_duration=2.18124, state=success, external_trigger=True, run_type=manual, data_interval_start=2023-10-31 10:21:51.102283+00:00, data_interval_end=2023-10-31 10:21:51.102283+00:00, dag_hash=ecb0232f1acdec9dce7b278087864bff
[2023-10-31T10:21:53.362+0000] {scheduler_job_runner.py:691} INFO - Received executor event with state success for task instance TaskInstanceKey(dag_id='test_docker_task_error', task_id='pickle_error', run_id='manual__2023-10-31T10:21:51.102283+00:00', try_number=1, map_index=-1)
[2023-10-31T10:21:53.364+0000] {scheduler_job_runner.py:728} INFO - TaskInstance Finished: dag_id=test_docker_task_error, task_id=pickle_error, run_id=manual__2023-10-31T10:21:51.102283+00:00, map_index=-1, run_start_date=2023-10-31 10:21:52.502395+00:00, run_end_date=2023-10-31 10:21:53.228269+00:00, run_duration=0.725874, state=success, executor_state=success, try_number=1, max_tries=0, job_id=28, pool=default_pool, queue=default, priority_weight=1, operator=_DockerDecoratedOperator, queued_dttm=2023-10-31 10:21:52.291449+00:00, queued_by_job_id=26, pid=302
```
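The failure in the `origin` log comes from `copy.deepcopy` of an operator whose instance dictionary holds a module object (via `dag.partial_subset` in `schedule_downstream_tasks`). The mechanism can be reproduced outside Airflow; the `Holder` class below is purely illustrative:

```python
import copy
import pickle


class Holder:
    def __init__(self) -> None:
        self.pickling_library = pickle  # a module object stored on the instance


try:
    copy.deepcopy(Holder())  # deepcopy falls back to __reduce_ex__ for modules
except TypeError as exc:
    print(exc)  # -> cannot pickle 'module' object
```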
### new

```log
[2023-10-31T10:17:12.335+0000] {executor_loader.py:117} INFO - Loaded executor: LocalExecutor
[2023-10-31 10:17:12 +0000] [369] [INFO] Starting gunicorn 21.2.0
[2023-10-31 10:17:12 +0000] [369] [INFO] Listening at: http://[::]:8793 (369)
[2023-10-31 10:17:12 +0000] [369] [INFO] Using worker: sync
[2023-10-31 10:17:12 +0000] [370] [INFO] Booting worker with pid: 370
[2023-10-31T10:17:12.363+0000] {scheduler_job_runner.py:803} INFO - Starting the scheduler
[2023-10-31T10:17:12.363+0000] {scheduler_job_runner.py:810} INFO - Processing each file at most -1 times
[2023-10-31T10:17:12.411+0000] {manager.py:169} INFO - Launched DagFileProcessorManager with pid: 419
[2023-10-31T10:17:12.412+0000] {scheduler_job_runner.py:1611} INFO - Adopting or resetting orphaned tasks for active dag runs
[2023-10-31 10:17:12 +0000] [481] [INFO] Booting worker with pid: 481
[2023-10-31T10:17:12.466+0000] {settings.py:61} INFO - Configured default timezone Timezone('UTC')
[2023-10-31T10:17:19.805+0000] {scheduler_job_runner.py:419} INFO - 1 tasks up for execution: <TaskInstance: test_docker_task_error.no_error manual__2023-10-31T10:17:19.415567+00:00 [scheduled]>
[2023-10-31T10:17:19.805+0000] {scheduler_job_runner.py:482} INFO - DAG test_docker_task_error has 0/16 running and queued tasks
[2023-10-31T10:17:19.805+0000] {scheduler_job_runner.py:598} INFO - Setting the following tasks to queued state: <TaskInstance: test_docker_task_error.no_error manual__2023-10-31T10:17:19.415567+00:00 [scheduled]>
[2023-10-31T10:17:19.806+0000] {taskinstance.py:2177} WARNING - cannot record scheduled_duration for task no_error because previous state change time has not been saved
[2023-10-31T10:17:19.806+0000] {scheduler_job_runner.py:641} INFO - Sending TaskInstanceKey(dag_id='test_docker_task_error', task_id='no_error', run_id='manual__2023-10-31T10:17:19.415567+00:00', try_number=1, map_index=-1) to executor with priority 2 and queue default
[2023-10-31T10:17:19.806+0000] {base_executor.py:146} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'test_docker_task_error', 'no_error', 'manual__2023-10-31T10:17:19.415567+00:00', '--local', '--subdir', 'DAGS_FOLDER/test_dag.py']
[2023-10-31T10:17:19.807+0000] {local_executor.py:89} INFO - QueuedLocalWorker running ['airflow', 'tasks', 'run', 'test_docker_task_error', 'no_error', 'manual__2023-10-31T10:17:19.415567+00:00', '--local', '--subdir', 'DAGS_FOLDER/test_dag.py']
[2023-10-31T10:17:19.833+0000] {dagbag.py:538} INFO - Filling up the DagBag from /files/dags/test_dag.py
Changing /root/airflow/logs/dag_id=test_docker_task_error/run_id=manual__2023-10-31T10:17:19.415567+00:00/task_id=no_error permission to 509
[2023-10-31T10:17:20.003+0000] {task_command.py:421} INFO - Running <TaskInstance: test_docker_task_error.no_error manual__2023-10-31T10:17:19.415567+00:00 [queued]> on host c482888bf857
[2023-10-31T10:17:20.888+0000] {scheduler_job_runner.py:419} INFO - 1 tasks up for execution: <TaskInstance: test_docker_task_error.pickle_error manual__2023-10-31T10:17:19.415567+00:00 [scheduled]>
[2023-10-31T10:17:20.889+0000] {scheduler_job_runner.py:482} INFO - DAG test_docker_task_error has 0/16 running and queued tasks
[2023-10-31T10:17:20.889+0000] {scheduler_job_runner.py:598} INFO - Setting the following tasks to queued state: <TaskInstance: test_docker_task_error.pickle_error manual__2023-10-31T10:17:19.415567+00:00 [scheduled]>
[2023-10-31T10:17:20.890+0000] {taskinstance.py:2177} WARNING - cannot record scheduled_duration for task pickle_error because previous state change time has not been saved
[2023-10-31T10:17:20.890+0000] {scheduler_job_runner.py:641} INFO - Sending TaskInstanceKey(dag_id='test_docker_task_error', task_id='pickle_error', run_id='manual__2023-10-31T10:17:19.415567+00:00', try_number=1, map_index=-1) to executor with priority 1 and queue default
[2023-10-31T10:17:20.891+0000] {base_executor.py:146} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'test_docker_task_error', 'pickle_error', 'manual__2023-10-31T10:17:19.415567+00:00', '--local', '--subdir', 'DAGS_FOLDER/test_dag.py']
[2023-10-31T10:17:20.892+0000] {local_executor.py:89} INFO - QueuedLocalWorker running ['airflow', 'tasks', 'run', 'test_docker_task_error', 'pickle_error', 'manual__2023-10-31T10:17:19.415567+00:00', '--local', '--subdir', 'DAGS_FOLDER/test_dag.py']
[2023-10-31T10:17:20.895+0000] {scheduler_job_runner.py:691} INFO - Received executor event with state success for task instance TaskInstanceKey(dag_id='test_docker_task_error', task_id='no_error', run_id='manual__2023-10-31T10:17:19.415567+00:00', try_number=1, map_index=-1)
[2023-10-31T10:17:20.900+0000] {scheduler_job_runner.py:728} INFO - TaskInstance Finished: dag_id=test_docker_task_error, task_id=no_error, run_id=manual__2023-10-31T10:17:19.415567+00:00, map_index=-1, run_start_date=2023-10-31 10:17:20.059029+00:00, run_end_date=2023-10-31 10:17:20.140250+00:00, run_duration=0.081221, state=success, executor_state=success, try_number=1, max_tries=0, job_id=22, pool=default_pool, queue=default, priority_weight=2, operator=_PythonDecoratedOperator, queued_dttm=2023-10-31 10:17:19.805657+00:00, queued_by_job_id=21, pid=519
[2023-10-31T10:17:20.924+0000] {dagbag.py:538} INFO - Filling up the DagBag from /files/dags/test_dag.py
Changing /root/airflow/logs/dag_id=test_docker_task_error/run_id=manual__2023-10-31T10:17:19.415567+00:00/task_id=pickle_error permission to 509
[2023-10-31T10:17:21.046+0000] {task_command.py:421} INFO - Running <TaskInstance: test_docker_task_error.pickle_error manual__2023-10-31T10:17:19.415567+00:00 [queued]> on host c482888bf857
[2023-10-31T10:17:21.949+0000] {dagrun.py:729} INFO - Marking run <DagRun test_docker_task_error @ 2023-10-31 10:17:19.415567+00:00: manual__2023-10-31T10:17:19.415567+00:00, state:running, queued_at: 2023-10-31 10:17:19.429351+00:00. externally triggered: True> successful
[2023-10-31T10:17:21.950+0000] {dagrun.py:780} INFO - DagRun Finished: dag_id=test_docker_task_error, execution_date=2023-10-31 10:17:19.415567+00:00, run_id=manual__2023-10-31T10:17:19.415567+00:00, run_start_date=2023-10-31 10:17:19.789176+00:00, run_end_date=2023-10-31 10:17:21.950050+00:00, run_duration=2.160874, state=success, external_trigger=True, run_type=manual, data_interval_start=2023-10-31 10:17:19.415567+00:00, data_interval_end=2023-10-31 10:17:19.415567+00:00, dag_hash=ecb0232f1acdec9dce7b278087864bff
[2023-10-31T10:17:21.954+0000] {scheduler_job_runner.py:691} INFO - Received executor event with state success for task instance TaskInstanceKey(dag_id='test_docker_task_error', task_id='pickle_error', run_id='manual__2023-10-31T10:17:19.415567+00:00', try_number=1, map_index=-1)
[2023-10-31T10:17:21.955+0000] {scheduler_job_runner.py:728} INFO - TaskInstance Finished: dag_id=test_docker_task_error, task_id=pickle_error, run_id=manual__2023-10-31T10:17:19.415567+00:00, map_index=-1, run_start_date=2023-10-31 10:17:21.098075+00:00, run_end_date=2023-10-31 10:17:21.852766+00:00, run_duration=0.754691, state=success, executor_state=success, try_number=1, max_tries=0, job_id=23, pool=default_pool, queue=default, priority_weight=1, operator=_DockerDecoratedOperator, queued_dttm=2023-10-31 10:17:20.889633+00:00, queued_by_job_id=21, pid=521
[2023-10-31 10:18:35 +0000] [369] [INFO] Handling signal: winch
```