Samrat002 commented on PR #6: URL: https://github.com/apache/flink-connector-redis-streams/pull/6#issuecomment-4413434586
@rmetzger thanks for pushing on this. You've hit the design choice I should have justified upfront in the PR. Let me walk through how I arrived at XREADGROUP.

When this project started, I prototyped exactly what you're describing: XREAD-based, offsets in Flink state, no consumer groups. The SplitReader was simple and to the point, replicating the core design from the flink-connector-kafka source.

Then we walked through production failure scenarios with our SRE team, and the picture changed. By the end, the design had shifted based on real operational concerns. Here are the scenarios that drove it.

## Scenario 1: the 3am page

"If this pipeline gets stuck at 3am, what's the first thing you do?"

Most Redis users will reach straight for `XINFO GROUPS` and `XPENDING`. With the XREAD approach, those answers are not available:

1. The Redis broker has no idea anyone is reading.
2. Questions like "is the consumer keeping up?" or "which message is the poison pill?" become a Flink-internals investigation.

With XREADGROUP, each is a one-line `redis-cli` command. `XPENDING events flink-cg IDLE 60000 - + 10` returns stuck IDs, the owning consumer, idle time, and delivery count. Server-side poison-pill detection comes for free.

Teams who pick Redis Streams over Kafka often do so because they want the in-memory performance of Redis. But Redis Streams are bounded by `MAXLEN`. A job recovering from a checkpoint may find that the stream has been truncated past its last offset by the time it tries to resume. The consumer group's `last-delivered-id` and PEL hold the broker's view of what was delivered, which gives recovery a fighting chance in that window.

## Scenario 2: a TaskManager crashes during a heavy batch

We checkpoint every 30 seconds to 5 minutes in production, depending on the use case. The source reads at around 50K msg/sec.

XREAD prototype, worst case at a 30s checkpoint interval: a TM crash mid-batch re-delivers around 30s × 50K = **1.5M duplicates**.
Our sink dedupes against a stateful KV store, so this spikes dedup state, store latency, and downstream API rate limits on every TM hiccup.

XREADGROUP: re-delivery is bounded by `maxDeferredAckQueueSize` (default 10K). Same failure, roughly 150x less re-delivery (1.5M vs 10K). The PEL is doing what Salvatore designed it to do: bounding the consequences of consumer failure to *what was actually in flight*, not *what could have been read since the last commit*.

## Scenario 3: migration and upgrades

Someone runs `flink cancel` instead of `flink stop --savepoint`. State is gone.

XREAD prototype: nothing to read the position from. EARLIEST replays everything in retention. LATEST silently drops the gap. Both are bad.

XREADGROUP: the consumer group still exists on the broker. `last-delivered-id` and the PEL are intact. A new job under the same group name drains the PEL first, then resumes from `last-delivered-id`. Zero data loss, zero re-replay.

This isn't theoretical. Savepoint corruption, RocksDB checksum failures, and accidental cancels all happen on real on-call rotations. The PEL is a second source of truth that survives them.

## Scenario 4: where XREADGROUP loses, which is autoscaling

I want to be honest about this one. The effective consumer name is `<consumerName>-<subtaskIndex>`. Going from 4 to 8 subtasks creates new identities, and in-flight PEL entries owned by the old subtasks can end up orphaned. XREAD has no equivalent issue.

The fix on the XREADGROUP side is `XAUTOCLAIM` on subtask startup, scoped by an idle-time threshold. It is straightforward, but it lives in the follow-up. I'm not going to pretend it's already wired up. It's the legitimate operational footgun and you're right to flag it.

What I weighed: this is a known, bounded, fixable problem. The XREADGROUP-side wins are intrinsic to the model and can't be added to an XREAD design later. So I shipped XREADGROUP and accepted the XAUTOCLAIM follow-up.
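To make the follow-up concrete, here is a minimal sketch of what the startup claim could look like. It is not the code in this PR. It only builds the `XAUTOCLAIM` arguments a subtask would send and does not talk to Redis. The class name, method names, the base consumer name `flink-consumer`, the 60-second idle threshold, and the batch size of 100 are all assumptions for illustration; `events` and `flink-cg` reuse the names from the `XPENDING` example in Scenario 1.

```java
import java.util.List;

// Hypothetical sketch: names and defaults here are illustrative, not the connector's API.
final class OrphanedPelClaimSketch {

    private OrphanedPelClaimSketch() {}

    /** Effective consumer name in the form <consumerName>-<subtaskIndex>. */
    public static String effectiveConsumerName(String consumerName, int subtaskIndex) {
        if (subtaskIndex < 0) {
            throw new IllegalArgumentException("subtaskIndex must be >= 0");
        }
        return consumerName + "-" + subtaskIndex;
    }

    /**
     * Builds the arguments for:
     *   XAUTOCLAIM key group consumer min-idle-time start COUNT count
     * Only entries idle for at least minIdleMillis are transferred to the
     * calling consumer, which is what scopes the claim to orphaned entries.
     */
    public static List<String> xautoclaimArgs(
            String stream, String group, String consumer,
            long minIdleMillis, String cursor, int count) {
        return List.of(
                "XAUTOCLAIM", stream, group, consumer,
                Long.toString(minIdleMillis), cursor,
                "COUNT", Integer.toString(count));
    }

    public static void main(String[] args) {
        // Subtask 5 exists only after scaling from 4 to 8 subtasks.
        String consumer = effectiveConsumerName("flink-consumer", 5);
        // Start scanning the PEL at cursor 0-0. The reply carries the next
        // cursor; a real implementation would repeat until it returns 0-0.
        List<String> command =
                xautoclaimArgs("events", "flink-cg", consumer, 60_000L, "0-0", 100);
        System.out.println(String.join(" ", command));
    }
}
```

The idle-time threshold is the important knob: set it too low and a live but slow subtask loses entries it is still processing, set it too high and orphaned entries sit in the PEL longer after a rescale.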
## Scenario 5: MAXLEN trimming, where both lose equally

Both models have the same data-loss failure mode under heavy trimming. XREADGROUP gives you forensic visibility (`XPENDING` shows the IDs of entries delivered before the trim) but not recovery. The PEL is a delivery-tracking ledger, not a magic durability shield.

## On the complexity tax

The "770-line SplitReader" decomposes more favourably than it sounds:

- **Deferred ACK and per-checkpoint snapshots (~150 lines)**: required for at-least-once even with XREAD.
- **Circuit breaker, reconnect, backoff (~150 lines)**: required regardless.
- **PEL recovery (~100 lines)**: replaces what XREAD would need anyway, namely offset persistence and restart resume.
- **NOGROUP/BUSYGROUP handling, group init, consumer naming (~150 lines)**: genuinely XREADGROUP-only.

Real cost of XREADGROUP is around 150 lines, not 500.

## On an XREAD opt-in

Reasonable as a v1.1 follow-up: `setConsumptionMode(NO_GROUP | CONSUMER_GROUP)`, defaulting to `CONSUMER_GROUP`, with XREAD as a separate `RedisStreamsXReadSplitReader`. The lifecycles are different enough that sharing code would muddy both.

I'd push back on flipping the default. The scenarios above are the ones that drive teams to Redis Streams over Kafka in the first place. An XREAD default would optimise for the case where Redis is being used as in-memory Kafka, which isn't the majority case for teams who consciously chose Redis Streams.

cc: @Poorvankbhatia (for visibility)

Please chime in if I've missed something in the design thought process.
