george-zubrienko commented on issue #30884:
URL: https://github.com/apache/airflow/issues/30884#issuecomment-1523857119
Combining answers here. I'll try with 2.6.0rc, or with the released version if
it comes out before I find an hour to run a test.
> @george-zubrienko - I have some extra questions that might help to find
out the root cause.
>
> 1. Does it happen all the time for all dags parsed or some of the time for
some of the dags? Can you please describe the pattern you see?
I can pinpoint a few things:
- All dags parse much slower than usual
- Increasing DAG_FILE_PROCESSOR_TIMEOUT doesn't affect anything
- Increasing the number of parsing processes helps to get the number of
parsed/active dags up, but overall the process is very slow: maybe one dag
every 30s or so with 128 parsing processes. Initially we see around 20-30 dags
going through.
- There doesn't seem to be any pattern related to the dag structure. Dags
with a single task or with 100 tasks can both fail to process.
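To check whether the slowness comes from a file's own top-level code rather than from the processor, one option is to time a plain import of a single file outside Airflow. The snippet below is only a sketch of that idea: `time_module_import` and the temporary `slow_dag.py` file are hypothetical names used for illustration, and a plain `importlib` load only approximates what the DAG file processor does.

```python
import importlib.util
import tempfile
import time
from pathlib import Path


def time_module_import(path: str) -> float:
    """Import the Python file at `path` once and return the elapsed seconds."""
    spec = importlib.util.spec_from_file_location("dag_under_test", path)
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load {path}")
    module = importlib.util.module_from_spec(spec)
    start = time.perf_counter()
    spec.loader.exec_module(module)  # runs all top-level code in the file
    return time.perf_counter() - start


# Hypothetical stand-in for a DAG file whose top-level code is slow.
with tempfile.TemporaryDirectory() as tmp:
    dag_file = Path(tmp) / "slow_dag.py"
    dag_file.write_text("import time\ntime.sleep(0.05)\nDAG_ID = 'demo'\n")
    elapsed = time_module_import(str(dag_file))

print(f"top-level code took {elapsed:.3f}s")
```

If a real DAG file imports quickly this way but still parses slowly in the processor, the bottleneck is more likely in the environment (database access, process startup) than in the file itself.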
> 2. Could you please tell more (and possibly share some anonymised examples
of) top-level code of the dags of yours that experience this behaviour and
whether you use some of those things:
> * excessive or unusual imports?
We always try to import only what we need in DAG files, but sometimes there
are imports used for type hinting only.
> * reaching out to external sources (HTTP/Similar) while parsing top-level
code?
No, never.
> * accessing any kind of database while parsing top-level code?
The infamous `Variable.get(..)` is the one thing against Airflow
best practices that we use in all dags. However, our Airflow is set to run
a parse only every 30s instead of 5s, and we read only a single variable
(the top-level JSON configuration for the pipeline).
In some dags we also read 1-2 connections stored in the Airflow db. That's it.
> * accessing secrets/variables/connections while parsing top-level code?
See notes above.
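For reference, the usual way to keep a top-level variable lookup out of the parse path is to move it inside the task callable, so it runs only when the task executes. The sketch below shows the pattern without Airflow: `fake_variable_get` is a hypothetical stand-in for `Variable.get(key, deserialize_json=True)`, and `build_callable` and the returned values are made up for illustration.

```python
from typing import Any, Callable, Dict, List

LOOKUPS: List[str] = []  # records every simulated metadata-database read


def fake_variable_get(key: str) -> Dict[str, Any]:
    """Hypothetical stand-in for Variable.get(key, deserialize_json=True)."""
    LOOKUPS.append(key)
    return {"project_name": "oc-logs", "project_version": "1.0.0"}


def build_callable(variable_key: str) -> Callable[[], str]:
    """Return a task callable that reads its config only when it runs."""

    def run_task() -> str:
        conf = fake_variable_get(variable_key)  # executed at task run time
        return f"{conf['project_name']}:{conf['project_version']}"

    return run_task


# "Parse time": building the callable performs no lookup.
task_callable = build_callable("omni_channel_logs")
lookups_at_parse = list(LOOKUPS)

# "Run time": the lookup happens once, inside the task.
result = task_callable()
print(lookups_at_parse, LOOKUPS, result)
```

This does not explain the slowdown by itself, but it removes one database round trip per file per parse, which makes the remaining parse cost easier to isolate.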
> 3. Do you see any excessive memory use and swapping while parsing happens
for the DAG file processor?
Memory usage on the dag file processor pod starts at around 180 MB, grows to
around 1.6 GB on the initial parse, and then stabilizes around 700 MB. CPU usage
drops to ~2-3% from the initial ~50%.
> 4. Do you use callbacks in your DAGs that experience the problem?
No, we also have SLA and email/error callbacks fully disabled.
Please find below a full example of a DAG that consists of only 1 task:
```
"""Manages stream lifecycle for OC logs"""
import json
from datetime import timedelta

import pendulum
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.datadog.hooks.datadog import DatadogHook
from esd_services_api_client.beast import ArgumentValue, JobSocket

from ecco_airflow.dags.omnichannel.base import activate_log_stream
from ecco_airflow.utils.k8s import executor_config

default_args = {
    "owner": "data-engineering",
    "depends_on_past": False,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="ocLogStream",
    default_args=default_args,
    description="Activates streaming of OC logs.",
    schedule_interval="0 * * * *",
    max_active_runs=1,
    start_date=pendulum.today().add(days=-2),
    tags=["streaming", "omni_channel", "reporting"],
) as dag:
    # pylint: disable=W0104
    dd_hook = DatadogHook(datadog_conn_id="esd-datadog")
    runtime_conf = Variable.get("omni_channel_logs", deserialize_json=True)
    PythonOperator(
        task_id="stream-oc-logs",
        python_callable=activate_log_stream,
        pool="omnichannel",
        op_kwargs={
            "checkpoint_location": f"'{runtime_conf['checkpoint_location']}'",
            "compact_after": f"'{runtime_conf['compact_after']}'",
            "datadog_config": ArgumentValue(
                value=json.dumps(
                    {
                        "api_key": dd_hook.api_key,
                        "app_key": dd_hook.app_key,
                        "site": "datadoghq.eu",
                    }
                ),
                encrypt=True,
                quote=True,
            ),
            "custom_config": ArgumentValue(
                value=runtime_conf["access_credentials"],
                encrypt=True,
                quote=True,
                is_env=True,
            ),
            "project_name": runtime_conf["project_name"],
            "project_version": runtime_conf["project_version"],
            "project_runnable": runtime_conf["project_runnable"],
            "stream_group": "oc_logs",
            "inputs": [JobSocket.from_dict(src) for src in runtime_conf["sources"]],
            "outputs": [JobSocket.from_dict(trg) for trg in runtime_conf["targets"]],
        },
        executor_config=executor_config(
            nodepool_names=["general"],
            secret_env_vars={
                "RUNTIME_ENCRYPTION_KEY": {
                    "secret_key": "RUNTIME_ENCRYPTION_KEY",
                    "secret_name": "hashicorp-vault-spark-encryption-key",
                },
                "OC_SPARK_CRYSTAL_ACCOUNT_ACCESS": {
                    "secret_key": "OC_SPARK_CRYSTAL_ACCOUNT_ACCESS",
                    "secret_name": "hashicorp-vault-oc-logs",
                },
            },
            cpu_memory_limit={"cpu": "100m", "memory": "500Mi"},
        ),
        retries=3,
        retry_delay=timedelta(seconds=300),
    )
```