phi-friday opened a new pull request, #35293:
URL: https://github.com/apache/airflow/pull/35293

   <!--
    Licensed to the Apache Software Foundation (ASF) under one
    or more contributor license agreements.  See the NOTICE file
    distributed with this work for additional information
    regarding copyright ownership.  The ASF licenses this file
    to you under the Apache License, Version 2.0 (the
    "License"); you may not use this file except in compliance
    with the License.  You may obtain a copy of the License at
   
      http://www.apache.org/licenses/LICENSE-2.0
   
    Unless required by applicable law or agreed to in writing,
    software distributed under the License is distributed on an
    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    KIND, either express or implied.  See the License for the
    specific language governing permissions and limitations
    under the License.
    -->
   
   <!--
   Thank you for contributing! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   In case of an existing issue, reference it using one of the following:
   
   closes: #ISSUE
   related: #ISSUE
   
   How to write a good git commit message:
   http://chris.beams.io/posts/git-commit/
   -->
   
   
   <!-- Please keep an empty line above the dashes. -->
   
   https://github.com/apache/airflow/discussions/35285
   
Changed the existing `pickling_library` attribute into a property and added a 
`use_dill` attribute in its place, because the module object that 
`_DockerDecoratedOperator` stored in `pickling_library` cannot be deep-copied, 
which raised `TypeError: cannot pickle 'module' object` in the scheduler.
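
A minimal sketch of the idea (class name and structure simplified here for illustration; the real operator lives in the docker provider): store a plain boolean, which deep-copies cleanly, and resolve the serializer module only when it is accessed.

```python
import pickle
from copy import deepcopy


class DecoratedOperatorSketch:
    """Illustrative stand-in for the decorated operator."""

    def __init__(self, use_dill: bool = False):
        # A bool survives deepcopy; a module object stored here would not.
        self.use_dill = use_dill

    @property
    def pickling_library(self):
        # Resolve the module on access instead of keeping it as state.
        if self.use_dill:
            import dill

            return dill
        return pickle


op = DecoratedOperatorSketch(use_dill=False)
copied = deepcopy(op)  # no longer raises "cannot pickle 'module' object"
assert copied.pickling_library is pickle
```

Because `__dict__` now only holds the boolean, `copy.deepcopy` (used by `DAG.partial_subset` when scheduling downstream tasks) succeeds.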
   
   ### Original behavior
   ```log
     ____________       _____________
    ____    |__( )_________  __/__  /________      __
   ____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
   ___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
    _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
   [2023-10-31T10:21:35.996+0000] {executor_loader.py:117} INFO - Loaded 
executor: LocalExecutor
   [2023-10-31 10:21:36 +0000] [140] [INFO] Starting gunicorn 21.2.0
   [2023-10-31 10:21:36 +0000] [140] [INFO] Listening at: http://[::]:8793 (140)
   [2023-10-31 10:21:36 +0000] [140] [INFO] Using worker: sync
   [2023-10-31T10:21:36.038+0000] {scheduler_job_runner.py:803} INFO - Starting 
the scheduler
   [2023-10-31T10:21:36.039+0000] {scheduler_job_runner.py:810} INFO - 
Processing each file at most -1 times
   [2023-10-31 10:21:36 +0000] [141] [INFO] Booting worker with pid: 141
   [2023-10-31 10:21:36 +0000] [148] [INFO] Booting worker with pid: 148
   [2023-10-31T10:21:36.112+0000] {manager.py:169} INFO - Launched 
DagFileProcessorManager with pid: 193
   [2023-10-31T10:21:36.124+0000] {scheduler_job_runner.py:1611} INFO - 
Adopting or resetting orphaned tasks for active dag runs
   [2023-10-31T10:21:36.203+0000] {settings.py:61} INFO - Configured default 
timezone Timezone('UTC')
   [2023-10-31T10:21:51.192+0000] {scheduler_job_runner.py:419} INFO - 1 tasks 
up for execution:
           <TaskInstance: test_docker_task_error.no_error 
manual__2023-10-31T10:21:51.102283+00:00 [scheduled]>
   [2023-10-31T10:21:51.192+0000] {scheduler_job_runner.py:482} INFO - DAG 
test_docker_task_error has 0/16 running and queued tasks
   [2023-10-31T10:21:51.192+0000] {scheduler_job_runner.py:598} INFO - Setting 
the following tasks to queued state:
           <TaskInstance: test_docker_task_error.no_error 
manual__2023-10-31T10:21:51.102283+00:00 [scheduled]>
   [2023-10-31T10:21:51.193+0000] {taskinstance.py:2177} WARNING - cannot 
record scheduled_duration for task no_error because previous state change time 
has not been saved
   [2023-10-31T10:21:51.194+0000] {scheduler_job_runner.py:641} INFO - Sending 
TaskInstanceKey(dag_id='test_docker_task_error', task_id='no_error', 
run_id='manual__2023-10-31T10:21:51.102283+00:00', try_number=1, map_index=-1) 
to executor with priority 2 and queue default
   [2023-10-31T10:21:51.194+0000] {base_executor.py:146} INFO - Adding to 
queue: ['airflow', 'tasks', 'run', 'test_docker_task_error', 'no_error', 
'manual__2023-10-31T10:21:51.102283+00:00', '--local', '--subdir', 
'DAGS_FOLDER/test_dag.py']
   [2023-10-31T10:21:51.195+0000] {local_executor.py:89} INFO - 
QueuedLocalWorker running ['airflow', 'tasks', 'run', 'test_docker_task_error', 
'no_error', 'manual__2023-10-31T10:21:51.102283+00:00', '--local', '--subdir', 
'DAGS_FOLDER/test_dag.py']
   [2023-10-31T10:21:51.229+0000] {dagbag.py:538} INFO - Filling up the DagBag 
from /files/dags/test_dag.py
   Changing 
/root/airflow/logs/dag_id=test_docker_task_error/run_id=manual__2023-10-31T10:21:51.102283+00:00/task_id=no_error
 permission to 509
   [2023-10-31T10:21:51.466+0000] {task_command.py:421} INFO - Running 
<TaskInstance: test_docker_task_error.no_error 
manual__2023-10-31T10:21:51.102283+00:00 [queued]> on host ef12b15f26fd
   [2023-10-31T10:21:51.694+0000] {local_executor.py:138} ERROR - Failed to 
execute task cannot pickle 'module' object.
   Traceback (most recent call last):
     File "/opt/airflow/airflow/executors/local_executor.py", line 134, in 
_execute_work_in_fork
       args.func(args)
     File "/opt/airflow/airflow/cli/cli_config.py", line 49, in command
       return func(*args, **kwargs)
     File "/opt/airflow/airflow/utils/cli.py", line 114, in wrapper
       return f(*args, **kwargs)
     File "/opt/airflow/airflow/cli/commands/task_command.py", line 436, in 
task_run
       task_return_code = _run_task_by_selected_method(args, _dag, ti)
     File "/opt/airflow/airflow/cli/commands/task_command.py", line 214, in 
_run_task_by_selected_method
       return _run_task_by_local_task_job(args, ti)
     File "/opt/airflow/airflow/cli/commands/task_command.py", line 276, in 
_run_task_by_local_task_job
       ret = run_job(job=job_runner.job, execute_callable=job_runner._execute)
     File "/opt/airflow/airflow/utils/session.py", line 79, in wrapper
       return func(*args, session=session, **kwargs)
     File "/opt/airflow/airflow/jobs/job.py", line 393, in run_job
       return execute_job(job, execute_callable=execute_callable)
     File "/opt/airflow/airflow/jobs/job.py", line 422, in execute_job
       ret = execute_callable()
     File "/opt/airflow/airflow/jobs/local_task_job_runner.py", line 197, in 
_execute
       self.handle_task_exit(return_code)
     File "/opt/airflow/airflow/jobs/local_task_job_runner.py", line 237, in 
handle_task_exit
       
self.task_instance.schedule_downstream_tasks(max_tis_per_query=self.job.max_tis_per_query)
     File "/opt/airflow/airflow/utils/session.py", line 79, in wrapper
       return func(*args, session=session, **kwargs)
     File "/opt/airflow/airflow/models/taskinstance.py", line 3162, in 
schedule_downstream_tasks
       partial_dag = task.dag.partial_subset(
     File "/opt/airflow/airflow/models/dag.py", line 2476, in partial_subset
       dag.task_dict = {
     File "/opt/airflow/airflow/models/dag.py", line 2477, in <dictcomp>
       t.task_id: _deepcopy_task(t)
     File "/opt/airflow/airflow/models/dag.py", line 2474, in _deepcopy_task
       return copy.deepcopy(t, memo)
     File "/usr/local/lib/python3.8/copy.py", line 153, in deepcopy
       y = copier(memo)
     File "/opt/airflow/airflow/models/baseoperator.py", line 1215, in 
__deepcopy__
       setattr(result, k, copy.deepcopy(v, memo))
     File "/usr/local/lib/python3.8/copy.py", line 161, in deepcopy
       rv = reductor(4)
   TypeError: cannot pickle 'module' object
   [2023-10-31T10:21:52.290+0000] {scheduler_job_runner.py:419} INFO - 1 tasks 
up for execution:
           <TaskInstance: test_docker_task_error.pickle_error 
manual__2023-10-31T10:21:51.102283+00:00 [scheduled]>
   [2023-10-31T10:21:52.290+0000] {scheduler_job_runner.py:482} INFO - DAG 
test_docker_task_error has 0/16 running and queued tasks
   [2023-10-31T10:21:52.290+0000] {scheduler_job_runner.py:598} INFO - Setting 
the following tasks to queued state:
           <TaskInstance: test_docker_task_error.pickle_error 
manual__2023-10-31T10:21:51.102283+00:00 [scheduled]>
   [2023-10-31T10:21:52.292+0000] {taskinstance.py:2177} WARNING - cannot 
record scheduled_duration for task pickle_error because previous state change 
time has not been saved
   [2023-10-31T10:21:52.292+0000] {scheduler_job_runner.py:641} INFO - Sending 
TaskInstanceKey(dag_id='test_docker_task_error', task_id='pickle_error', 
run_id='manual__2023-10-31T10:21:51.102283+00:00', try_number=1, map_index=-1) 
to executor with priority 1 and queue default
   [2023-10-31T10:21:52.293+0000] {base_executor.py:146} INFO - Adding to 
queue: ['airflow', 'tasks', 'run', 'test_docker_task_error', 'pickle_error', 
'manual__2023-10-31T10:21:51.102283+00:00', '--local', '--subdir', 
'DAGS_FOLDER/test_dag.py']
   [2023-10-31T10:21:52.295+0000] {local_executor.py:89} INFO - 
QueuedLocalWorker running ['airflow', 'tasks', 'run', 'test_docker_task_error', 
'pickle_error', 'manual__2023-10-31T10:21:51.102283+00:00', '--local', 
'--subdir', 'DAGS_FOLDER/test_dag.py']
   [2023-10-31T10:21:52.298+0000] {scheduler_job_runner.py:691} INFO - Received 
executor event with state failed for task instance 
TaskInstanceKey(dag_id='test_docker_task_error', task_id='no_error', 
run_id='manual__2023-10-31T10:21:51.102283+00:00', try_number=1, map_index=-1)
   [2023-10-31T10:21:52.305+0000] {scheduler_job_runner.py:728} INFO - 
TaskInstance Finished: dag_id=test_docker_task_error, task_id=no_error, 
run_id=manual__2023-10-31T10:21:51.102283+00:00, map_index=-1, 
run_start_date=2023-10-31 10:21:51.573970+00:00, run_end_date=2023-10-31 
10:21:51.655716+00:00, run_duration=0.081746, state=success, 
executor_state=failed, try_number=1, max_tries=0, job_id=27, pool=default_pool, 
queue=default, priority_weight=2, operator=_PythonDecoratedOperator, 
queued_dttm=2023-10-31 10:21:51.193015+00:00, queued_by_job_id=26, pid=300
   [2023-10-31T10:21:52.330+0000] {dagbag.py:538} INFO - Filling up the DagBag 
from /files/dags/test_dag.py
   Changing 
/root/airflow/logs/dag_id=test_docker_task_error/run_id=manual__2023-10-31T10:21:51.102283+00:00/task_id=pickle_error
 permission to 509
   [2023-10-31T10:21:52.452+0000] {task_command.py:421} INFO - Running 
<TaskInstance: test_docker_task_error.pickle_error 
manual__2023-10-31T10:21:51.102283+00:00 [queued]> on host ef12b15f26fd
   [2023-10-31T10:21:53.357+0000] {dagrun.py:729} INFO - Marking run <DagRun 
test_docker_task_error @ 2023-10-31 10:21:51.102283+00:00: 
manual__2023-10-31T10:21:51.102283+00:00, state:running, queued_at: 2023-10-31 
10:21:51.110909+00:00. externally triggered: True> successful
   [2023-10-31T10:21:53.357+0000] {dagrun.py:780} INFO - DagRun Finished: 
dag_id=test_docker_task_error, execution_date=2023-10-31 10:21:51.102283+00:00, 
run_id=manual__2023-10-31T10:21:51.102283+00:00, run_start_date=2023-10-31 
10:21:51.176241+00:00, run_end_date=2023-10-31 10:21:53.357481+00:00, 
run_duration=2.18124, state=success, external_trigger=True, run_type=manual, 
data_interval_start=2023-10-31 10:21:51.102283+00:00, 
data_interval_end=2023-10-31 10:21:51.102283+00:00, 
dag_hash=ecb0232f1acdec9dce7b278087864bff
   [2023-10-31T10:21:53.362+0000] {scheduler_job_runner.py:691} INFO - Received 
executor event with state success for task instance 
TaskInstanceKey(dag_id='test_docker_task_error', task_id='pickle_error', 
run_id='manual__2023-10-31T10:21:51.102283+00:00', try_number=1, map_index=-1)
   [2023-10-31T10:21:53.364+0000] {scheduler_job_runner.py:728} INFO - 
TaskInstance Finished: dag_id=test_docker_task_error, task_id=pickle_error, 
run_id=manual__2023-10-31T10:21:51.102283+00:00, map_index=-1, 
run_start_date=2023-10-31 10:21:52.502395+00:00, run_end_date=2023-10-31 
10:21:53.228269+00:00, run_duration=0.725874, state=success, 
executor_state=success, try_number=1, max_tries=0, job_id=28, 
pool=default_pool, queue=default, priority_weight=1, 
operator=_DockerDecoratedOperator, queued_dttm=2023-10-31 
10:21:52.291449+00:00, queued_by_job_id=26, pid=302
   ```
   
   ### New behavior
   ```log
   [2023-10-31T10:17:12.335+0000] {executor_loader.py:117} INFO - Loaded 
executor: LocalExecutor
   [2023-10-31 10:17:12 +0000] [369] [INFO] Starting gunicorn 21.2.0
   [2023-10-31 10:17:12 +0000] [369] [INFO] Listening at: http://[::]:8793 (369)
   [2023-10-31 10:17:12 +0000] [369] [INFO] Using worker: sync
   [2023-10-31 10:17:12 +0000] [370] [INFO] Booting worker with pid: 370
   [2023-10-31T10:17:12.363+0000] {scheduler_job_runner.py:803} INFO - Starting 
the scheduler
   [2023-10-31T10:17:12.363+0000] {scheduler_job_runner.py:810} INFO - 
Processing each file at most -1 times
   [2023-10-31T10:17:12.411+0000] {manager.py:169} INFO - Launched 
DagFileProcessorManager with pid: 419
   [2023-10-31T10:17:12.412+0000] {scheduler_job_runner.py:1611} INFO - 
Adopting or resetting orphaned tasks for active dag runs
   [2023-10-31 10:17:12 +0000] [481] [INFO] Booting worker with pid: 481
   [2023-10-31T10:17:12.466+0000] {settings.py:61} INFO - Configured default 
timezone Timezone('UTC')
   [2023-10-31T10:17:19.805+0000] {scheduler_job_runner.py:419} INFO - 1 tasks 
up for execution:
           <TaskInstance: test_docker_task_error.no_error 
manual__2023-10-31T10:17:19.415567+00:00 [scheduled]>
   [2023-10-31T10:17:19.805+0000] {scheduler_job_runner.py:482} INFO - DAG 
test_docker_task_error has 0/16 running and queued tasks
   [2023-10-31T10:17:19.805+0000] {scheduler_job_runner.py:598} INFO - Setting 
the following tasks to queued state:
           <TaskInstance: test_docker_task_error.no_error 
manual__2023-10-31T10:17:19.415567+00:00 [scheduled]>
   [2023-10-31T10:17:19.806+0000] {taskinstance.py:2177} WARNING - cannot 
record scheduled_duration for task no_error because previous state change time 
has not been saved
   [2023-10-31T10:17:19.806+0000] {scheduler_job_runner.py:641} INFO - Sending 
TaskInstanceKey(dag_id='test_docker_task_error', task_id='no_error', 
run_id='manual__2023-10-31T10:17:19.415567+00:00', try_number=1, map_index=-1) 
to executor with priority 2 and queue default
   [2023-10-31T10:17:19.806+0000] {base_executor.py:146} INFO - Adding to 
queue: ['airflow', 'tasks', 'run', 'test_docker_task_error', 'no_error', 
'manual__2023-10-31T10:17:19.415567+00:00', '--local', '--subdir', 
'DAGS_FOLDER/test_dag.py']
   [2023-10-31T10:17:19.807+0000] {local_executor.py:89} INFO - 
QueuedLocalWorker running ['airflow', 'tasks', 'run', 'test_docker_task_error', 
'no_error', 'manual__2023-10-31T10:17:19.415567+00:00', '--local', '--subdir', 
'DAGS_FOLDER/test_dag.py']
   [2023-10-31T10:17:19.833+0000] {dagbag.py:538} INFO - Filling up the DagBag 
from /files/dags/test_dag.py
   Changing 
/root/airflow/logs/dag_id=test_docker_task_error/run_id=manual__2023-10-31T10:17:19.415567+00:00/task_id=no_error
 permission to 509
   [2023-10-31T10:17:20.003+0000] {task_command.py:421} INFO - Running 
<TaskInstance: test_docker_task_error.no_error 
manual__2023-10-31T10:17:19.415567+00:00 [queued]> on host c482888bf857
   [2023-10-31T10:17:20.888+0000] {scheduler_job_runner.py:419} INFO - 1 tasks 
up for execution:
           <TaskInstance: test_docker_task_error.pickle_error 
manual__2023-10-31T10:17:19.415567+00:00 [scheduled]>
   [2023-10-31T10:17:20.889+0000] {scheduler_job_runner.py:482} INFO - DAG 
test_docker_task_error has 0/16 running and queued tasks
   [2023-10-31T10:17:20.889+0000] {scheduler_job_runner.py:598} INFO - Setting 
the following tasks to queued state:
           <TaskInstance: test_docker_task_error.pickle_error 
manual__2023-10-31T10:17:19.415567+00:00 [scheduled]>
   [2023-10-31T10:17:20.890+0000] {taskinstance.py:2177} WARNING - cannot 
record scheduled_duration for task pickle_error because previous state change 
time has not been saved
   [2023-10-31T10:17:20.890+0000] {scheduler_job_runner.py:641} INFO - Sending 
TaskInstanceKey(dag_id='test_docker_task_error', task_id='pickle_error', 
run_id='manual__2023-10-31T10:17:19.415567+00:00', try_number=1, map_index=-1) 
to executor with priority 1 and queue default
   [2023-10-31T10:17:20.891+0000] {base_executor.py:146} INFO - Adding to 
queue: ['airflow', 'tasks', 'run', 'test_docker_task_error', 'pickle_error', 
'manual__2023-10-31T10:17:19.415567+00:00', '--local', '--subdir', 
'DAGS_FOLDER/test_dag.py']
   [2023-10-31T10:17:20.892+0000] {local_executor.py:89} INFO - 
QueuedLocalWorker running ['airflow', 'tasks', 'run', 'test_docker_task_error', 
'pickle_error', 'manual__2023-10-31T10:17:19.415567+00:00', '--local', 
'--subdir', 'DAGS_FOLDER/test_dag.py']
   [2023-10-31T10:17:20.895+0000] {scheduler_job_runner.py:691} INFO - Received 
executor event with state success for task instance 
TaskInstanceKey(dag_id='test_docker_task_error', task_id='no_error', 
run_id='manual__2023-10-31T10:17:19.415567+00:00', try_number=1, map_index=-1)
   [2023-10-31T10:17:20.900+0000] {scheduler_job_runner.py:728} INFO - 
TaskInstance Finished: dag_id=test_docker_task_error, task_id=no_error, 
run_id=manual__2023-10-31T10:17:19.415567+00:00, map_index=-1, 
run_start_date=2023-10-31 10:17:20.059029+00:00, run_end_date=2023-10-31 
10:17:20.140250+00:00, run_duration=0.081221, state=success, 
executor_state=success, try_number=1, max_tries=0, job_id=22, 
pool=default_pool, queue=default, priority_weight=2, 
operator=_PythonDecoratedOperator, queued_dttm=2023-10-31 
10:17:19.805657+00:00, queued_by_job_id=21, pid=519
   [2023-10-31T10:17:20.924+0000] {dagbag.py:538} INFO - Filling up the DagBag 
from /files/dags/test_dag.py
   Changing 
/root/airflow/logs/dag_id=test_docker_task_error/run_id=manual__2023-10-31T10:17:19.415567+00:00/task_id=pickle_error
 permission to 509
   [2023-10-31T10:17:21.046+0000] {task_command.py:421} INFO - Running 
<TaskInstance: test_docker_task_error.pickle_error 
manual__2023-10-31T10:17:19.415567+00:00 [queued]> on host c482888bf857
   [2023-10-31T10:17:21.949+0000] {dagrun.py:729} INFO - Marking run <DagRun 
test_docker_task_error @ 2023-10-31 10:17:19.415567+00:00: 
manual__2023-10-31T10:17:19.415567+00:00, state:running, queued_at: 2023-10-31 
10:17:19.429351+00:00. externally triggered: True> successful
   [2023-10-31T10:17:21.950+0000] {dagrun.py:780} INFO - DagRun Finished: 
dag_id=test_docker_task_error, execution_date=2023-10-31 10:17:19.415567+00:00, 
run_id=manual__2023-10-31T10:17:19.415567+00:00, run_start_date=2023-10-31 
10:17:19.789176+00:00, run_end_date=2023-10-31 10:17:21.950050+00:00, 
run_duration=2.160874, state=success, external_trigger=True, run_type=manual, 
data_interval_start=2023-10-31 10:17:19.415567+00:00, 
data_interval_end=2023-10-31 10:17:19.415567+00:00, 
dag_hash=ecb0232f1acdec9dce7b278087864bff
   [2023-10-31T10:17:21.954+0000] {scheduler_job_runner.py:691} INFO - Received 
executor event with state success for task instance 
TaskInstanceKey(dag_id='test_docker_task_error', task_id='pickle_error', 
run_id='manual__2023-10-31T10:17:19.415567+00:00', try_number=1, map_index=-1)
   [2023-10-31T10:17:21.955+0000] {scheduler_job_runner.py:728} INFO - 
TaskInstance Finished: dag_id=test_docker_task_error, task_id=pickle_error, 
run_id=manual__2023-10-31T10:17:19.415567+00:00, map_index=-1, 
run_start_date=2023-10-31 10:17:21.098075+00:00, run_end_date=2023-10-31 
10:17:21.852766+00:00, run_duration=0.754691, state=success, 
executor_state=success, try_number=1, max_tries=0, job_id=23, 
pool=default_pool, queue=default, priority_weight=1, 
operator=_DockerDecoratedOperator, queued_dttm=2023-10-31 
10:17:20.889633+00:00, queued_by_job_id=21, pid=521
   [2023-10-31 10:18:35 +0000] [369] [INFO] Handling signal: winch
   ```
   
   ---
   **^ Add meaningful description above**
   Read the **[Pull Request 
Guidelines](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#pull-request-guidelines)**
 for more information.
   In case of fundamental code changes, an Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in a 
newsfragment file, named `{pr_number}.significant.rst` or 
`{issue_number}.significant.rst`, in 
[newsfragments](https://github.com/apache/airflow/tree/main/newsfragments).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
