[
https://issues.apache.org/jira/browse/FLINK-38698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ari updated FLINK-38698:
------------------------
Priority: Critical (was: Major)
> TaskInformation blobs accumulate without cleanup causing storage exhaustion
> ---------------------------------------------------------------------------
>
> Key: FLINK-38698
> URL: https://issues.apache.org/jira/browse/FLINK-38698
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.14.3, 1.15.4, 1.16.3, 1.17.2, 1.18.1, 1.19.3, 1.20.3
> Reporter: ari
> Priority: Critical
>
> h1. Summary
> When the adaptive scheduler is enabled, TaskInformation blob files accumulate
> on persistent storage without proper cleanup, eventually leading to storage
> exhaustion and job stalling.
> h1. Problem Details
> On each job deployment and restart (including JM failures),
> {{ExecutionJobVertex.getTaskInformationOrBlobKey()}} reconstructs the
> TaskInformation objects from the JobGraph. These TaskInformation objects can
> be extremely large (>200 MB) because they can contain serialized UDFs within
> the task configuration.
> Specifically, the Configuration field in TaskInformation includes a
> {{SERIALIZED_UDF}} entry.
> When these objects exceed the {{blob.offload.minsize}} configuration, they
> are offloaded as permanent blobs to avoid hitting RPC framesize limits (a
> sketch of this offload path follows the list below). However:
> # Blob keys are not reused across failures - each restart creates a new blob
> with a different key, even though the content hash is the same
> # No cleanup mechanism until global termination - permanent blobs are only
> cleaned up when the job reaches a globally terminated state, a state that is
> never reached during internal restarts
> # Job JAR blobs ARE reused - in contrast, the job JAR blobs stored in the
> JobGraph have their keys persisted and are correctly reused
>
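> For illustration, here is a minimal, self-contained sketch of the offload
> decision described above. All class and method names in it are hypothetical
> stand-ins rather than Flink's actual API; the real path runs through
> {{ExecutionJobVertex.getTaskInformationOrBlobKey()}} and the blob writer.
> {code:java}
> import java.util.Arrays;
>
> // Hypothetical names, not Flink's API: TaskInformation payloads at or above
> // blob.offload.minsize are written to the blob store as permanent blobs and only
> // the blob key travels in the deployment descriptor; smaller payloads go inline
> // over RPC.
> public class OffloadDecisionSketch {
>
>     /** Either an inline serialized payload or a reference to an offloaded permanent blob. */
>     record TaskInfoOrKey(byte[] inlinePayload, String permanentBlobKey) {
>         static TaskInfoOrKey inline(byte[] payload) { return new TaskInfoOrKey(payload, null); }
>         static TaskInfoOrKey offloaded(String key) { return new TaskInfoOrKey(null, key); }
>     }
>
>     static TaskInfoOrKey serializeAndMaybeOffload(byte[] serializedTaskInfo, long offloadMinSizeBytes) {
>         if (serializedTaskInfo.length >= offloadMinSizeBytes) {
>             // "Upload" the bytes as a permanent blob and keep only the key; this is what
>             // keeps large serialized UDFs out of the RPC message and its framesize limit.
>             return TaskInfoOrKey.offloaded(uploadPermanentBlob(serializedTaskInfo));
>         }
>         // Below the threshold: the serialized TaskInformation is shipped directly via RPC.
>         return TaskInfoOrKey.inline(serializedTaskInfo);
>     }
>
>     static String uploadPermanentBlob(byte[] bytes) {
>         // Stand-in for the real upload; see the key-construction sketch further below for
>         // why re-uploading identical bytes still produces a brand-new blob file.
>         return "taskinfo-blob-" + Integer.toHexString(Arrays.hashCode(bytes));
>     }
>
>     public static void main(String[] args) {
>         // With blob.offload.minsize set to 0, every reconstructed TaskInformation is
>         // offloaded, so each restart leaves another permanent blob behind.
>         System.out.println(serializeAndMaybeOffload(new byte[1024], 0).permanentBlobKey());
>     }
> }
> {code}
>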
> h1. Impact
> * Storage exhaustion in the blob storage directory (especially a problem for
> the high-availability storage directory, which may have hard storage limits)
> * Job stalling once the storage limit is reached: the next restart can no
> longer offload the blob, so the TaskInformation is sent over RPC instead, hits
> the framesize limit, and checkpoints never trigger
> * Particularly severe with:
> ** Complex streaming jobs with large/many serialized UDFs in the task config
> ** Frequent TM failures requiring restarts
> ** High parallelism (each parallelized vertex creates its own TaskInformation
> blob)
> h1. Reproduction Steps
> # Enable the adaptive scheduler
> # Set {{blob.offload.minsize: 0}} (forces all TaskInformation objects to be
> offloaded)
> # Run {{kubectl delete pod \{task-manager-pod-name}}} to trigger job restart
> # Wait for job to restart and process records
> # {{kubectl exec -it \{job-manager-pod-name} -- bash}}
> # {{cd}} to the blob storage directory and run {{ls && stat *}}
> # Observe: Every file except the job JAR blob is duplicated after each
> restart
> h1. Expected vs Actual Behavior
> Expected: On a restart, if the content hash is unchanged, reuse the previously
> created TaskInformation blob. However, FLINK-7140 appears to have added a
> random component to the blob key to prevent hash collisions, which rules out
> content-based reuse. Failing that, the blobs that are no longer needed should
> be deleted before the new ones are generated.
> Actual: New TaskInformation blobs are created on every restart, there is no
> cleanup until the job reaches a globally terminated state, and blobs
> accumulate without bound over time (see the sketch below).
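> The sketch below models, with hypothetical class names rather than Flink's
> actual blob key implementation, why content-identical TaskInformation can no
> longer map to the same key after FLINK-7140: the key combines a content hash
> with a per-upload random component, so equality-based reuse is impossible and
> only explicit cleanup can reclaim the old blobs.
> {code:java}
> import java.nio.charset.StandardCharsets;
> import java.security.MessageDigest;
> import java.security.SecureRandom;
> import java.util.HexFormat;
>
> // Hypothetical model, not Flink's actual BlobKey classes.
> public class BlobKeyReuseSketch {
>
>     record ModelBlobKey(String contentHash, String randomPart) {}
>
>     static ModelBlobKey keyFor(byte[] content) throws Exception {
>         byte[] hash = MessageDigest.getInstance("SHA-1").digest(content);
>         byte[] random = new byte[16];
>         new SecureRandom().nextBytes(random); // fresh randomness on every upload
>         return new ModelBlobKey(HexFormat.of().formatHex(hash), HexFormat.of().formatHex(random));
>     }
>
>     public static void main(String[] args) throws Exception {
>         byte[] sameTaskInformation = "task config incl. serialized UDFs".getBytes(StandardCharsets.UTF_8);
>         ModelBlobKey beforeRestart = keyFor(sameTaskInformation);
>         ModelBlobKey afterRestart = keyFor(sameTaskInformation);
>         // Same contentHash, different randomPart: the keys differ, a second blob file is
>         // written, and the first one lingers until the job is globally terminated.
>         System.out.println(beforeRestart);
>         System.out.println(afterRestart);
>         System.out.println("reusable? " + beforeRestart.equals(afterRestart)); // false
>     }
> }
> {code}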
--
This message was sent by Atlassian Jira
(v8.20.10#820010)