kaxil opened a new pull request, #54814:
URL: https://github.com/apache/airflow/pull/54814

   The MySQL UUID v7 generation function in migration 0042 was creating malformed UUIDs that fail Pydantic validation when the scheduler attempts to enqueue task instances.
   
   ## Problem
   
   The original MySQL function had two critical issues:
   1. Generated only 16 random hex characters instead of the required 20
   2. Used `SUBSTRING(rand_hex, 9)` without a length argument, producing 8-character final segments instead of the required 12 characters
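
A small Python sketch makes the length arithmetic concrete. It emulates MySQL's 1-based `SUBSTRING(str, pos[, len])` with a hypothetical helper, `sql_substring`, and models only the final-segment extraction described above, not the rest of the migration function.

```python
import secrets
from typing import Optional


def sql_substring(s: str, pos: int, length: Optional[int] = None) -> str:
    """Emulate MySQL SUBSTRING(s, pos[, length]) for a positive, 1-based pos."""
    start = pos - 1
    return s[start:] if length is None else s[start:start + length]


# Original behaviour: 16 random hex chars, then everything from position 9 onward.
old_rand_hex = secrets.token_hex(8)            # 8 random bytes -> 16 hex chars
old_final = sql_substring(old_rand_hex, 9)     # only 8 chars are left

# Fixed behaviour: 20 random hex chars and an explicit 12-char length.
new_rand_hex = secrets.token_hex(10)           # 10 random bytes -> 20 hex chars
new_final = sql_substring(new_rand_hex, 9, 12)

print(len(old_final), len(new_final))  # 8 12
```

With a 20-character buffer, `SUBSTRING(rand_hex, 9)` alone would also return 12 characters; the explicit length simply makes the segment size independent of the buffer size.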
   
   This resulted in malformed UUIDs like:
   - Bad:  `0198cf6d-fb98-4555-7301-e29b8403` (32 chars, last segment: 8 chars)
   - Good: `0198cf6d-fb98-4555-7301-e29b8403abcd` (36 chars, last segment: 12 chars)
   
   ## When This Issue Occurs
   
   The validation error happens when:
   1. Task instances are in the 'scheduled' state when migrating from 2.10 to 3.0.x
   2. These task instances receive malformed UUIDs during the migration
   3. The scheduler tries to enqueue them via `ExecuteTask.make()`
   4. Pydantic validation fails: `'invalid group length in group 4: expected 12, found 8'`
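
To check IDs outside Pydantic, a hypothetical stdlib-only helper can enforce the canonical 8-4-4-4-12 layout and report the first bad group. Groups are counted from 0, which is why the final group is "group 4" in the error above. The message format imitates Pydantic's, but this is not Pydantic's implementation.

```python
import uuid

EXPECTED_GROUP_LENGTHS = (8, 4, 4, 4, 12)


def check_uuid_groups(value: str) -> None:
    """Raise ValueError unless `value` is a canonical 8-4-4-4-12 UUID string.

    Hypothetical helper for illustration only.
    """
    groups = value.split("-")
    if len(groups) != len(EXPECTED_GROUP_LENGTHS):
        raise ValueError(f"expected 5 groups, found {len(groups)}")
    # Groups are numbered from 0, so the last one is group 4.
    for index, (group, expected) in enumerate(zip(groups, EXPECTED_GROUP_LENGTHS)):
        if len(group) != expected:
            raise ValueError(
                f"invalid group length in group {index}: expected {expected}, found {len(group)}"
            )
    uuid.UUID(value)  # additionally rejects non-hex characters


check_uuid_groups("0198cf6d-fb98-4555-7301-e29b8403abcd")  # well-formed: no error
try:
    check_uuid_groups("0198cf6d-fb98-4555-7301-e29b8403")
except ValueError as exc:
    print(exc)  # invalid group length in group 4: expected 12, found 8
```

Python's `uuid.UUID` constructor removes hyphens before counting digits, so on its own it rejects the 32-character value as badly formed but cannot say which group is short.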
   
   Users who had no task instances in the 'scheduled' state at migration time typically don't encounter this issue. Task instances created after the migration (for example, for new DAG runs) are unaffected because they get well-formed UUIDs from the Python `uuid7()` function.
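
For comparison, the layout that a well-formed UUIDv7 follows (RFC 9562: 48-bit Unix timestamp in milliseconds, 4-bit version, 12 random bits, 2-bit variant, 62 random bits) can be sketched with the standard library alone. `uuid7_sketch` is a hypothetical illustration, not the `uuid7()` function Airflow calls.

```python
import os
import time
import uuid


def uuid7_sketch() -> uuid.UUID:
    """Build a UUID with the RFC 9562 version-7 field layout (illustrative only)."""
    unix_ts_ms = (time.time_ns() // 1_000_000) & ((1 << 48) - 1)  # 48-bit timestamp
    rand = int.from_bytes(os.urandom(10), "big")  # 80 random bits
    rand_a = rand >> 68                           # top 12 random bits
    rand_b = rand & ((1 << 62) - 1)               # low 62 random bits
    value = (
        (unix_ts_ms << 80)   # bits 127-80: timestamp
        | (0x7 << 76)        # bits 79-76: version 7
        | (rand_a << 64)     # bits 75-64: rand_a
        | (0b10 << 62)       # bits 63-62: RFC variant
        | rand_b             # bits 61-0: rand_b
    )
    return uuid.UUID(int=value)


u = uuid7_sketch()
text = str(u)
# str() always renders 36 characters grouped 8-4-4-4-12
print(text, len(text), [len(g) for g in text.split("-")])
```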
   
   ## Solution
   
   Updated the MySQL `uuid_generate_v7` function to:
   - Use `RANDOM_BYTES(10)` (10 bytes, i.e. 20 hex characters) for cryptographically secure random data
   - Pass an explicit length, `SUBSTRING(rand_hex, 9, 12)`, so the final segment is always 12 characters
   - Mark the function as `NOT DETERMINISTIC`, which is correct for a function that returns random values
   - Declare the random hex buffer as `CHAR(20)`, matching the 20 characters it actually holds
   
   ## Why No New Data Migration
   
   I decided against creating a separate migration to fix existing malformed UUIDs because:

   1. **Limited scope** - Only affects task instances in the 'scheduled' state during migration
   2. **Self-healing** - The system recovers as old tasks complete and new ones are created
   3. **Risk mitigation** - Avoids complex primary key modifications in production
   4. **Alternative available** - A manual fix script is provided below for affected users
   5. **Prevention focus** - Fixing the root cause prevents future occurrences
   
   ## Manual Fix for Affected Users
   
   If you encounter the UUID validation error, you can fix existing malformed UUIDs:
   
   ```sql
   -- Back up the database first: this rewrites task_instance primary-key values.
   -- Fix malformed UUIDs by appending 4 random hex characters, which extends
   -- the 8-character final segment to the required 12 characters.
   UPDATE task_instance
   SET id = CONCAT(id, LOWER(HEX(RANDOM_BYTES(2))))  -- 2 random bytes = 4 hex chars
   WHERE LENGTH(id) = 32
     AND LENGTH(SUBSTRING_INDEX(id, '-', -1)) = 8;   -- only 8-4-4-4-8 values

   -- Verify the fix (should return no rows)
   SELECT id, LENGTH(id) AS uuid_length,
          LENGTH(SUBSTRING_INDEX(id, '-', -1)) AS last_segment_length
   FROM task_instance
   WHERE LENGTH(SUBSTRING_INDEX(id, '-', -1)) != 12
   LIMIT 5;
   ```
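
In Python, the intended repair (extending the short final group to 12 characters while keeping the existing characters) can be sketched as below, which is handy for checking a few IDs before touching the database. `repair_short_uuid` is a hypothetical helper, not part of Airflow.

```python
import secrets
import uuid


def repair_short_uuid(value: str) -> str:
    """Extend an 8-4-4-4-8 ID to 8-4-4-4-12 by appending 4 random hex chars.

    Hypothetical helper for illustration; any other value is returned unchanged.
    """
    if len(value) != 32 or len(value.rsplit("-", 1)[-1]) != 8:
        return value
    repaired = value + secrets.token_hex(2)  # 2 random bytes -> 4 lowercase hex chars
    uuid.UUID(repaired)  # raises ValueError if the result is still not a valid UUID
    return repaired


bad = "0198cf6e-670d-3797-7578-d9f380de"  # value from the scheduler log in the Testing section
fixed = repair_short_uuid(bad)
print(fixed)  # the original 32 characters followed by 4 random hex chars
```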
   
   ## Testing
   
   **Error in Scheduler**:
   
   ```
   [2025-08-22T02:17:43.203+0000] {scheduler_job_runner.py:710} INFO - Trying to enqueue tasks: [<TaskInstance: as.simplest_dag manual__2025-08-22T01:39:07.403055+00:00 [scheduled]>, <TaskInstance: as.simplest_dag scheduled__2025-08-22T02:15:00+00:00 [scheduled]>] for executor: LocalExecutor(parallelism=32)
   [2025-08-22T02:17:43.207+0000] {scheduler_job_runner.py:984} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
   Traceback (most recent call last):
     File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 980, in _execute
       self._run_scheduler_loop()
     File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1266, in _run_scheduler_loop
       num_queued_tis = self._do_scheduling(session)
     File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1408, in _do_scheduling
       num_queued_tis = self._critical_section_enqueue_task_instances(session=session)
     File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 716, in _critical_section_enqueue_task_instances
       self._enqueue_task_instances_with_queued_state(queued_tis_per_executor, executor, session=session)
     File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 665, in _enqueue_task_instances_with_queued_state
       workload = workloads.ExecuteTask.make(ti, generator=executor.jwt_generator)
     File "/opt/airflow/airflow-core/src/airflow/executors/workloads.py", line 114, in make
       ser_ti = TaskInstance.model_validate(ti, from_attributes=True)
     File "/usr/local/lib/python3.10/site-packages/pydantic/main.py", line 705, in model_validate
       return cls.__pydantic_validator__.validate_python(
   pydantic_core._pydantic_core.ValidationError: 1 validation error for TaskInstance
   id
     Input should be a valid UUID, invalid group length in group 4: expected 12, found 8 [type=uuid_parsing, input_value='0198cf6e-670d-3797-7578-d9f380de', input_type=str]
       For further information visit https://errors.pydantic.dev/2.11/v/uuid_parsing
   [2025-08-22T02:17:43.208+0000] {local_executor.py:230} INFO - Shutting down LocalExecutor; waiting for running tasks to finish.  Signal again if you don't want to wait.
   [2025-08-22T02:17:43.208+0000] {scheduler_job_runner.py:996} INFO - Exited execute loop
   Traceback (most recent call last):
     File "/usr/local/bin/airflow", line 10, in <module>
       sys.exit(main())
     File "/opt/airflow/airflow-core/src/airflow/__main__.py", line 55, in main
       args.func(args)
     File "/opt/airflow/airflow-core/src/airflow/cli/cli_config.py", line 49, in command
       return func(*args, **kwargs)
     File "/opt/airflow/airflow-core/src/airflow/utils/cli.py", line 114, in wrapper
       return f(*args, **kwargs)
     File "/opt/airflow/airflow-core/src/airflow/utils/providers_configuration_loader.py", line 54, in wrapped_function
       return func(*args, **kwargs)
     File "/opt/airflow/airflow-core/src/airflow/cli/commands/scheduler_command.py", line 52, in scheduler
       run_command_with_daemon_option(
     File "/opt/airflow/airflow-core/src/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option
       callback()
     File "/opt/airflow/airflow-core/src/airflow/cli/commands/scheduler_command.py", line 55, in <lambda>
       callback=lambda: _run_scheduler_job(args),
     File "/opt/airflow/airflow-core/src/airflow/cli/commands/scheduler_command.py", line 43, in _run_scheduler_job
       run_job(job=job_runner.job, execute_callable=job_runner._execute)
     File "/opt/airflow/airflow-core/src/airflow/utils/session.py", line 100, in wrapper
       return func(*args, session=session, **kwargs)
     File "/opt/airflow/airflow-core/src/airflow/jobs/job.py", line 368, in run_job
       return execute_job(job, execute_callable=execute_callable)
     File "/opt/airflow/airflow-core/src/airflow/jobs/job.py", line 397, in execute_job
       ret = execute_callable()
     File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 980, in _execute
       self._run_scheduler_loop()
     File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1266, in _run_scheduler_loop
       num_queued_tis = self._do_scheduling(session)
     File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1408, in _do_scheduling
       num_queued_tis = self._critical_section_enqueue_task_instances(session=session)
     File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 716, in _critical_section_enqueue_task_instances
       self._enqueue_task_instances_with_queued_state(queued_tis_per_executor, executor, session=session)
     File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 665, in _enqueue_task_instances_with_queued_state
       workload = workloads.ExecuteTask.make(ti, generator=executor.jwt_generator)
     File "/opt/airflow/airflow-core/src/airflow/executors/workloads.py", line 114, in make
       ser_ti = TaskInstance.model_validate(ti, from_attributes=True)
     File "/usr/local/lib/python3.10/site-packages/pydantic/main.py", line 705, in model_validate
       return cls.__pydantic_validator__.validate_python(
   pydantic_core._pydantic_core.ValidationError: 1 validation error for TaskInstance
   id
     Input should be a valid UUID, invalid group length in group 4: expected 12, found 8 [type=uuid_parsing, input_value='0198cf6e-670d-3797-7578-d9f380de', input_type=str]
       For further information visit https://errors.pydantic.dev/2.11/v/uuid_parsing
   ```
   
   <img width="1184" height="191" alt="image" src="https://github.com/user-attachments/assets/0476f881-a68c-418b-b79e-94e0e15ee4b1" />
   
   
   Closes #54554
   


-- 
This is an automated message from the Apache Git Service.