Hi all,

We hit a tiered-storage data-loss case on 3.9.0 and, after digging in,
believe it's a Kafka-core bug that still reproduces on trunk. Before
filing, I'd like to confirm it isn't already tracked and check whether it
should be a new JIRA or folded into an existing one. (My Jira account
request is pending, hence raising here first.)

What we observed: a routine rolling restart (config-only change) of a
3-broker cluster moved leadership on 6 of 12 partitions. On each of those
6, in-retention data was deleted from BOTH the remote tier and local disk
on all replicas, so a consumer can no longer rewind below the new
log-start offset. On every affected partition the new logStartOffset ==
highestCopiedRemoteOffset + 1 (the whole copied window was expired). The
topic sat at its retention.bytes cap (6 GiB) with local.retention unset
(-2, so local mirrored the full window).

Diagnosis (core, not the RSM plugin):

1. UnifiedLog.highestOffsetInRemoteStorage is an in-memory field
   initialised to -1, seeded only by RLMCopyTask.maybeUpdateCopiedOffset /
   RLMFollowerTask. RLMExpirationTask never seeds it and has no guard for
   an unseeded value.
2. On a freshly-elected leader the copy and expiration tasks start
   concurrently (separate thread pools, both scheduled with initialDelay
   0). If expiration runs first, highestOffsetInRemoteStorage is still -1.
3. UnifiedLog.onlyLocalLogSegmentsSize() keeps only segments with
   baseOffset >= highestOffsetInRemoteStorage(), i.e. >= -1, so it returns
   the ENTIRE local log -- including segments already copied to remote but
   not yet locally evicted.
4. RemoteLogManager.buildRetentionSizeData then computes totalSize =
   onlyLocalLogSegmentsSize + remoteLogSizeBytes, double-counting that
   overlap (~2x). At the cap this trips a false breach and expires the
   entire copied remote tier; the resulting logStartOffset jump then
   cascades to local-segment deletion on every replica.

Observed broker logs (redacted; offsets/bytes are the real values). The
partition's true unique size was ~6.5 GB (~= the 6 GiB cap), so the
"12,240,501,067" figure below can only be local+remote counted together --
the double-count, in the broker's own words:

config in effect (no local.retention.*):
14:49:54 broker-0 DynamicConfigPublisher Updating topic ...
remote.storage.enable -> true, retention.bytes -> 6442450944, retention.ms
-> 10368000000, segment.bytes -> 268435456

remote tier already fully copied for -2:
14:50:03 broker-0 RLMExpirationTask [partition=-2] Found the highest
copiedRemoteOffset: Optional[(offset=23741617, leaderEpoch=123)]

(1) the breach, with the inflated size (~12.2 GB vs ~6.5 GB real, cap 6
GiB):
14:50:10 broker-0 RLMExpirationTask [partition=-2] About to delete remote
log segment ... due to retention size 6442450944 breach. Log size after
deletion will be 12240501067. (repeated for all 9 copied segments, counting
down)

(2) remote objects physically deleted, then logStartOffset advanced:
14:50:11 broker-0 RemoteStorageManager Deleting log segment data ...
completed successfully
14:50:11 broker-0 UnifiedLog [partition=-2] Incremented log start offset to
23741618 due to segment deletion

(3) followers replicate the watermark:
14:50:11 broker-2 UnifiedLog [partition=-2] Incremented log start offset to
23741618 due to leader offset increment

(4) local segments below the watermark deleted on every replica --
in-retention data, both tiers gone:
14:50:21 broker-0 UnifiedLog [partition=-2] Deleting segments due to local
log start offset 23741618 breach: LogSegment(baseOffset=22069718,
size=626073015, ...), LogSegment(baseOffset=22194681, size=1073699884,
...), LogSegment(baseOffset=22708935, size=722628195, ...), ...

The deleted local baseOffsets (22069718, ...) are exactly the offset ranges
that were also removed from remote -- the same data gone from both tiers.
No crash/unclean-shutdown/recovery was involved (clean controlled shutdown,
restartCount 0); the decoded __remote_log_metadata shows clean, contiguous,
non-overlapping segments (no orphans/duplicates).
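
As a quick sanity check on the figures in these logs, the arithmetic can be
reproduced in a few lines of Python. This is only a sketch: the constants
are copied from the config and log lines above, and the variable names are
mine, not anything from Kafka.

```python
GIB = 1024 ** 3
MIB = 1024 ** 2
MS_PER_DAY = 24 * 60 * 60 * 1000

# Values copied from the DynamicConfigPublisher line above.
retention_bytes = 6442450944
retention_ms = 10368000000
segment_bytes = 268435456

# Value copied from log (1): "Log size after deletion will be 12240501067".
size_after_deletion = 12240501067

cap_in_gib = retention_bytes / GIB            # retention.bytes expressed in GiB
retention_days = retention_ms / MS_PER_DAY    # retention.ms expressed in days
segment_mib = segment_bytes / MIB             # segment.bytes expressed in MiB

# How far above the cap the broker believed the log still was.
excess = size_after_deletion - retention_bytes
ratio = size_after_deletion / retention_bytes

print(f"cap = {cap_in_gib} GiB, retention = {retention_days} days, "
      f"segment = {segment_mib} MiB")
print(f"logged size is {ratio:.2f}x the cap ({excess} bytes over)")
```

The logged size comes out at roughly 1.9x the cap, which is what one would
expect if the copied window were counted once as local and once as remote.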

Notably, RLMFollowerTask already seeds the offset with the comment "so that
the local log segments are not deleted before they are copied to remote
storage" -- the follower path was guarded against exactly this -1 hazard,
but the leader/expiration path that actually runs size-retention was not.
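
To make the double-count concrete, here is a minimal standalone model in
Python. It is not Kafka code: the Segment class, the function names, and the
byte sizes are hypothetical. Only the >= filter and the local + remote sum
are taken from the diagnosis above.

```python
from dataclasses import dataclass

UNSEEDED = -1  # initial value of highestOffsetInRemoteStorage, per the diagnosis


@dataclass(frozen=True)
class Segment:
    base_offset: int
    size_bytes: int


def only_local_segments_size(local_segments, highest_offset_in_remote):
    """Sum the segments whose base offset is >= the highest remote offset."""
    return sum(s.size_bytes for s in local_segments
               if s.base_offset >= highest_offset_in_remote)


def total_size(local_segments, remote_size_bytes, highest_offset_in_remote):
    """Size-retention input: local-only bytes plus remote bytes."""
    local_only = only_local_segments_size(local_segments, highest_offset_in_remote)
    return local_only + remote_size_bytes


# Hypothetical partition: three 100-byte segments already copied to remote
# and still on local disk, plus a 40-byte active segment that is local only.
local = [Segment(0, 100), Segment(100, 100), Segment(200, 100), Segment(300, 40)]
remote_bytes = 300     # the three copied segments
highest_copied = 299   # end offset of the last copied segment

seeded = total_size(local, remote_bytes, highest_copied)
unseeded = total_size(local, remote_bytes, UNSEEDED)
print(seeded, unseeded)
```

With the offset seeded, only the active segment counts as local and the
total equals the true unique size (340 bytes in this toy example). With -1,
every local segment passes the filter and the copied 300 bytes are counted
twice.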

Scope / notes:

- Size-retention only. retention.ms is unaffected (buildRetentionTimeData
  uses only segment timestamps). retention.bytes = -1 makes a topic immune
  (buildRetentionSizeData returns early when retentionSize < 0).
- Affected: 3.9.0 -> 4.3.0 -> trunk. The trunk rework of
  buildRetentionSizeData (the fullCopyFinishedSegmentsSizeInBytes
  early-exit gate) does not fix it: an inflated onlyLocalLogSegmentsSize
  clears the gate and inflates the final sum.
- Minimal repro (fails on trunk): two tests in RemoteLogManagerTest with
  identical remote metadata and retention.bytes, differing only in
  onlyLocalLogSegmentsSize() (the value the real method returns when
  highestOffsetInRemoteStorage is 0 vs -1). The seeded case deletes
  nothing; the unseeded case deletes both copied segments and advances
  logStartOffset to 200. Diff and the captured failure (no checkout
  needed):

diff vs trunk:
https://github.com/apache/kafka/compare/trunk...horkyada:tiered-size-retention-unseeded-highest-offset-repro
branch:
https://github.com/horkyada/kafka/tree/tiered-size-retention-unseeded-highest-offset-repro

$ ./gradlew :storage:test --tests "...RemoteLogManagerTest.testSizeRetention*"
RemoteLogManagerTest > testSizeRetentionAtCapDeletesNothingWhenHighestRemoteOffsetSeeded() PASSED
RemoteLogManagerTest > testSizeRetentionDoesNotOverDeleteWhenHighestRemoteOffsetUnseeded() FAILED
    org.mockito.exceptions.verification.NeverWantedButInvoked:
    remoteStorageManager.deleteLogSegmentData();
    But invoked here:
    -> RemoteLogManager.deleteRemoteLogSegment(RemoteLogManager.java:1695)
       [startOffset=0, endOffset=99, state=COPY_SEGMENT_FINISHED]
    -> RemoteLogManager.deleteRemoteLogSegment(RemoteLogManager.java:1695)
       [startOffset=100, endOffset=199, state=COPY_SEGMENT_FINISHED]
2 tests completed, 1 failed
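
For readers who don't want to open the branch, the shape of the two test
cases can be approximated with a small Python model. This is a
simplification, not the test code: expire_by_size and the byte sizes (1024
per segment, retention.bytes = 2048) are hypothetical, and the loop simply
stops at the first segment that no longer fits in the remaining breach. The
segment offsets (0-99, 100-199) match the failure output above.

```python
def expire_by_size(remote_segments, only_local_size, retention_bytes):
    """Model size-based expiration of remote segments.

    remote_segments: list of (start_offset, end_offset, size_bytes), oldest first.
    Returns (deleted (start, end) pairs, new log start offset or None).
    """
    if retention_bytes < 0:
        return [], None  # retention.bytes = -1 disables size retention

    total = only_local_size + sum(size for _, _, size in remote_segments)
    remaining_breach = total - retention_bytes

    deleted, new_log_start = [], None
    for start, end, size in remote_segments:
        # Stop once the breach is used up or the segment exceeds what is left.
        if remaining_breach <= 0 or size > remaining_breach:
            break
        remaining_breach -= size
        deleted.append((start, end))
        new_log_start = end + 1
    return deleted, new_log_start


remote = [(0, 99, 1024), (100, 199, 1024)]  # hypothetical sizes
retention = 2048                            # hypothetical cap, equal to remote size

# Seeded: the copied segments are not counted as local-only.
print(expire_by_size(remote, only_local_size=0, retention_bytes=retention))
# Unseeded: the whole local log (the same 2048 bytes) is counted again.
print(expire_by_size(remote, only_local_size=2048, retention_bytes=retention))
```

In this model the seeded case deletes nothing, while the unseeded case
deletes both segments and moves the log start offset to 200, mirroring the
PASSED / FAILED pair above.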

Relationship to existing issues (none looks like a match, but related):

- KAFKA-17212 (open): same method, but the >=-vs-> off-by-one
  double-counting a single segment while the offset is correct; its fix
  wouldn't address the -1 whole-log case.
- KAFKA-16711 (fixed): same -1 stale-offset class, different trigger
  (logDir altering), different effect (blocks cleanup).
- KAFKA-20148 (open): RLMExpirationTask data-loss, but triggered by
  disabling remote storage, not the fresh-leader -1 size double-count.

Questions:

1. Is this already known/tracked anywhere I've missed?
2. Does it warrant a new JIRA (data-loss, Critical/Major), or should it
   attach to one of the above?
3. For the fix, would you prefer the expiration path to seed
   highestOffsetInRemoteStorage (mirroring RLMFollowerTask) before
   computing size-retention, or to skip size-retention for a newly-led
   partition while the offset is unset (defer one cycle)? Happy to open a
   PR with a regression test once there's a direction.

I have the full broker-log timeline, the decoded __remote_log_metadata, and
the metric series, and can share whatever's useful.
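
To make the two fix directions easier to compare, here is a rough Python
sketch of each. Everything in it is hypothetical (function names,
parameters, sizes); it models the idea only and says nothing about how the
change would actually be structured in RemoteLogManager.

```python
from typing import Callable, Optional

UNSEEDED = -1


def total_size_seed_first(local_size_from: Callable[[int], int],
                          remote_size_bytes: int,
                          highest_in_memory: int,
                          highest_from_metadata: Optional[int]) -> int:
    """Option A: seed the offset from remote metadata before summing."""
    if highest_in_memory == UNSEEDED and highest_from_metadata is not None:
        highest_in_memory = highest_from_metadata
    return local_size_from(highest_in_memory) + remote_size_bytes


def total_size_or_defer(local_size_from: Callable[[int], int],
                        remote_size_bytes: int,
                        highest_in_memory: int) -> Optional[int]:
    """Option B: return None (skip this cycle) while the offset is unseeded
    and remote data exists."""
    if highest_in_memory == UNSEEDED and remote_size_bytes > 0:
        return None
    return local_size_from(highest_in_memory) + remote_size_bytes


# Hypothetical local log as (base_offset, size_bytes); the first three
# segments are also in remote storage (300 bytes, highest copied offset 299).
local = [(0, 100), (100, 100), (200, 100), (300, 40)]


def local_size_from(highest: int) -> int:
    return sum(size for base, size in local if base >= highest)


print(total_size_seed_first(local_size_from, 300, UNSEEDED, 299))
print(total_size_or_defer(local_size_from, 300, UNSEEDED))
print(total_size_or_defer(local_size_from, 300, 299))
```

Option A yields the true unique size straight away; option B yields no
decision until the copy task has seeded the offset. The "Found the highest
copiedRemoteOffset" line in the logs above suggests the expiration task
already has the value needed for option A, though I haven't confirmed that
it is available at the right point in the code.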

Thanks,
Adam Horky
