Hi Pedro,

Glad it helped.
A couple of quick hints while you implement:

1) Configurable padding + N manifests

- Add two knobs (defaults shown):

   - stateStore.rocksdb.gc.paddingMs = 120000 (HDFS: 60–120s; S3/GCS: 120–300s)
   - stateStore.rocksdb.gc.protectedVersions = 3 (union of last N manifests)

- Only delete a candidate if:

   mtime(candidate) + paddingMs < min(mtime(referenced))  (or < now - paddingMs)

   See the sketch below.
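
To make that concrete, here is a minimal sketch in Scala. It is not the
actual RocksDBFileManager code: the DfsFile/Manifest shapes and helper
names are illustrative assumptions, and the two knobs are the proposed
config names above, not existing Spark confs.

// Hypothetical shapes; the real RocksDBFileManager types differ.
case class DfsFile(name: String, mtimeMs: Long)
case class Manifest(version: Long, referencedSst: Set[String])

object GcPaddingSketch {
  val paddingMs: Long = 120000L    // proposed stateStore.rocksdb.gc.paddingMs
  val protectedVersions: Int = 3   // proposed stateStore.rocksdb.gc.protectedVersions

  /** Candidates safe to delete: not referenced by any of the last N
   *  manifests, and older than every referenced file by at least paddingMs. */
  def deletable(allSst: Seq[DfsFile], manifests: Seq[Manifest]): Seq[DfsFile] = {
    // Protected set = union of files referenced by manifests V, V-1, ..., V-(N-1)
    val protectedSet: Set[String] =
      manifests.sortBy(-_.version).take(protectedVersions)
        .flatMap(_.referencedSst).toSet

    val referencedMtimes =
      allSst.filter(f => protectedSet.contains(f.name)).map(_.mtimeMs)
    if (referencedMtimes.isEmpty) return Seq.empty  // nothing tracked: delete nothing

    val oldestReferenced = referencedMtimes.min
    allSst.filter { f =>
      !protectedSet.contains(f.name) &&
        f.mtimeMs + paddingMs < oldestReferenced   // padded "older than" rule
    }
  }
}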

2) Final recheck before delete

   - Just before deletion, re-read the latest V..V-(N-1) manifests and drop
   any candidate that appears there. This closes the race; see the sketch
   below.
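
A sketch of the recheck, again with a hypothetical readManifest helper
standing in for re-reading a version's file list (e.g. from V.zip) on DFS:

object FinalRecheckSketch {
  // Hypothetical helper: re-reads the set of SST files referenced by one
  // manifest version (e.g. the file list inside <version>.zip) from DFS.
  def readManifest(version: Long): Set[String] = ???

  /** Just before deleting, drop any candidate that a freshly re-read
   *  manifest still references; this closes the window where another
   *  executor won the race to write the same version's zip. */
  def recheck(candidates: Seq[String], latest: Long, n: Int): Seq[String] = {
    val stillReferenced: Set[String] =
      (0 until n).flatMap(i => readManifest(latest - i)).toSet
    candidates.filterNot(stillReferenced.contains)
  }
}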

HTH

Dr Mich Talebzadeh,
Architect | Data Science | Financial Crime | Forensic Analysis | GDPR

   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>





On Mon, 25 Aug 2025 at 19:29, Pedro Miguel Duarte <pmd...@gmail.com> wrote:

> Thanks for your reply!
>
> Yes, this helps. I think adding a time padding will help prevent deleting
> files that are incorrectly labeled as orphaned in the current
> implementation. This only happens if two executors run maintenance at
> nearly the exact same time. I'll look into implementing a fix.
>
> On Mon, Aug 25, 2025 at 12:46 PM Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> In your statement:
>>
>> "Instead of simply older, should there be some padding to allow for
>> maintenance being executed simultaneously on two executors? Something like
>> at least 60s older than the oldest tracked file."
>>
>> What you need to do is add a time padding before deleting orphans, which
>> is a good solution for concurrent maintenance.
>>
>> For your RocksDB state-store race, still apply the safety measures:
>>
>>    - Add a time padding window before deleting “orphans” (e.g., 60–120s;
>>    padding is cheap insurance).
>>    - Consider the union of the last N manifests (N ≥ 2–3) when deciding
>>    deletions. When deciding which .sst files are “orphans” (i.e. safe to
>>    delete), don’t look only at the latest snapshot/manifest (e.g., V.zip).
>>    Instead, build a protected set of files by taking the union of the files
>>    referenced by the last N manifests (e.g., versions V, V-1, …, V-(N-1)),
>>    with N ≥ 2 (and often 3 on object stores). Only delete files not in that
>>    union. As a final sanity check, re-read the latest manifest just before
>>    deleting.
>>
>> HTH
>>
>> Dr Mich Talebzadeh,
>> Architect | Data Science | Financial Crime | Forensic Analysis | GDPR
>>
>>
>>    view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>
>>
>>
>> On Mon, 18 Aug 2025 at 17:42, Pedro Miguel Duarte <pmd...@gmail.com>
>> wrote:
>>
>>> Hello structured streaming experts!
>>>
>>> We are getting SST FileNotFound state store corruption errors.
>>>
>>> The root cause is a race condition where two different executors are
>>> doing cleanup of the state store at the same time.  Both write the version
>>> of the state zip file to DFS.
>>>
>>> The first executor enters maintenance, writes SST files and writes the
>>> 636.zip file.
>>>
>>> Concurrently the second executor enters maintenance, writes SST files
>>> and writes 636.zip.
>>>
>>> The SST files in dfs are written almost simultaneously and are:
>>>    - 000867-4695ff6e-d69d-4792-bcd6-191b57eadb9d.sst   <-- from one
>>> executor
>>>    - 000867-a71cb8b2-9ed8-4ec1-82e2-a406dd1fb949.sst   <-- from other
>>> executor
>>>
>>> The problem occurs during orphan file deletion (see this PR
>>> <https://github.com/apache/spark/pull/39897/files>). The executor that
>>> lost the race to write 636.zip decides that it will delete the SST file
>>> that is actually referenced in 636.zip.
>>>
>>> Currently the maintenance task does have some protection for files of
>>> ongoing tasks. This is from the comments in RocksDBFileManager: "only
>>> delete orphan files that are older than all tracked files when there are at
>>> least 2 versions".
>>>
>>> In this case the file that is being deleted is indeed older but only by
>>> a small amount of time.
>>>
>>> Instead of simply older, should there be some padding to allow for
>>> maintenance being executed simultaneously on two executors?  Something like
>>> at least 60s older than the oldest tracked file.
>>>
>>> This should help avoid state store corruption at the expense of some
>>> storage space in the DFS.
>>>
>>> Thanks for any help or recommendations here.
>>>
>>
