ari created FLINK-38698:
---------------------------

             Summary: TaskInformation blobs accumulate without cleanup causing 
storage exhaustion
                 Key: FLINK-38698
                 URL: https://issues.apache.org/jira/browse/FLINK-38698
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Coordination
    Affects Versions: 1.20.3, 1.19.3, 1.18.1, 1.17.2, 1.16.3, 1.15.4, 1.14.3
            Reporter: ari


h1. Summary

When adaptive scheduler is enabled, TaskInformation blob files accumulate on 
persistent storage without proper cleanup, eventually leading to storage 
exhaustion and job stalling.


h1. Problem Details

On each job deployment and restart (including JM failures), 
{{ExecutionJobVertex.getTaskInformationOrBlobKey()}} reconstructs 
TaskInformation objects from the JobGraph. These TaskInformation objects can be 
extremely large (>200 MB) as they can contain serialized UDFs within the task 
configuration.

Specifically, the Configuration field in TaskInformation includes a 
{{SERIALIZED_UDF}} entry.
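
For illustration, a minimal sketch (plain Java serialization with hypothetical names, not Flink's actual serialization path) of how a UDF that captures large state in its closure inflates the serialized task configuration:

{code:java}
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class SerializedUdfSize {

    // Hypothetical stand-in for a user function that captures a large object
    // (e.g. a lookup table or model) in its closure.
    interface SerializableFunction<I, O> extends Function<I, O>, Serializable {}

    public static void main(String[] args) throws Exception {
        byte[] embeddedState = new byte[64 * 1024 * 1024]; // 64 MB captured in the closure
        SerializableFunction<String, Integer> udf = s -> s.length() + embeddedState.length;

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(udf); // the captured array is serialized along with the lambda
        }
        // The serialized UDF alone already exceeds typical RPC frame sizes,
        // which is why such task configurations get offloaded as blobs.
        System.out.printf("serialized UDF size: %d bytes%n", bytes.size());
    }
}
{code}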

When these objects exceed the {{blob.offload.minsize}} configuration, they are 
offloaded as permanent blobs to avoid hitting the RPC framesize limit. However:
 # Blob keys are not reused across failures - each restart creates a new blob 
under a different key even though the content hash is identical (see the sketch 
after this list)
 # No cleanup until global termination - permanent blobs are only cleaned up 
when the job reaches a globally terminated state, which is never reached during 
internal restarts
 # Job JAR blobs ARE reused - in contrast, the job JAR blobs stored in the 
JobGraph have their keys persisted and are correctly reused
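
To illustrate point 1, a simplified sketch (not Flink's actual {{BlobKey}} implementation) of a key that mixes a random nonce into the content digest, in the spirit of FLINK-7140 - uploading identical content twice yields two distinct keys, so the second upload cannot be matched against the first:

{code:java}
import java.security.MessageDigest;
import java.security.SecureRandom;
import java.util.Base64;

public class BlobKeyIllustration {

    // Simplified stand-in for a permanent blob key: content digest plus a random nonce.
    static String makeKey(byte[] content) throws Exception {
        byte[] digest = MessageDigest.getInstance("SHA-256").digest(content);
        byte[] nonce = new byte[16];
        new SecureRandom().nextBytes(nonce); // random component, as introduced for collision safety
        return Base64.getEncoder().encodeToString(digest)
                + "-" + Base64.getEncoder().encodeToString(nonce);
    }

    public static void main(String[] args) throws Exception {
        byte[] taskInformation = "identical serialized TaskInformation".getBytes();
        // Two "restarts" uploading the same bytes produce two different keys,
        // so the second blob cannot be deduplicated against the first.
        System.out.println(makeKey(taskInformation));
        System.out.println(makeKey(taskInformation));
    }
}
{code}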

 
h1. Impact
 * Storage exhaustion in the blob storage directory (specifically a problem for 
the high-availability storage directory, since it may have hard storage limits)
 * Job stalling once the storage limit is reached: on the next restart the blob 
can no longer be offloaded, so the TaskInformation is sent over RPC, hits the 
framesize limit, and checkpoints never trigger
 * Particularly severe with:
 ** Complex streaming jobs with large or many serialized UDFs in the task 
configuration
 ** Frequent TM failures requiring restarts
 ** High parallelism (each parallelized vertex creates its own TaskInformation 
blob)

h1. Reproduction Steps
 # Enable adaptive scheduler
 # Set {{blob.offload.minsize: 0}} (forces all TaskInformation objects to be 
offloaded)
 # Run {{kubectl delete pod \{task-manager-pod-name}}} to trigger job restart
 # Wait for job to restart and process records
 # {{kubectl exec -it \{job-manager-pod-name} -- bash}}
 # {{cd}} to the blob storage directory and run {{ls && stat *}}
 # Observe: every file except the job JAR blob is duplicated after each restart 
(see the diagnostic sketch below)
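
A small diagnostic like the following (the path is hypothetical - point it at the configured blob storage directory) makes the duplication visible by grouping blob files by content hash:

{code:java}
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FindDuplicateBlobs {
    public static void main(String[] args) throws Exception {
        // Hypothetical default path; pass the real (HA) blob storage directory as the first argument.
        Path blobDir = Paths.get(args.length > 0 ? args[0] : "/flink/blobstore");

        List<Path> files;
        try (Stream<Path> stream = Files.walk(blobDir)) {
            files = stream.filter(Files::isRegularFile).collect(Collectors.toList());
        }

        // Group blob files by the SHA-256 digest of their content.
        Map<String, List<Path>> byContentHash = new HashMap<>();
        for (Path file : files) {
            byte[] digest = MessageDigest.getInstance("SHA-256").digest(Files.readAllBytes(file));
            byContentHash
                    .computeIfAbsent(Base64.getEncoder().encodeToString(digest), k -> new ArrayList<>())
                    .add(file);
        }

        // Identical content stored under more than one blob key indicates
        // TaskInformation blobs that were re-uploaded instead of reused.
        byContentHash.values().stream()
                .filter(group -> group.size() > 1)
                .forEach(group -> System.out.println("duplicated content: " + group));
    }
}
{code}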

h1. Expected vs Actual Behavior

Expected: On a restart, if the content hash is the same, reuse the previously 
created TaskInformation blob; otherwise, delete the blobs that are no longer 
needed and then generate the new one (a sketch of this reuse is included 
below). Note that FLINK-7140 appears to have introduced a random component into 
the blob key to prevent hash collisions, which complicates content-based reuse.

Actual: New TaskInformation blobs are created on every restart, there is no 
cleanup until the job reaches a globally terminated state, and blob 
accumulation is unbounded over time.
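
As a rough sketch of the expected reuse (illustrative only, under assumed interfaces - not Flink's {{BlobWriter}} API), a per-job registry keyed by content hash would return the existing blob key when the serialized TaskInformation is unchanged:

{code:java}
import java.security.MessageDigest;
import java.util.Base64;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Illustrative only: a per-job registry that reuses an existing blob key when the
// serialized TaskInformation content is unchanged across restarts.
public class TaskInfoBlobRegistry {

    private final Map<String, String> keyByContentHash = new ConcurrentHashMap<>();

    // 'upload' stands in for the actual offloading call that returns a new blob key.
    public String getOrUpload(byte[] serializedTaskInformation, Function<byte[], String> upload)
            throws Exception {
        byte[] digest = MessageDigest.getInstance("SHA-256").digest(serializedTaskInformation);
        String contentHash = Base64.getEncoder().encodeToString(digest);
        // Reuse the previously created blob key if the content is identical;
        // otherwise upload once and remember the resulting key.
        return keyByContentHash.computeIfAbsent(contentHash, hash -> upload.apply(serializedTaskInformation));
    }
}
{code}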


