george-zubrienko commented on issue #30884:
URL: https://github.com/apache/airflow/issues/30884#issuecomment-1523857119

   Combining answers here. I'll try with 2.6.0rc, or the released version if 
it comes out before I get an hour to test.
   
   > @george-zubrienko - I have some extra questions that might help to find 
out the root cause.
   > 
   > 1. Does it happen all the time for all dags parsed or some of the time for 
some of the dags? Can you please describe the pattern you see?
   
   I can pinpoint a few things:
   
   - All dags parse much slower than usual
   - Increasing DAG_FILE_PROCESSOR_TIMEOUT doesn't affect anything
   - Increasing the number of parsing processes helps to get the number of 
parsed/active dags up, but overall processing is very slow: roughly one dag 
every 30s with 128 parsing processes. Initially we see around 20-30 dags 
going through
   - There doesn't seem to be any pattern related to the dag structure. Dags 
with a single task or 100 tasks can fail to process.
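   
   A rough way to compare per-file parse cost outside the scheduler is to 
execute each DAG file once and time it. The sketch below uses only the 
standard library; `time_dag_files` is a hypothetical helper, not an Airflow 
API, and it only approximates what the DAG file processor does (it runs the 
top-level code but does not serialize DAGs or write to the metadata db).

```python
import runpy
import time
from pathlib import Path


def time_dag_files(dag_dir: str) -> list[tuple[str, float]]:
    """Execute every .py file under dag_dir once and record wall-clock seconds.

    Hypothetical helper: it approximates parse cost by running top-level code.
    """
    timings = []
    for path in sorted(Path(dag_dir).rglob("*.py")):
        start = time.perf_counter()
        try:
            runpy.run_path(str(path))
        except Exception as exc:
            # Keep going so that one broken file does not hide the others.
            print(f"{path}: failed with {exc!r}")
        timings.append((str(path), time.perf_counter() - start))
    # Slowest files first.
    return sorted(timings, key=lambda item: item[1], reverse=True)
```

   Calling `time_dag_files("/path/to/dags")` in the same environment as the 
dag processor returns the slowest files first, which should show whether the 
slowdown is spread evenly or concentrated in a few files.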
   
   > 2. Could you please tell more (and possibly share some anonymised examples 
of) top-level code of the dags of yours that experience this behaviour and 
whether you use some of those things:
   
   > * excessive or unusual imports?
   
   We always try to import only what we need in DAG files, but sometimes there 
can be imports for type-hinting purposes only.
   
   > * reaching out to external sources (HTTP/Similar) while parsing top-level 
code?
   
   No, never.
   
   > * accessing any kind of database while parsing top-level code?
   
   The infamous `Variable.get(..)` is the one thing against Airflow 
best practices that we use in all dags. However, our Airflow is set to run a 
parse only every 30s instead of every 5s, and we only read a single variable 
(top-level json configuration for the pipeline)
   
   In some dags we also read 1-2 connections stored in Airflow db. That's it.
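   
   For reference, one possible way to move the variable read out of parse 
time: `op_kwargs` on `PythonOperator` is a templated field, so scalar values 
can be written as Jinja expressions over `var.json`, which Airflow renders 
when the task runs. This is only a sketch and has not been tried on this 
deployment. The `templated` helper is hypothetical, and only the plain string 
keys are covered; rendered values are strings, so list values such as 
`sources` would need separate handling.

```python
VARIABLE_NAME = "omni_channel_logs"  # the Variable read by the example DAG


def templated(key: str) -> str:
    """Hypothetical helper: build a Jinja expression rendered at task run time."""
    return "{{ var.json." + VARIABLE_NAME + "." + key + " }}"


# Plain strings at parse time, so building this dict does not touch the db.
op_kwargs = {
    "checkpoint_location": "'" + templated("checkpoint_location") + "'",
    "compact_after": "'" + templated("compact_after") + "'",
    "project_name": templated("project_name"),
    "project_version": templated("project_version"),
    "project_runnable": templated("project_runnable"),
    "stream_group": "oc_logs",
}
```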
   
   > * accessing secrets/variables/connections while parsing top-level code?
   
   See notes above.
   
   > 3. Do you see any excessive memory use and swapping while parsing happens 
for the DAG file processor?
   
   Memory usage on the dag file processor pod starts at around 180 MB, grows to 
around 1.6 GB on the initial parse, and then stabilizes around 700 MB, while 
CPU usage drops to ~2-3% from the initial ~50%
   
   > 4. Do you use callbacks in your DAGs that experience the problem?
   
   No, we also have SLA and email/error callbacks fully disabled.
   
   Please find below a full example of a DAG that consists of only 1 task:
   
   ```
   """Manages stream lifecycle for OC logs"""
   import json
   from datetime import timedelta
   
   import pendulum
   from airflow import DAG
   from airflow.models import Variable
   
   from airflow.operators.python import PythonOperator
   from airflow.providers.datadog.hooks.datadog import DatadogHook
   
   from esd_services_api_client.beast import ArgumentValue, JobSocket
   
   from ecco_airflow.dags.omnichannel.base import activate_log_stream
   from ecco_airflow.utils.k8s import executor_config
   
   default_args = {
       "owner": "data-engineering",
       "depends_on_past": False,
       "email": ["[email protected]"],
       "email_on_failure": False,
       "email_on_retry": False,
       "retries": 1,
       "retry_delay": timedelta(minutes=5),
   }
   
   with DAG(
       dag_id="ocLogStream",
       default_args=default_args,
       description="Activates streaming of OC logs.",
       schedule_interval="0 * * * *",
       max_active_runs=1,
       start_date=pendulum.today().add(days=-2),
       tags=["streaming", "omni_channel", "reporting"],
   ) as dag:
       # pylint: disable=W0104
   
       dd_hook = DatadogHook(datadog_conn_id="esd-datadog")
       runtime_conf = Variable.get("omni_channel_logs", deserialize_json=True)
   
       PythonOperator(
           task_id="stream-oc-logs",
           python_callable=activate_log_stream,
           pool="omnichannel",
           op_kwargs={
               "checkpoint_location": f"'{runtime_conf['checkpoint_location']}'",
               "compact_after": f"'{runtime_conf['compact_after']}'",
               "datadog_config": ArgumentValue(
                   value=json.dumps(
                       {
                           "api_key": dd_hook.api_key,
                           "app_key": dd_hook.app_key,
                           "site": "datadoghq.eu",
                       }
                   ),
                   encrypt=True,
                   quote=True,
               ),
               "custom_config": ArgumentValue(
                   value=runtime_conf["access_credentials"],
                   encrypt=True,
                   quote=True,
                   is_env=True,
               ),
               "project_name": runtime_conf["project_name"],
               "project_version": runtime_conf["project_version"],
               "project_runnable": runtime_conf["project_runnable"],
               "stream_group": "oc_logs",
               "inputs": [JobSocket.from_dict(src) for src in runtime_conf["sources"]],
               "outputs": [JobSocket.from_dict(trg) for trg in runtime_conf["targets"]],
           },
           executor_config=executor_config(
               nodepool_names=["general"],
               secret_env_vars={
                   "RUNTIME_ENCRYPTION_KEY": {
                       "secret_key": "RUNTIME_ENCRYPTION_KEY",
                       "secret_name": "hashicorp-vault-spark-encryption-key",
                   },
                   "OC_SPARK_CRYSTAL_ACCOUNT_ACCESS": {
                       "secret_key": "OC_SPARK_CRYSTAL_ACCOUNT_ACCESS",
                       "secret_name": "hashicorp-vault-oc-logs",
                   },
               },
               cpu_memory_limit={"cpu": "100m", "memory": "500Mi"},
           ),
           retries=3,
           retry_delay=timedelta(seconds=300),
       )
   ```
   

