Samrat002 commented on PR #6:
URL: 
https://github.com/apache/flink-connector-redis-streams/pull/6#issuecomment-4413434586

   @rmetzger thanks for pushing on this. You've hit the design choice I should 
have justified upfront in the PR. 
   Let me walk through how I arrived at XREADGROUP.
   
   When this project started, I prototyped exactly what you're describing. 
XREAD-based, offsets in Flink state, no consumer groups.
   The SplitReader was simple and to the point, replicating the core design 
from the flink-connector-kafka source.
   
   Then we walked through production failure scenarios with our SRE team, and 
the picture changed. 
   By the end, the design had shifted based on real operational concerns. Here 
are the scenarios that drove it.
   
   ## Scenario 1: the 3am page
   
   "If this pipeline gets stuck at 3am, what's the first thing you do?" Most 
Redis users will reach straight for `XINFO GROUPS` and `XPENDING`.
   With the XREAD approach, neither command has anything to report:
   
   1. The Redis broker has no idea anyone is reading.
   2. Questions like "is the consumer keeping up?" or "which message is 
poisoning?" become a Flink-internals investigation.
   
   With XREADGROUP, each is a one-line `redis-cli` command. For example, 
`XPENDING events flink-cg IDLE 60000 - + 10` returns the stuck IDs, the owning 
consumer, idle time, and delivery count. Server-side poison-pill detection 
comes for free.
   
   Teams who pick Redis Streams over Kafka often do so because they want the 
in-memory performance of Redis. But Redis Streams are usually capped with 
`MAXLEN`. 
   A job recovering from a checkpoint may find that the stream has been 
trimmed past its last offset by the time it tries to resume. 
   The consumer group keeps the server's view of the last delivered ID, and the 
PEL records what was delivered but never acknowledged, which gives recovery 
a fighting chance in that window.
   
   ## Scenario 2: a TaskManager crashes during a heavy batch
   
   We checkpoint every 30 seconds to 5 minutes in production, depending on the 
use case. The source reads at around 50K msg/sec.
   
   XREAD prototype, worst case: a TM crash mid-batch re-delivers around 30s × 
50K = **1.5M duplicates**. Our sink dedupes against a stateful KV store, so 
this spikes dedup state, store latency, and downstream API rate limits on every 
TM hiccup.
   
   XREADGROUP: re-delivery is bounded by `maxDeferredAckQueueSize` (default 
10K). 
   Same failure, roughly 150x less re-delivery. The PEL is doing what Salvatore 
designed it to do: bounding the consequences of consumer failure to *what was 
actually in flight*, not *what could have been read since the last commit*.
   
   ## Scenario 3: migration and upgrades
   
   Someone runs `flink cancel` instead of `flink stop --savepoint`. State is 
gone.
   
   XREAD prototype: there is nowhere to read the position from. EARLIEST 
replays everything in retention. LATEST silently drops the gap. Both are bad.
   
   XREADGROUP: the consumer group still exists on the broker. 
`last-delivered-id` and the PEL are intact.
   A new job under the same group name drains the PEL first, then resumes from 
`last-delivered-id`. Zero data loss, zero replay.
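
   The recovery order can be sketched with a toy in-memory model. This is not 
a Redis client and not the connector's code: entry IDs are plain longs, the 
PEL is collapsed to a single consumer, and every name is illustrative. 
`readPending` stands in for `XREADGROUP` with ID `0` (this consumer's pending 
entries) and `readNew` for ID `>` (entries never delivered to the group).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Toy model of one consumer group with a single consumer. */
public final class GroupRecoverySketch {
    private final List<Long> stream = new ArrayList<>(); // entry IDs, ascending
    private final TreeSet<Long> pel = new TreeSet<>();   // delivered, not yet ACKed
    private long lastDeliveredId = 0;                    // server-side group cursor

    public void append(long id) { stream.add(id); }

    /** Models XREADGROUP ... ">": deliver new entries and record them in the PEL. */
    public List<Long> readNew(int count) {
        List<Long> out = new ArrayList<>();
        for (long id : stream) {
            if (id > lastDeliveredId && out.size() < count) {
                out.add(id);
                pel.add(id);
                lastDeliveredId = id;
            }
        }
        return out;
    }

    /** Models XREADGROUP ... "0": re-read entries that are still pending. */
    public List<Long> readPending() { return new ArrayList<>(pel); }

    /** Models XACK. */
    public void ack(long id) { pel.remove(id); }

    /** Recovery order from the text: pending entries first, then new ones. */
    public List<Long> recover(int count) {
        List<Long> out = readPending();
        out.addAll(readNew(count));
        return out;
    }

    public static void main(String[] args) {
        GroupRecoverySketch group = new GroupRecoverySketch();
        for (long id = 1; id <= 8; id++) group.append(id);
        group.readNew(5);                          // old job reads 1..5
        group.ack(1); group.ack(2); group.ack(3);  // ACKs 1..3, then is cancelled
        System.out.println(group.recover(10));     // prints [4, 5, 6, 7, 8]
    }
}
```

   Entries 4 and 5 come back from the PEL even though no Flink state survived, 
and 6 to 8 follow from the group cursor, so nothing is skipped and 1 to 3 are 
not replayed.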
   
   This isn't theoretical. Savepoint corruption, RocksDB checksum failures, 
accidental cancels all happen on real on-call rotations. 
   The PEL is a second source of truth that survives them.
   
   ## Scenario 4: autoscaling, where XREADGROUP loses
   
   I want to be honest about this one. The effective consumer name is 
`<consumerName>-<subtaskIndex>`. 
   Going from 4 to 8 subtasks creates new identities, and in-flight PEL entries 
owned by the old subtasks can end up orphaned.
   XREAD has no equivalent issue.
   
   The fix on the XREADGROUP side is `XAUTOCLAIM` on subtask startup, scoped by 
idle-time threshold.
   Straightforward, but deferred to the follow-up. I'm not going to pretend 
it's already wired up. 
   It's a legitimate operational footgun and you're right to flag it.
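
   A rough sketch of what "scoped by idle-time threshold" could mean on subtask 
startup, written as a pure in-memory model rather than a real `XAUTOCLAIM` 
call (the command itself needs Redis 6.2 or later). The `Pending` type, the 
method names, and the inclusive threshold are assumptions made for 
illustration; only the `<consumerName>-<subtaskIndex>` naming comes from the 
description above.

```java
import java.util.ArrayList;
import java.util.List;

public final class AutoClaimSketch {
    /** One PEL row, mirroring the fields XPENDING reports. */
    public static final class Pending {
        final String id;
        final String owner;
        final long idleMs;

        public Pending(String id, String owner, long idleMs) {
            this.id = id;
            this.owner = owner;
            this.idleMs = idleMs;
        }
    }

    /** Effective consumer name: <consumerName>-<subtaskIndex>. */
    public static String effectiveName(String consumerName, int subtaskIndex) {
        return consumerName + "-" + subtaskIndex;
    }

    /**
     * IDs a starting subtask would take over. Like XAUTOCLAIM, the filter looks
     * at idle time only, not at who currently owns the entry.
     */
    public static List<String> claimable(List<Pending> pel, long minIdleMs) {
        List<String> ids = new ArrayList<>();
        for (Pending p : pel) {
            if (p.idleMs >= minIdleMs) {
                ids.add(p.id);
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        List<Pending> pel = new ArrayList<>();
        pel.add(new Pending("1-0", effectiveName("flink", 0), 120_000)); // stale
        pel.add(new Pending("2-0", effectiveName("flink", 1), 500));     // in flight
        pel.add(new Pending("3-0", effectiveName("flink", 3), 90_000));  // stale
        System.out.println(effectiveName("flink", 5) + " would claim "
                + claimable(pel, 60_000));
    }
}
```

   The threshold matters: set too low, a starting subtask would steal entries 
that a healthy peer is still processing; set too high, orphaned entries sit in 
the PEL for longer after a rescale.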
   
   What I weighed: this is a known, bounded, fixable problem. 
   The XREADGROUP-side wins are intrinsic to the consumer-group model and can't 
be added to an XREAD design later. 
   So I shipped XREADGROUP and accepted the XAUTOCLAIM follow-up.
   
   ## Scenario 5: MAXLEN trimming, where both lose equally
   
   Both models have the same data-loss failure mode under heavy trimming. 
   XREADGROUP gives you forensic visibility (`XPENDING` shows IDs of entries 
delivered before trim) but not recovery. 
   The PEL is a delivery-tracking ledger, not a magic durability shield.
   
   ## On the complexity tax
   
   The "770-line SplitReader" decomposes more favourably than it sounds:
   
   - **Deferred ACK and per-checkpoint snapshots (~150 lines)**: required for 
at-least-once even with XREAD.
   - **Circuit breaker, reconnect, backoff (~150 lines)**: required regardless.
   - **PEL recovery (~100 lines)**: replaces what XREAD would need anyway, 
namely offset persistence and restart resume.
   - **NOGROUP/BUSYGROUP handling, group init, consumer naming (~150 lines)**: 
genuinely XREADGROUP-only.
   
   The real cost of XREADGROUP is around 150 lines, not 500.
   
   ## On an XREAD opt-in
   
   Reasonable as a v1.1 follow-up: `setConsumptionMode(NO_GROUP | 
CONSUMER_GROUP)`, defaulting to `CONSUMER_GROUP`, with XREAD as a separate 
`RedisStreamsXReadSplitReader`. The lifecycles are different enough that 
sharing code would muddy both.
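
   A sketch of how that opt-in could look. Only `setConsumptionMode`, the two 
mode names, the `CONSUMER_GROUP` default, and `RedisStreamsXReadSplitReader` 
come from the proposal above; the builder class, `readerName`, and the label 
for the existing reader are hypothetical.

```java
import java.util.Objects;

public final class ConsumptionModeSketch {
    /** The two modes named in the proposal. */
    public enum ConsumptionMode { NO_GROUP, CONSUMER_GROUP }

    /** Hypothetical builder; only setConsumptionMode comes from the proposal. */
    public static final class Builder {
        // Proposed default: keep consumer groups unless the user opts out.
        private ConsumptionMode mode = ConsumptionMode.CONSUMER_GROUP;

        public Builder setConsumptionMode(ConsumptionMode mode) {
            this.mode = Objects.requireNonNull(mode, "mode");
            return this;
        }

        /** Which reader the source would instantiate, returned as a label only. */
        public String readerName() {
            return mode == ConsumptionMode.NO_GROUP
                    ? "RedisStreamsXReadSplitReader"
                    : "existing XREADGROUP split reader";
        }
    }

    public static void main(String[] args) {
        System.out.println(new Builder().readerName());
        System.out.println(new Builder()
                .setConsumptionMode(ConsumptionMode.NO_GROUP)
                .readerName());
    }
}
```

   Selecting the reader once at build time keeps the two lifecycles in separate 
classes, which matches the point above about not sharing code between them.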
   
   I'd push back on flipping the default. The scenarios above are the ones that 
drive teams to Redis Streams over Kafka in the first place. 
   An XREAD-default would optimise for the case where Redis is being used as 
in-memory Kafka, which isn't the majority case for teams who consciously chose 
Redis Streams.
   
   
   cc: @Poorvankbhatia (for visibility). Please chime in if I've missed 
anything from the design thought process. 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

