heavenlxj opened a new issue, #26988:
URL: https://github.com/apache/airflow/issues/26988

   ### Apache Airflow version
   
   Other Airflow 2 version
   
   ### What happened
   
   OS: CentOS Linux release 7.9.2009 (Core)
   K8S: 1.16
   Airflow: 2.2.1
   
   The DAG code is pasted here:
   ```python
   import sys
   sys.path.insert(0, "/opt/bitnami/airflow/dags/scene4")

   import pendulum
   from airflow import DAG
   from datetime import datetime
   from airflow.sensors.external_task import ExternalTaskSensor
   from airflow.operators.dummy import DummyOperator
   from airflow.utils.dates import timedelta
   from scene4.data_mock.run_env import gen_data
   from airflow.operators.python import PythonOperator

   local_tz = pendulum.timezone("Asia/Shanghai")

   with DAG(
       dag_id="DAG_4_31",
       start_date=datetime(2022, 10, 10, tzinfo=local_tz),
       schedule_interval=None,
       catchup=False,
       template_searchpath="/opt/bitnami/airflow/dags/scene4",
       params={"scene_id": "4", "workspace_id": "1"},
   ) as dag:
       task_start = DummyOperator(
           task_id="task_start"
       )
       gen_data_task = PythonOperator(
           task_id="gen_data_task",
           provide_context=True,
           python_callable=gen_data,
           retries=10,
           retry_delay=timedelta(seconds=20),
           execution_timeout=timedelta(seconds=30),
           trigger_rule="all_success",
           op_kwargs={},
       )

       task_start >> gen_data_task
   ```
   
   In run_env.py we import the Python library 'py_trees', and we get the following error in the Airflow worker:
   ```
   [2022-10-10 17:32:35,132: ERROR/ForkPoolWorker-15] Failed to import: /opt/bitnami/airflow/dags/scene4/auto_gen/DAG_4_44.py
   Traceback (most recent call last):
     File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/models/dagbag.py", line 331, in _load_modules_from_file
       loader.exec_module(new_module)
     File "<frozen importlib._bootstrap_external>", line 843, in exec_module
     File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
     File "/opt/bitnami/airflow/dags/scene4/auto_gen/DAG_4_44.py", line 14, in <module>
       from scene4.data_mock.run_env import main
     File "/opt/bitnami/airflow/dags/scene4/data_mock/run_env.py", line 11, in <module>
       from data_mock.env_aml import CustomEnv
     File "/opt/bitnami/airflow/dags/scene4/data_mock/env_aml.py", line 6, in <module>
       from data_mock.tree import BTConfigs, BehaviorTree
     File "/opt/bitnami/airflow/dags/scene4/data_mock/tree.py", line 7, in <module>
       import py_trees
     File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/py_trees/__init__.py", line 16, in <module>
       from . import behaviour  # noqa
     File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/py_trees/behaviour.py", line 23, in <module>
       from . import blackboard
     File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/py_trees/blackboard.py", line 73, in <module>
       from . import console
     File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/py_trees/console.py", line 44, in <module>
       def has_unicode(encoding: str=sys.stdout.encoding) -> bool:
   AttributeError: 'LoggingProxy' object has no attribute 'encoding'
   [2022-10-10 17:32:35,269: ERROR/ForkPoolWorker-15] Failed to execute task dag_id could not be found: DAG_4_44. Either the dag did not exist or it failed to parse..
   Traceback (most recent call last):
     File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 121, in _execute_in_fork
       args.func(args)
     File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
       return func(*args, **kwargs)
     File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/utils/cli.py", line 92, in wrapper
       return f(*args, **kwargs)
     File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 276, in task_run
       dag = get_dag(args.subdir, args.dag_id)
     File "/opt/bitnami/airflow/venv/lib/python3.8/site-packages/airflow/utils/cli.py", line 193, in get_dag
       raise AirflowException(
   airflow.exceptions.AirflowException: dag_id could not be found: DAG_4_44. Either the dag did not exist or it failed to parse.
   [2022-10-10 17:32:35,295: ERROR/ForkPoolWorker-15] Task airflow.executors.celery_executor.execute_command[4ce04924-18c1-4ad0-8025-1f78b4b1f6a1] raised unexpected: AirflowException('Celery command failed on host: airflow-worker-0.airflow-worker-hl.airflow.svc.cluster.local')
   ```
   
   I searched and found a similar report about the Celery worker redirecting stdout:

   https://docs.celeryq.dev/en/stable/userguide/configuration.html?highlight=REDIRECT_STDOUTS#std-setting-worker_redirect_stdouts

   It says you can set the Celery config option 'worker_redirect_stdouts' to False to avoid this error.
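
   The failure mode can be reproduced without Celery or Airflow. The sketch below is only an illustration: the `LoggingProxy` class is a hypothetical stand-in for Celery's real log proxy (which wraps `sys.stdout` but exposes no `encoding` attribute), showing why any module that reads `sys.stdout.encoding` at import time crashes, and the defensive `getattr` pattern that survives it:

   ```python
   import sys

   class LoggingProxy:
       """Hypothetical stand-in for Celery's stdout proxy: it accepts
       writes but does not expose an 'encoding' attribute."""
       def write(self, data):
           pass

   real_stdout = sys.stdout
   sys.stdout = LoggingProxy()  # simulate worker_redirect_stdouts=True
   try:
       # This is what py_trees' console.py effectively does at import time:
       try:
           enc = sys.stdout.encoding  # raises AttributeError under the proxy
       except AttributeError:
           enc = None
       # A defensive pattern that tolerates the proxy:
       safe_enc = getattr(sys.stdout, "encoding", "utf-8")
   finally:
       sys.stdout = real_stdout  # restore the real stream

   print(enc, safe_enc)  # prints: None utf-8
   ```

   This also shows why the error only appears inside the Celery worker and not when parsing the same DAG file locally: outside the worker, `sys.stdout` is a real text stream and has `encoding`.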
   
   What I did:
   I modified 'airflow/config_templates/default_celery.py' and added this setting to DEFAULT_CELERY_CONFIG:
   ```python
   DEFAULT_CELERY_CONFIG = {
       'accept_content': ['json'],
       'event_serializer': 'json',
       'worker_prefetch_multiplier': conf.getint('celery', 'worker_prefetch_multiplier', fallback=1),
       'task_acks_late': True,
       'task_default_queue': conf.get('operators', 'DEFAULT_QUEUE'),
       'task_default_exchange': conf.get('operators', 'DEFAULT_QUEUE'),
       'task_track_started': conf.get('celery', 'task_track_started', fallback=True),
       'broker_url': broker_url,
       'broker_transport_options': broker_transport_options,
       'result_backend': conf.get('celery', 'RESULT_BACKEND'),
       'worker_concurrency': conf.getint('celery', 'WORKER_CONCURRENCY'),
       'worker_redirect_stdouts': False,  # <-- the setting I added
   }
   ```
   
   I see that Airflow injects the Celery config here:
   <img width="813" alt="image" src="https://user-images.githubusercontent.com/4516605/195049341-e0a9e209-b963-42b0-a7dd-8f73fe9c507b.png">
   
   So my questions are:
   1. Is this a bug in Airflow's logging?
   2. If the Celery config needs to be overridden, what is the right way to do it?
      For now I am trying to make this config take effect by:
        1. adding a new celery_config.py with the new settings
        2. mounting this .py file into the worker pod as a volume
        3. injecting a Kubernetes env var so that 'celery_config_options' points to the celery_config module
      Is this the proper way to do it?
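
   The approach of overriding via 'celery_config_options' could be sketched as a small config module. The file name, mount path, and `CELERY_CONFIG` variable name below are my assumptions; what matters is that Airflow's `[celery] celery_config_options` setting resolves to the dict:

   ```python
   # celery_config.py -- hypothetically mounted into the worker pod,
   # e.g. at /opt/bitnami/airflow/celery_config.py (path is an assumption;
   # the module must be importable, e.g. on PYTHONPATH).
   from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG

   # Start from Airflow's defaults and override only the one setting,
   # instead of editing the installed default_celery.py, which an
   # Airflow upgrade would overwrite.
   CELERY_CONFIG = {
       **DEFAULT_CELERY_CONFIG,
       "worker_redirect_stdouts": False,  # keep the real sys.stdout
   }
   ```

   The worker pod would then get the env var `AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS=celery_config.CELERY_CONFIG`, mirroring how the default value points at `airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG`.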
   
   ### What you think should happen instead
   
   _No response_
   
   ### How to reproduce
   
   _No response_
   
   ### Operating System
   
   CentOS Linux release 7.9.2009 (Core)
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Other 3rd-party Helm chart
   
   ### Deployment details
   
   bitnami/airflow 2.2.1
   
   
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

