[
https://issues.apache.org/jira/browse/FLINK-38698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18085398#comment-18085398
]
Spoorthi Basu commented on FLINK-38698:
---------------------------------------
Hi, I have investigated this issue and was able to reproduce it.
The root cause is specific to the adaptive scheduler. On every restart or
rescale it rebuilds the {{ExecutionGraph}}, and each rebuild re-offloads
the deployment metadata to the BLOB store under fresh keys (random since
FLINK-7140). The BLOBs from the superseded graph are only removed at global job
termination, so on a long-running job that restarts or rescales repeatedly they
accumulate in the (HA) BLOB store and are never reclaimed for the lifetime of
the job, eventually exhausting it and blocking recovery. The default scheduler
is not affected, since it reuses the same {{ExecutionGraph}} rather than
rebuilding it.
To address this, I propose releasing a graph's offloaded deployment BLOBs at
the point the adaptive scheduler discards that graph, rather than deferring to
global termination. The cleanup reuses the graph's existing BLOB-deletion path
and covers the cases where the adaptive scheduler discards a graph, with no
impact on state, wire format, or public API. I implemented this locally and
validated it with tests covering these cases.
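As a rough illustration of the difference between the two cleanup points, the following toy model counts the BLOBs left in a store after a number of restarts. It is a sketch only, not the patch and not Flink code: {{ToyBlobStore}}, {{ToyGraph}}, {{releaseOffloadedBlobs}} and {{blobsAfterRestarts}} are hypothetical names, and the assumption is one offloaded BLOB per vertex with a fresh random key on every rebuild.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

/** Toy model only; none of these names are Flink APIs. */
final class BlobCleanupSketch {

    /** Stand-in for the BLOB store: just the set of live keys. */
    static final class ToyBlobStore {
        private final Set<String> keys = new HashSet<>();

        String put() {
            // A fresh random key per upload, so identical content is never reused.
            String key = UUID.randomUUID().toString();
            keys.add(key);
            return key;
        }

        void delete(String key) {
            keys.remove(key);
        }

        int size() {
            return keys.size();
        }
    }

    /** Stand-in for one graph instance and the deployment BLOBs it offloaded. */
    static final class ToyGraph {
        private final Set<String> offloaded = new HashSet<>();
        private final ToyBlobStore store;

        ToyGraph(ToyBlobStore store, int vertices) {
            this.store = store;
            // Assumption: one offloaded BLOB per vertex.
            for (int i = 0; i < vertices; i++) {
                offloaded.add(store.put());
            }
        }

        /** Deletes every BLOB this graph instance offloaded. */
        void releaseOffloadedBlobs() {
            offloaded.forEach(store::delete);
            offloaded.clear();
        }
    }

    /** Returns the number of BLOBs left in the store after the given restarts. */
    static int blobsAfterRestarts(int restarts, int vertices, boolean releaseOnDiscard) {
        ToyBlobStore store = new ToyBlobStore();
        ToyGraph current = new ToyGraph(store, vertices);
        for (int i = 0; i < restarts; i++) {
            ToyGraph superseded = current;
            current = new ToyGraph(store, vertices); // rebuild re-offloads everything
            if (releaseOnDiscard) {
                superseded.releaseOffloadedBlobs();  // cleanup at discard time
            }
        }
        return store.size();
    }

    public static void main(String[] args) {
        System.out.println(blobsAfterRestarts(10, 3, false)); // deferred cleanup
        System.out.println(blobsAfterRestarts(10, 3, true));  // release on discard
    }
}
```

In this model, deferring cleanup leaves (restarts + 1) x vertices BLOBs in the store, while releasing on discard keeps the count at the number of vertices regardless of how many restarts occur.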
If this approach looks good, I would appreciate being assigned to this ticket
so I can open a PR.
> TaskInformation blobs accumulate without cleanup causing storage exhaustion
> ---------------------------------------------------------------------------
>
> Key: FLINK-38698
> URL: https://issues.apache.org/jira/browse/FLINK-38698
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.14.3, 1.15.4, 1.16.3, 1.17.2, 1.18.1, 1.19.3, 1.20.3
> Reporter: ari
> Priority: Critical
>
> h1. Summary
> When the adaptive scheduler is enabled, TaskInformation blob files accumulate on
> persistent storage without proper cleanup, eventually leading to storage
> exhaustion and job stalling.
> h1. Problem Details
> On each job deployment and restart (including JM failures),
> {{ExecutionJobVertex.getTaskInformationOrBlobKey()}} reconstructs
> TaskInformation objects from the JobGraph. These TaskInformation objects can
> be extremely large (>200MB) as they can contain serialized UDFs within the
> task configuration.
> Specifically, the Configuration field in TaskInformation includes a
> {{SERIALIZED_UDF}} entry.
> When these objects exceed the {{blob.offload-minsize}} configuration, they
> are offloaded as permanent blobs to avoid hitting RPC framesize limits.
> However:
> # Blob keys are not reused across failures - each restart creates a new blob
> with a different key (same content hash)
> # No cleanup mechanism until global termination - Permanent blobs are only
> cleaned up when the job reaches a globally terminated state (a state that
> doesn’t get reached during internal restarts)
> # JobJar blobs ARE reused - In contrast, job JAR blobs stored in the
> JobGraph have their keys persisted and are correctly reused
>
> h1. Impact
> * Storage exhaustion in the storage directory (a particular problem for the
> high-availability storage directory, which may have hard storage limits)
> * Job stalling once the storage limit is reached: on the next restart the
> blob can't be offloaded, so it is sent over RPC instead, where it hits the
> framesize limit, and checkpoints never trigger.
> * Particularly severe with
> * Complex streaming jobs with large/many serialized UDFs in task config
> * Frequent TM failures requiring restarts
> * High parallelism (each parallelized vertex creates its own TaskInformation
> blob)
> h1. Reproduction Steps
> # Enable adaptive scheduler
> # Set {{blob.offload-minsize: 0}} (forces all TaskInformation objects to be
> offloaded)
> # Run {{kubectl delete pod \{task-manager-pod-name}}} to trigger job restart
> # Wait for job to restart and process records
> # {{kubectl exec -it \{job-manager-pod-name} -- bash}}
> # cd to blobstore directory and run {{ls && stat *}}
> # Observe: Every file except the job JAR blob is duplicated after each
> restart
> h1. Expected vs Actual Behavior
> Expected: On a restart, if the content hash is unchanged, reuse the
> previously created TaskInformation object. (Since FLINK-7140, however, it
> seems that a random component is added to the blob key to prevent hash
> collisions.) Otherwise, delete the blobs that are no longer needed and then
> generate the new one.
> Actual: New TaskInformation blobs are created on every restart, nothing is
> cleaned up until the job reaches a globally terminated state, and blobs
> accumulate without bound over time.