amoghrajesh commented on PR #51716:
URL: https://github.com/apache/airflow/pull/51716#issuecomment-2972751958

   @jscheffl @dheerajturaga From what I see in the edge executor interface, the 
`edge_command` is nothing but a model dump of the `workload`: 
https://github.com/apache/airflow/blob/main/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py#L169
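   To illustrate the idea, here is a minimal stdlib sketch (the `Workload` class and its fields are placeholders, not the real task-sdk `ExecuteTask` model, which carries many more fields):

   ```python
   import json
   from dataclasses import dataclass, asdict


   # Hypothetical stand-in for the pydantic workload model; in pydantic v2
   # the serialization call would be workload.model_dump_json().
   @dataclass
   class Workload:
       task_id: str
       dag_id: str
       run_id: str
       try_number: int


   workload = Workload(
       task_id="flaky_task",
       dag_id="retry_example_dag",
       run_id="manual__2025-06-14T12:50:54.260989+00:00",
       try_number=1,
   )

   # The "command" is just the serialized workload; json.dumps(asdict(...))
   # is the stdlib equivalent for this sketch.
   command = json.dumps(asdict(workload))
   print(len(command))
   ```

   So the column length we pick only has to accommodate this JSON string.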
   
   
   I tried a small experiment: I created a simple DAG and ran it a couple of times with retries defined.
   
   ```python
   from datetime import datetime, timedelta

   from airflow import DAG
   from airflow.providers.standard.operators.python import PythonOperator


   def flaky_task(**context):
       # Always raises ZeroDivisionError so the task keeps retrying.
       return 1 // 0


   with DAG(
       dag_id="retry_example_dag",
       start_date=datetime(2025, 1, 1),
       schedule=None,
       catchup=False,
   ):
       retrying_task = PythonOperator(
           task_id="flaky_task",
           python_callable=flaky_task,
           retries=5,
           retry_delay=timedelta(seconds=5),
       )
   ```
   
   (I made some tweaks so it prints the size of `workload.model_dump_json()`, 
which is the `command` here.)
   
   
   ```
   2025-06-14 12:50:54.751778 [info     ] Task 
execute_workload[a7517e3a-e357-4bf4-8519-841aae0fa0aa] received 
[celery.worker.strategy]
   2025-06-14 12:50:54.897616 [info     ] 
[a7517e3a-e357-4bf4-8519-841aae0fa0aa] Executing workload in Celery: 
token='eyJ***' ti=TaskInstance(id=UUID('01976e7e-34bf-70b1-84cc-284ae6aff602'), 
task_id='flaky_task', dag_id='retry_example_dag', 
run_id='manual__2025-06-14T12:50:54.260989+00:00', try_number=1, map_index=-1, 
pool_slots=1, queue='default', priority_weight=1, executor_config=None, 
parent_context_carrier={}, context_carrier={}, queued_dttm=None) 
dag_rel_path=PurePosixPath('retry-with-delay.py') 
bundle_info=BundleInfo(name='dags-folder', version=None) 
log_path='dag_id=retry_example_dag/run_id=manual__2025-06-14T12:50:54.260989+00:00/task_id=flaky_task/attempt=1.log'
 type='ExecuteTask', len of workload is: 903 
[airflow.providers.celery.executors.celery_executor_utils]
   2025-06-14 12:50:54.915904 [info     ] Secrets backends loaded for worker 
[supervisor] backend_classes=['LocalFilesystemBackend', 
'EnvironmentVariablesBackend'] count=2
   2025-06-14 12:50:55.545694 [info     ] Task finished                  
[supervisor] duration=0.6320609190006508 exit_code=0 final_state=up_for_retry
   2025-06-14 12:50:55.550448 [info     ] Task 
execute_workload[a7517e3a-e357-4bf4-8519-841aae0fa0aa] succeeded in 
0.7965272139990702s: None [celery.app.trace]
   ^L[2025-06-14 12:50:57 +0000] [665] [INFO] Handling signal: winch
   2025-06-14 12:51:01.275889 [info     ] Task 
execute_workload[241e8614-d7dd-4de9-b901-32195c238f65] received 
[celery.worker.strategy]
   2025-06-14 12:51:01.280838 [info     ] 
[241e8614-d7dd-4de9-b901-32195c238f65] Executing workload in Celery: 
token='eyJ***' ti=TaskInstance(id=UUID('01976e7e-39b3-754c-ae51-ce764bb98068'), 
task_id='flaky_task', dag_id='retry_example_dag', 
run_id='manual__2025-06-14T12:50:54.260989+00:00', try_number=2, map_index=-1, 
pool_slots=1, queue='default', priority_weight=1, executor_config=None, 
parent_context_carrier={}, context_carrier={}, 
queued_dttm=datetime.datetime(2025, 6, 14, 12, 50, 54, 624148, 
tzinfo=TzInfo(UTC))) dag_rel_path=PurePosixPath('retry-with-delay.py') 
bundle_info=BundleInfo(name='dags-folder', version=None) 
log_path='dag_id=retry_example_dag/run_id=manual__2025-06-14T12:50:54.260989+00:00/task_id=flaky_task/attempt=2.log'
 type='ExecuteTask', len of workload is: 928 
[airflow.providers.celery.executors.celery_executor_utils]
   2025-06-14 12:51:01.295521 [info     ] Secrets backends loaded for worker 
[supervisor] backend_classes=['LocalFilesystemBackend', 
'EnvironmentVariablesBackend'] count=2
   2025-06-14 12:51:01.453121 [info     ] Task finished                  
[supervisor] duration=0.15831252200587187 exit_code=0 final_state=up_for_retry
   2025-06-14 12:51:01.458458 [info     ] Task 
execute_workload[241e8614-d7dd-4de9-b901-32195c238f65] succeeded in 
0.18147331300133374s: None [celery.app.trace]
   2025-06-14 12:51:06.603739 [info     ] Task 
execute_workload[7b287860-ad2c-4fc9-9a03-90f429aa3666] received 
[celery.worker.strategy]
   2025-06-14 12:51:06.609338 [info     ] 
[7b287860-ad2c-4fc9-9a03-90f429aa3666] Executing workload in Celery: 
token='eyJ***' ti=TaskInstance(id=UUID('01976e7e-50c7-7f57-a3da-d326eefbde78'), 
task_id='flaky_task', dag_id='retry_example_dag', 
run_id='manual__2025-06-14T12:50:54.260989+00:00', try_number=3, map_index=-1, 
pool_slots=1, queue='default', priority_weight=1, executor_config=None, 
parent_context_carrier={}, context_carrier={}, 
queued_dttm=datetime.datetime(2025, 6, 14, 12, 51, 1, 270429, 
tzinfo=TzInfo(UTC))) dag_rel_path=PurePosixPath('retry-with-delay.py') 
bundle_info=BundleInfo(name='dags-folder', version=None) 
log_path='dag_id=retry_example_dag/run_id=manual__2025-06-14T12:50:54.260989+00:00/task_id=flaky_task/attempt=3.log'
 type='ExecuteTask', len of workload is: 928 
[airflow.providers.celery.executors.celery_executor_utils]
   2025-06-14 12:51:06.624493 [info     ] Secrets backends loaded for worker 
[supervisor] backend_classes=['LocalFilesystemBackend', 
'EnvironmentVariablesBackend'] count=2
   2025-06-14 12:51:06.758592 [info     ] Task finished                  
[supervisor] duration=0.13502606399561046 exit_code=0 final_state=up_for_retry
   2025-06-14 12:51:06.762165 [info     ] Task 
execute_workload[7b287860-ad2c-4fc9-9a03-90f429aa3666] succeeded in 
0.15771018699888373s: None [celery.app.trace]
   2025-06-14 12:51:12.682274 [info     ] Task 
execute_workload[c2480fb0-ba96-4bbd-b5eb-0e74cd2641cc] received 
[celery.worker.strategy]
   2025-06-14 12:51:12.686425 [info     ] 
[c2480fb0-ba96-4bbd-b5eb-0e74cd2641cc] Executing workload in Celery: 
token='eyJ***' ti=TaskInstance(id=UUID('01976e7e-6580-7a9c-b89b-7b016c42fca4'), 
task_id='flaky_task', dag_id='retry_example_dag', 
run_id='manual__2025-06-14T12:50:54.260989+00:00', try_number=4, map_index=-1, 
pool_slots=1, queue='default', priority_weight=1, executor_config=None, 
parent_context_carrier={}, context_carrier={}, 
queued_dttm=datetime.datetime(2025, 6, 14, 12, 51, 6, 599630, 
tzinfo=TzInfo(UTC))) dag_rel_path=PurePosixPath('retry-with-delay.py') 
bundle_info=BundleInfo(name='dags-folder', version=None) 
log_path='dag_id=retry_example_dag/run_id=manual__2025-06-14T12:50:54.260989+00:00/task_id=flaky_task/attempt=4.log'
 type='ExecuteTask', len of workload is: 928 
[airflow.providers.celery.executors.celery_executor_utils]
   ```
   
   So in the normal case I can see the payload is 903 bytes on the first try and 928 bytes on the retries.
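   The small jump between the first try and the retries lines up with `queued_dttm` going from `None` to a real timestamp. A hedged sanity check (the actual workload serializer may format the datetime differently, so the exact byte count is an approximation):

   ```python
   import json
   from datetime import datetime, timezone

   # First try logs queued_dttm=None; retries carry a real timestamp.
   none_repr = json.dumps({"queued_dttm": None})
   ts = datetime(2025, 6, 14, 12, 50, 54, 624148, tzinfo=timezone.utc)
   ts_repr = json.dumps({"queued_dttm": ts.isoformat()})

   # Swapping null for an ISO-8601 timestamp string adds a few dozen bytes,
   # roughly the 903 -> 928 difference seen in the logs above.
   print(len(ts_repr) - len(none_repr))  # → 30
   ```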
   
   Considering that we can max out some fields in length, like:
   - `task_id` can be up to 36 characters
   - `dag_id` can be up to 250 characters
   - other fields (e.g., `run_id`, `queue`, `log_path`, `executor_config`, 
etc.) add more
   
   I think 2048 would be a good length specifier here. I do not see a situation 
where it could exceed that, while anything shorter could be hit fairly soon and 
become limiting.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
