lnbest0707 opened a new pull request, #223:
URL: https://github.com/apache/flink-connector-kafka/pull/223
Background
----------
DynamicKafkaSource currently composes one KafkaSourceEnumerator per Kafka cluster. In legacy mode, each sub-enumerator assigns partition splits independently using the default KafkaSource owner function (topic hash + partition). That gives per-cluster balance, but can produce global skew when multiple clusters are active, because each cluster balances against the full reader set in isolation. For example, if several clusters carry the same topic name, partition N of every cluster maps to the same reader while other readers stay idle.

This change introduces an optional `GLOBAL` assignment mode while keeping `LEGACY` (per-cluster) as the default for backward compatibility.
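To make the skew concrete, here is a minimal Java sketch of a per-cluster owner rule. It follows the description above (start index from the topic-name hash, then offset by partition), but the exact hash arithmetic and the `LegacyOwnerSketch` class are assumptions for illustration, not a copy of `KafkaSourceEnumerator`. Because the rule never looks at the cluster, the same topic in three clusters puts partition `p` of every cluster on the same reader.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of a legacy-style per-cluster owner rule (topic hash + partition). */
final class LegacyOwnerSketch {
    private LegacyOwnerSketch() {}

    /** Owner depends only on topic name, partition and parallelism, never on the cluster. */
    static int legacyOwner(String topic, int partition, int parallelism) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
        return (startIndex + partition) % parallelism;
    }

    /** Assigns each cluster's partitions independently; returns reader id -> split names. */
    static Map<Integer, List<String>> assignPerCluster(
            List<String> clusters, String topic, int partitions, int parallelism) {
        Map<Integer, List<String>> byReader = new TreeMap<>();
        for (String cluster : clusters) {
            for (int p = 0; p < partitions; p++) {
                int owner = legacyOwner(topic, p, parallelism);
                byReader.computeIfAbsent(owner, k -> new ArrayList<>())
                        .add(cluster + "/" + topic + "/" + p);
            }
        }
        return byReader;
    }
}
```

With three clusters of four `example-topic` partitions at p=7, only four readers receive work (three splits each) and three stay idle, which is the shape of the per-cluster result shown at the end of this description.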
Checkpoint model (what is stored where)
---------------------------------------
JobManager / enumerator checkpoint state:
- DynamicKafkaSourceEnumState:
  - the latest Kafka stream metadata (kafkaStreams), i.e. the discovered streams/clusters/topics and the cluster metadata used to rebuild sub-enumerators;
  - per-cluster KafkaSourceEnumState (clusterEnumeratorStates).
- KafkaSourceEnumState stores:
  - the split inventory with assignment status (ASSIGNED / UNASSIGNED) as SplitAndAssignmentStatus(KafkaPartitionSplit, status);
  - the initialDiscoveryFinished flag.
- Even before this change, **the enumerator checkpoint does NOT persist a split -> reader owner mapping**.

TaskManager / reader checkpoint state:
- DynamicKafkaSourceReader snapshots a List<DynamicKafkaSourceSplit> from all sub-readers.
- Each KafkaPartitionSplit snapshot carries the current consumed offset as the split start (via KafkaPartitionSplitState -> KafkaPartitionSplit), so record progress is restored exactly.
- Pending splits (before metadata activation) are also preserved.
- **The reader knows which splits to consume from the checkpoint.**
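The enumerator side of this model can be summarized with a few simplified records. These are hypothetical stand-ins, not the real `DynamicKafkaSourceEnumState` / `KafkaSourceEnumState` classes; the point is that the enumerator stores split status but no reader ID, so after a restore it can only tell which splits still need an owner.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified model of the enumerator checkpoint described above. */
final class CheckpointModelSketch {
    private CheckpointModelSketch() {}

    enum AssignmentStatus { ASSIGNED, UNASSIGNED }

    /** Stand-in for SplitAndAssignmentStatus: a split plus its status, with no owner field. */
    record SplitAndStatus(String splitId, AssignmentStatus status) {}

    /** Stand-in for the per-cluster KafkaSourceEnumState. */
    record ClusterEnumState(List<SplitAndStatus> splits, boolean initialDiscoveryFinished) {}

    /** Stand-in for DynamicKafkaSourceEnumState: stream metadata plus per-cluster state. */
    record DynamicEnumState(Set<String> kafkaStreams, Map<String, ClusterEnumState> clusterStates) {}

    /** Global split ids ("<clusterId>-<splitId>") that still need an owner after restore. */
    static List<String> unassignedAfterRestore(DynamicEnumState state) {
        List<String> result = new ArrayList<>();
        state.clusterStates().forEach((clusterId, clusterState) -> {
            for (SplitAndStatus s : clusterState.splits()) {
                if (s.status() == AssignmentStatus.UNASSIGNED) {
                    result.add(clusterId + "-" + s.splitId());
                }
            }
        });
        return result;
    }
}
```

Splits marked ASSIGNED are left alone on restore because the readers already hold them (with offsets) in their own operator state.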
Global mode design
------------------
New source option:
- `stream-enumerator-mode` = `per-cluster` | `global`
- The default remains `per-cluster`.

Global mode behavior:
- A single `GlobalSplitOwnerAssigner` is shared by all sub-enumerators.
- For each cluster sub-enumerator, DynamicKafkaSourceEnumerator injects a `SplitOwnerSelector` callback into KafkaSourceEnumerator.
- KafkaSourceEnumerator still owns the discovery and pending-assignment mechanics, but the owner choice is delegated to the global selector.
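The wiring can be pictured as one callback instance shared by every sub-enumerator. `SplitOwnerSelector` is the PR's name, but the single-method signature below, `SubEnumeratorSketch` and `SelectorWiringSketch` are hypothetical, and the global selector here is a bare shared counter standing in for the real assigner.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical shape of the owner-selection callback injected into each sub-enumerator. */
@FunctionalInterface
interface SplitOwnerSelector {
    int selectOwner(String clusterId, String splitId, int parallelism);
}

/** Hypothetical sub-enumerator: keeps its own discovery, delegates only the owner choice. */
final class SubEnumeratorSketch {
    private final String clusterId;
    private final SplitOwnerSelector selector;

    SubEnumeratorSketch(String clusterId, SplitOwnerSelector selector) {
        this.clusterId = clusterId;
        this.selector = selector;
    }

    int ownerOf(String splitId, int parallelism) {
        return selector.selectOwner(clusterId, splitId, parallelism);
    }
}

final class SelectorWiringSketch {
    private SelectorWiringSketch() {}

    /** Simplified per-cluster rule: ignores the cluster, so equal split ids collide. */
    static SplitOwnerSelector perCluster() {
        return (clusterId, splitId, parallelism) ->
                (splitId.hashCode() & 0x7FFFFFFF) % parallelism;
    }

    /** Global rule: one cursor shared across all clusters (plain round-robin here). */
    static SplitOwnerSelector global() {
        AtomicInteger assigned = new AtomicInteger();
        return (clusterId, splitId, parallelism) ->
                Math.floorMod(assigned.getAndIncrement(), parallelism);
    }

    /** Creates one sub-enumerator per cluster, all sharing the same selector instance. */
    static List<SubEnumeratorSketch> wire(List<String> clusters, SplitOwnerSelector shared) {
        List<SubEnumeratorSketch> subs = new ArrayList<>();
        for (String cluster : clusters) {
            subs.add(new SubEnumeratorSketch(cluster, shared));
        }
        return subs;
    }
}
```

With three sub-enumerators at p=7, the shared counter sends `example-topic-0` of each cluster to readers 0, 1 and 2, whereas the per-cluster rule sends all three to the same reader.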
Global owner selection:
- Split IDs are normalized as "<clusterId>-<partitionSplitId>" so that ownership is tracked globally rather than per cluster.
- On a new split assignment, the owner is chosen by a global round-robin cursor derived from the number of currently known active splits (`size % parallelism`); the split then becomes known.
- On `addSplitsBack`, each returned split records a preferred-owner hint. If the hint is still valid under the current parallelism it is honored once; otherwise assignment falls back to round-robin.
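A minimal Java sketch of these rules, assuming a single-threaded caller. `GlobalOwnerSketch` and its methods (`assign`, `addSplitsBack`) are hypothetical names, not the API of the PR's `GlobalSplitOwnerAssigner`; only the rules themselves (normalized IDs, `size % parallelism` cursor, one-shot split-back hint) come from the description.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of global owner selection; not the PR's actual class. */
final class GlobalOwnerSketch {
    private final Set<String> activeSplitIds = new LinkedHashSet<>();
    private final Map<String, Integer> preferredOwner = new HashMap<>();

    /** Normalizes a per-cluster split id into a globally unique one. */
    static String globalId(String clusterId, String splitId) {
        return clusterId + "-" + splitId;
    }

    /** Chooses an owner for one split, then records the split as known. */
    int assign(String clusterId, String splitId, int parallelism) {
        String id = globalId(clusterId, splitId);
        Integer hint = preferredOwner.remove(id); // a hint is consumed at most once
        int owner;
        if (hint != null && hint >= 0 && hint < parallelism) {
            owner = hint; // split-back affinity: the subtask still exists
        } else {
            owner = activeSplitIds.size() % parallelism; // global round-robin cursor
        }
        activeSplitIds.add(id);
        return owner;
    }

    /** Records the subtask a returned split ran on, as a one-shot preference. */
    void addSplitsBack(String clusterId, String splitId, int previousOwner) {
        preferredOwner.put(globalId(clusterId, splitId), previousOwner);
    }
}
```

Replaying the clusters in the order local-1, local-0, local-2 (the order consistent with the `before_add` summary in the results below) and then adding local-3 reproduces the reader numbers of the `before_add` and `after_add` summaries. Because the cursor is derived from the active-split count rather than from per-reader load, it never inspects which readers are lightly loaded.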
Interaction with sub-enumerators
--------------------------------
No sub-enumerator lifecycle changes are made:
- Sub-enumerators still discover partitions asynchronously and build pendingPartitionSplitAssignment.
- The only behavioral change is the owner computation (global selector vs. topic-name hash).
- The existing assignPendingPartitionSplits / addReader / addSplitsBack paths are reused.
- This preserves the KafkaSourceEnumerator invariants while enabling cross-cluster balance.
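Conceptually, only the owner function changes while the pending-assignment flow stays the same. The simplified model below is hypothetical (it is not `KafkaSourceEnumerator` code): discovered splits are parked in a per-reader pending map using a pluggable owner function, and a reader receives its splits only once it has registered, mirroring the `addReader` path.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.ToIntBiFunction;

/** Hypothetical, simplified pending-assignment bookkeeping of one sub-enumerator. */
final class PendingAssignmentSketch {
    private final int parallelism;
    private final ToIntBiFunction<String, Integer> ownerFn; // legacy hash or global selector
    private final Map<Integer, List<String>> pending = new HashMap<>();
    private final Set<Integer> registeredReaders = new HashSet<>();
    private final Map<Integer, List<String>> assigned = new HashMap<>();

    PendingAssignmentSketch(int parallelism, ToIntBiFunction<String, Integer> ownerFn) {
        this.parallelism = parallelism;
        this.ownerFn = ownerFn;
    }

    /** Discovery result: only the owner computation is pluggable. */
    void onNewSplits(List<String> splitIds) {
        for (String id : splitIds) {
            int owner = ownerFn.applyAsInt(id, parallelism);
            pending.computeIfAbsent(owner, k -> new ArrayList<>()).add(id);
        }
        assignPending();
    }

    /** Mirrors addReader: a newly registered reader receives its pending splits. */
    void addReader(int subtaskId) {
        registeredReaders.add(subtaskId);
        assignPending();
    }

    Map<Integer, List<String>> assigned() {
        return assigned;
    }

    /** Hands pending splits only to readers that have registered. */
    private void assignPending() {
        for (int reader : registeredReaders) {
            List<String> splits = pending.remove(reader);
            if (splits != null) {
                assigned.computeIfAbsent(reader, k -> new ArrayList<>()).addAll(splits);
            }
        }
    }
}
```

Swapping `ownerFn` between a legacy hash rule and a global selector changes which reader a split is parked under, but not when or how it is delivered.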
Recovery and rescaling semantics
--------------------------------
1) Normal checkpoint recovery (same parallelism)
- Readers restore split state (with offsets) from operator state.
- Enumerators restore split inventory/status from DynamicKafkaSourceEnumState.
- Because owner IDs are not checkpointed, enumerators do not conflict with restored reader ownership; they only assign work that is genuinely unowned: unassigned splits, newly discovered splits, and splits returned via addSplitsBack.
2) Recovery after metadata changes
- The enumerator snapshots the current sub-enumerator state, rebuilds only the active clusters/topics, filters out stale topics, and restores from the filtered state.
- The global strategy reseeds the active split IDs from the restored state, so new assignments remain globally balanced without duplicating already-restored splits.
- **Because it only assigns new/pending splits, it does not require a full rebalance of all splits.**
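A hypothetical sketch of the reseeding step: splits from clusters or topics that are no longer in the latest metadata are dropped, and the survivors become the known set, so the round-robin cursor continues from the restored count instead of restarting at zero. `ReseedSketch`, `PartitionRef` and the `<clusterId>-<topic>-<partition>` ID layout are assumptions for illustration.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of reseeding global ownership bookkeeping after a restore. */
final class ReseedSketch {
    private ReseedSketch() {}

    record PartitionRef(String topic, int partition) {}

    /** Keeps splits whose cluster and topic are still in the latest metadata. */
    static Set<String> reseed(
            Map<String, List<PartitionRef>> restoredByCluster,
            Map<String, Set<String>> activeTopicsByCluster) {
        Set<String> active = new LinkedHashSet<>();
        restoredByCluster.forEach((cluster, refs) -> {
            Set<String> topics = activeTopicsByCluster.get(cluster);
            if (topics == null) {
                return; // cluster no longer in the stream metadata
            }
            for (PartitionRef ref : refs) {
                if (topics.contains(ref.topic())) { // drop stale topics
                    active.add(cluster + "-" + ref.topic() + "-" + ref.partition());
                }
            }
        });
        return active;
    }

    /** Round-robin slot for the next new split, continuing from the reseeded count. */
    static int nextOwner(Set<String> active, int parallelism) {
        return active.size() % parallelism;
    }
}
```

For example, restoring three clusters of four partitions while the metadata only lists local-0 and local-1 keeps eight splits, so with p=7 the next new split goes to reader 8 % 7 = 1, and none of the restored splits is handed out again.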
3) Rescale / autoscaling (parallelism change)
- Flink repartitions the reader operator state to the new subtasks before the readers start; split ownership may move across reader indices as part of the standard operator-state restore.
- Enumerator state remains valid because it tracks split status (ASSIGNED/UNASSIGNED), not reader IDs.
- New assignments after restore use the current parallelism via the global selector.
- A split-back preferred owner is applied only if that subtask ID still exists; otherwise the fallback is deterministic round-robin under the new parallelism.
Determinism: what changed and why it matters
--------------------------------------------
Legacy owner mapping is a pure function of topic/partition/parallelism, so a split’s owner is fully predictable.

Global mode is intentionally not fully deterministic across runs; it is **forward-looking global balancing**:
- The exact owner can depend on discovery/event ordering across clusters and on metadata updates.
- Async discovery timing and rescale/failover timing can alter the assignment order.
- **An increase in splits does not trigger re-distribution of existing assignments, and assignment remains globally balanced.**
- **A decrease in splits does not trigger re-distribution either, so global balance is no longer guaranteed. The "holes" left by removed splits remain until the next repartition, e.g. a parallelism change from autoscaling.**
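The forward-looking behavior, including the holes left by removed splits, can be reproduced with a compact model. `HoleDemoSketch` is hypothetical (not PR code); it applies the `size % parallelism` rule from the global owner selection section, keeps existing owners untouched, and reports per-reader load.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical model showing why removed splits leave holes under global mode. */
final class HoleDemoSketch {
    private final Set<String> active = new LinkedHashSet<>();
    private final Map<String, Integer> owners = new LinkedHashMap<>();
    private final int parallelism;

    HoleDemoSketch(int parallelism) {
        this.parallelism = parallelism;
    }

    /** Only the new splits get an owner; existing assignments are never moved. */
    void addCluster(String cluster, int partitions) {
        for (int p = 0; p < partitions; p++) {
            String id = cluster + "-example-topic-" + p;
            owners.put(id, active.size() % parallelism);
            active.add(id);
        }
    }

    /** Removes a cluster's splits; the surviving splits keep their owners. */
    void removeCluster(String cluster) {
        String prefix = cluster + "-";
        active.removeIf(id -> id.startsWith(prefix));
        owners.keySet().removeIf(id -> id.startsWith(prefix));
    }

    /** Number of splits per reader; array index = subtask id. */
    int[] loads() {
        int[] loads = new int[parallelism];
        for (int owner : owners.values()) {
            loads[owner]++;
        }
        return loads;
    }

    /** Difference between the most and least loaded reader. */
    int skew() {
        int min = Integer.MAX_VALUE;
        int max = Integer.MIN_VALUE;
        for (int load : loads()) {
            min = Math.min(min, load);
            max = Math.max(max, load);
        }
        return max - min;
    }
}
```

Replaying the mini-flink sequence at p=7 (local-1, local-0, local-2, then local-3) keeps the skew at one split. Removing local-2 and re-adding it yields loads `[4, 3, 1, 1, 1, 3, 3]`, the same pattern as the `after_readd` summary: the re-added splits go to readers 5, 6, 0 and 1 rather than filling the holes on readers 2 to 4.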
What remains guaranteed:
- No duplicate assignment of active splits.
- Correct recovery from checkpoints.
- Global balancing objective for newly assigned work (observed skew bounded in tests).
- Optional split-back affinity when valid.
Why this works:
- Data correctness is anchored by the reader split state (offset progress) plus the enumerator split inventory/status.
- Although assignment is no longer fully deterministic, a stable owner identity is not required for correctness; it is a scheduling choice.
Trade-off:
- We trade strict owner predictability for better cross-cluster balancing and lower global skew in multi-cluster dynamic deployments.
- We deliberately do not guarantee a deterministic global assignment order, so that stream metadata changes do not force a full rebalance.
Tests
-----
- Added/updated unit and integration coverage for:
  - global balancing across clusters,
  - cluster/topic expansion handling,
  - cluster/topic removal handling,
  - recovery + repartition behavior under global mode,
  - global owner assigner behavior (round-robin, split-back affinity, out-of-range fallback).
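A mini-flink run illustrates the semantics of the global enumerator. The job runs at p=7 over three clusters (local-0, local-1, local-2), each with four partitions of example-topic; local-3 is then added, local-2 is removed and re-added, and finally the job is rescaled to p=5: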
```
Results

Before add (/tmp/streamflink-dynamic-global-p7-rescale2.before_add.summary)
reader 0: local-0/example-topic/3, local-1/example-topic/0
reader 1: local-1/example-topic/1, local-2/example-topic/0
reader 2: local-1/example-topic/2, local-2/example-topic/1
reader 3: local-1/example-topic/3, local-2/example-topic/2
reader 4: local-0/example-topic/0, local-2/example-topic/3
reader 5: local-0/example-topic/1
reader 6: local-0/example-topic/2

After add local-3 (/tmp/streamflink-dynamic-global-p7-rescale2.after_add.summary)
reader 0: local-0/example-topic/3, local-1/example-topic/0, local-3/example-topic/2
reader 1: local-1/example-topic/1, local-2/example-topic/0, local-3/example-topic/3
reader 2: local-1/example-topic/2, local-2/example-topic/1
reader 3: local-1/example-topic/3, local-2/example-topic/2
reader 4: local-0/example-topic/0, local-2/example-topic/3
reader 5: local-0/example-topic/1, local-3/example-topic/0
reader 6: local-0/example-topic/2, local-3/example-topic/1

After remove local-2 (/tmp/streamflink-dynamic-global-p7-rescale2.after_remove.summary)
reader 0: local-0/example-topic/3, local-1/example-topic/0, local-3/example-topic/2
reader 1: local-1/example-topic/1, local-3/example-topic/3
reader 2: local-1/example-topic/2
reader 3: local-1/example-topic/3
reader 4: local-0/example-topic/0
reader 5: local-0/example-topic/1, local-3/example-topic/0
reader 6: local-0/example-topic/2, local-3/example-topic/1

After readd local-2 (/tmp/streamflink-dynamic-global-p7-rescale2.after_readd.summary)
reader 0: local-0/example-topic/3, local-1/example-topic/0, local-2/example-topic/2, local-3/example-topic/2
reader 1: local-1/example-topic/1, local-2/example-topic/3, local-3/example-topic/3
reader 2: local-1/example-topic/2
reader 3: local-1/example-topic/3
reader 4: local-0/example-topic/0
reader 5: local-0/example-topic/1, local-2/example-topic/0, local-3/example-topic/0
reader 6: local-0/example-topic/2, local-2/example-topic/1, local-3/example-topic/1

After rescale to p=5 (/tmp/streamflink-dynamic-global-p7-rescale2.summary)
reader 0: local-0/example-topic/3, local-1/example-topic/0, local-2/example-topic/2, local-3/example-topic/2
reader 1: local-1/example-topic/1, local-2/example-topic/3, local-3/example-topic/3
reader 2: local-0/example-topic/0, local-1/example-topic/2, local-1/example-topic/3
reader 3: local-0/example-topic/1, local-2/example-topic/0, local-3/example-topic/0
reader 4: local-0/example-topic/2, local-2/example-topic/1, local-3/example-topic/1
```
By contrast, in per-cluster mode the same three clusters at p=7 are always assigned as:
```
reader 2: local-0/example-topic/0, local-1/example-topic/0, local-2/example-topic/0
reader 3: local-0/example-topic/1, local-1/example-topic/1, local-2/example-topic/1
reader 4: local-0/example-topic/2, local-1/example-topic/2, local-2/example-topic/2
reader 5: local-0/example-topic/3, local-1/example-topic/3, local-2/example-topic/3
```
and the remaining readers (0, 1 and 6) stay idle.