lnbest0707 opened a new pull request, #223:
URL: https://github.com/apache/flink-connector-kafka/pull/223

     Background
     ----------
     DynamicKafkaSource currently composes one KafkaSourceEnumerator per Kafka 
cluster.
     In legacy mode, each sub-enumerator assigns partition splits independently 
using the
     default KafkaSource owner function (topic-hash + partition). That gives 
per-cluster
     balance, but can produce global skew when multiple clusters are active 
because each
     cluster balances against the full reader set in isolation.
   
     This change introduces an optional `GLOBAL` assignment mode while keeping 
`LEGACY` as the
     default for backward compatibility.
   
     Checkpoint model (what is stored where)
     ---------------------------------------
     JobManager / Enumerator checkpoint state:
     - DynamicKafkaSourceEnumState:
       - latest Kafka stream metadata (kafkaStreams), i.e. discovered 
streams/clusters/topics
         and cluster metadata used to rebuild sub-enumerators.
       - per-cluster KafkaSourceEnumState (clusterEnumeratorStates).
     - KafkaSourceEnumState stores:
       - split inventory with assignment status (ASSIGNED / UNASSIGNED) as
         SplitAndAssignmentStatus(KafkaPartitionSplit, status).
       - initialDiscoveryFinished flag.
     - (even before this change) **the enumerator checkpoint does NOT persist the 
split->reader owner mapping**.
   
     TaskManager / Reader checkpoint state:
     - DynamicKafkaSourceReader snapshots List<DynamicKafkaSourceSplit> from 
all sub-readers.
     - Each KafkaPartitionSplit snapshot carries the current consumed offset as 
split start
       (via KafkaPartitionSplitState -> KafkaPartitionSplit), so record 
progress is restored
       exactly.
     - Pending splits (before metadata activation) are also preserved.
     - **Each reader knows which splits to consume from its checkpointed split state.**
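     The two-level layout above can be summarized in a compact sketch. The field names mirror this description, but the record types below are illustrative stand-ins, not the connector's actual serializable state classes:

    ```java
    import java.util.List;
    import java.util.Map;

    // Illustrative stand-ins for the checkpointed enumerator state shapes described above.
    enum AssignmentStatus { ASSIGNED, UNASSIGNED }

    // Per-cluster inventory entry: a split plus its assignment status.
    // Note: no split -> reader owner mapping is stored anywhere.
    record SplitAndAssignmentStatusSketch(String kafkaPartitionSplitId, AssignmentStatus status) {}

    // Per-cluster KafkaSourceEnumState: split inventory + initial-discovery flag.
    record KafkaSourceEnumStateSketch(
            List<SplitAndAssignmentStatusSketch> splits, boolean initialDiscoveryFinished) {}

    // Top-level DynamicKafkaSourceEnumState: latest discovered stream metadata plus
    // one per-cluster enumerator state, keyed by cluster id.
    record DynamicKafkaSourceEnumStateSketch(
            List<String> kafkaStreams,
            Map<String, KafkaSourceEnumStateSketch> clusterEnumeratorStates) {}
    ```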
   
     Global mode design
     ------------------
     New source option:
     - stream-enumerator-mode = `per-cluster` | `global`
     - default remains per-cluster.
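     The option key comes from this PR; how it is wired into the DynamicKafkaSource builder is not shown here, so a plain `java.util.Properties` object stands in for the source configuration in this sketch:

    ```java
    import java.util.Properties;

    // Illustrative configuration sketch: opt in to global assignment.
    // Omitting the key keeps the default per-cluster (legacy) behavior.
    public class GlobalModeConfigSketch {
        public static Properties sourceProperties() {
            Properties props = new Properties();
            props.setProperty("stream-enumerator-mode", "global");
            return props;
        }
    }
    ```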
   
     Global mode behavior:
     - A single `GlobalSplitOwnerAssigner` is shared by all sub-enumerators.
     - For each cluster sub-enumerator, DynamicKafkaSourceEnumerator injects a
       `SplitOwnerSelector` callback into KafkaSourceEnumerator.
     - KafkaSourceEnumerator still owns discovery + pending assignment 
mechanics, but owner
       choice is delegated to the global selector.
   
     Global owner selection:
     - Split IDs are normalized as "<clusterId>-<partitionSplitId>" to make 
ownership global.
     - On new split assignment, the owner is chosen by a global round-robin cursor 
derived from the currently known active split count (size % parallelism); the split 
       then becomes known.
     - On `addSplitsBack`, returned splits carry a preferred owner hint; if the hint is 
still valid under the current parallelism it is honored once, otherwise the assigner 
       falls back to round-robin.
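     The selection rules above can be sketched as follows. The `GlobalSplitOwnerAssigner` name comes from this PR, but the method signatures here are illustrative assumptions:

    ```java
    import java.util.HashSet;
    import java.util.OptionalInt;
    import java.util.Set;

    // Sketch of the global owner selection described above; not the PR's actual API.
    class GlobalSplitOwnerAssignerSketch {
        private final Set<String> knownSplitIds = new HashSet<>();

        /** Normalize "<clusterId>-<partitionSplitId>" so ownership is global. */
        static String globalSplitId(String clusterId, String partitionSplitId) {
            return clusterId + "-" + partitionSplitId;
        }

        /** New split: round-robin cursor derived from the currently known active
         *  split count (size % parallelism); the split then becomes known. */
        int ownerForNewSplit(String globalSplitId, int parallelism) {
            int owner = knownSplitIds.size() % parallelism;
            knownSplitIds.add(globalSplitId);
            return owner;
        }

        /** Splits-back: honor the preferred owner hint once if it is still valid
         *  under the current parallelism, otherwise fall back to round-robin. */
        int ownerForReturnedSplit(String globalSplitId, OptionalInt preferredOwner, int parallelism) {
            if (preferredOwner.isPresent() && preferredOwner.getAsInt() < parallelism) {
                knownSplitIds.add(globalSplitId);
                return preferredOwner.getAsInt();
            }
            return ownerForNewSplit(globalSplitId, parallelism);
        }
    }
    ```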
   
     Interaction with sub-enumerators
     --------------------------------
     No sub-enumerator lifecycle changes are made:
     - Sub-enumerators still discover partitions asynchronously and build
       pendingPartitionSplitAssignment.
     - The only behavioral change is owner computation (global selector vs 
topic name hash).
     - Existing assignPendingPartitionSplits/addReader/addSplitsBack paths are 
reused.
     - This preserves KafkaSourceEnumerator invariants while enabling 
cross-cluster balance.
   
     Recovery and rescaling semantics
     --------------------------------
     1) Normal checkpoint recovery (same parallelism)
     - Readers restore split state (with offsets) from operator state.
     - Enumerators restore split inventory/status from 
DynamicKafkaSourceEnumState.
     - Because owner IDs are not checkpointed, enumerators cannot conflict with restored 
reader ownership; they only assign work that is truly unassigned: new splits or 
       splits returned via addSplitsBack.
   
     2) Recovery after metadata changes
     - Enumerator snapshots current sub-enumerator state, rebuilds only active 
clusters/topics,
       filters stale topics, and restores from filtered state.
     - Global strategy reseeds active split IDs from restored state, so new 
assignments remain
       globally balanced without duplicating already-restored splits.
     - Because **the strategy only assigns new/pending splits, it does not require a full 
rebalance of all splits**.
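     The reseed step above can be sketched as a minimal illustration, assuming the assigner keeps a set of known global split IDs (names here are hypothetical):

    ```java
    import java.util.Collection;
    import java.util.HashSet;
    import java.util.Set;

    // Minimal reseed sketch: after restore, repopulate the known-split set from the
    // restored enumerator inventory so the round-robin cursor (size % parallelism)
    // continues from the restored state instead of restarting at zero.
    class ReseedSketch {
        private final Set<String> knownGlobalSplitIds = new HashSet<>();

        void reseedFromRestoredState(Collection<String> restoredActiveGlobalSplitIds) {
            knownGlobalSplitIds.clear();
            knownGlobalSplitIds.addAll(restoredActiveGlobalSplitIds);
        }

        int ownerForNewSplit(String globalSplitId, int parallelism) {
            int owner = knownGlobalSplitIds.size() % parallelism;
            knownGlobalSplitIds.add(globalSplitId);
            return owner;
        }
    }
    ```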
   
     3) Rescale / autoscaling (parallelism change)
     - Flink repartitions reader operator state to new subtasks before readers 
start; split
       ownership may move across reader indices as part of standard 
operator-state restore.
     - Enumerator state remains valid because it tracks split status 
(ASSIGNED/UNASSIGNED), not reader IDs.
     - New assignments after restore use current parallelism via the global 
selector.
     - Splits-back preferred owner is applied only if the subtask id still 
exists; otherwise
       fallback is deterministic round-robin under new parallelism.
   
     Determinism: what changed and why it matters
     --------------------------------------------
     Legacy owner mapping is a pure function of topic/partition/parallelism, so 
a split’s
     owner is highly predictable.
   
     Global mode is intentionally not fully deterministic across runs; it performs 
**forward-looking global balancing**:
     - Exact owner can depend on discovery/event ordering across clusters and 
metadata updates.
     - Async discovery timing and rescale/failover timing can alter assignment 
order.
     - **Adding splits does not trigger redistribution of existing assignments, and the 
result remains globally balanced.**
     - **Removing splits does not trigger redistribution either, so global balance is 
no longer guaranteed: the "holes" left by removed splits remain until the next 
repartition, e.g. a parallelism change from auto-scaling.**
   
     What remains guaranteed:
     - No duplicate assignment of active splits.
     - Correct recovery from checkpoints.
     - Global balancing objective for newly assigned work (observed skew 
bounded in tests).
     - Optional split-back affinity when valid.
   
     Why this works:
     - Data correctness is anchored by reader split state (offset progress) + 
enumerator split
       inventory/status.
     - Although assignment is no longer fully deterministic, stable owner identity is 
not required for correctness; it is a scheduling choice.
    
     Trade-off:
     - We trade strict owner predictability for better cross-cluster balancing 
and lower global
       skew in multi-cluster dynamic deployments.
     - We do not guarantee a deterministic global assignment order, in exchange for 
avoiding a full rebalance after stream metadata changes.
   
   
     Tests
     -----
     - Added/updated unit/integration coverage for:
       - global balancing across clusters,
       - cluster/topic expansion handling,
       - cluster/topic removal handling,
       - recovery + repartition behavior under global mode,
       - global owner assigner behavior (round-robin, split-back affinity, 
         out-of-range fallback).
   
     A mini Flink run demonstrates the semantics of the global enumerator behavior:
   ```
   Results
   
     Before add (/tmp/streamflink-dynamic-global-p7-rescale2.before_add.summary)
   
     reader 0: local-0/example-topic/3, local-1/example-topic/0
     reader 1: local-1/example-topic/1, local-2/example-topic/0
     reader 2: local-1/example-topic/2, local-2/example-topic/1
     reader 3: local-1/example-topic/3, local-2/example-topic/2
     reader 4: local-0/example-topic/0, local-2/example-topic/3
     reader 5: local-0/example-topic/1
     reader 6: local-0/example-topic/2
   
      After add local-3 (/tmp/streamflink-dynamic-global-p7-rescale2.after_add.summary)

      reader 0: local-0/example-topic/3, local-1/example-topic/0, local-3/example-topic/2
      reader 1: local-1/example-topic/1, local-2/example-topic/0, local-3/example-topic/3
      reader 2: local-1/example-topic/2, local-2/example-topic/1
      reader 3: local-1/example-topic/3, local-2/example-topic/2
      reader 4: local-0/example-topic/0, local-2/example-topic/3
      reader 5: local-0/example-topic/1, local-3/example-topic/0
      reader 6: local-0/example-topic/2, local-3/example-topic/1

      After remove local-2 (/tmp/streamflink-dynamic-global-p7-rescale2.after_remove.summary)

      reader 0: local-0/example-topic/3, local-1/example-topic/0, local-3/example-topic/2
      reader 1: local-1/example-topic/1, local-3/example-topic/3
      reader 2: local-1/example-topic/2
      reader 3: local-1/example-topic/3
      reader 4: local-0/example-topic/0
      reader 5: local-0/example-topic/1, local-3/example-topic/0
      reader 6: local-0/example-topic/2, local-3/example-topic/1

      After readd local-2 (/tmp/streamflink-dynamic-global-p7-rescale2.after_readd.summary)

      reader 0: local-0/example-topic/3, local-1/example-topic/0, local-2/example-topic/2, local-3/example-topic/2
      reader 1: local-1/example-topic/1, local-2/example-topic/3, local-3/example-topic/3
      reader 2: local-1/example-topic/2
      reader 3: local-1/example-topic/3
      reader 4: local-0/example-topic/0
      reader 5: local-0/example-topic/1, local-2/example-topic/0, local-3/example-topic/0
      reader 6: local-0/example-topic/2, local-2/example-topic/1, local-3/example-topic/1

      After rescale to p=5 (/tmp/streamflink-dynamic-global-p7-rescale2.summary)

      reader 0: local-0/example-topic/3, local-1/example-topic/0, local-2/example-topic/2, local-3/example-topic/2
      reader 1: local-1/example-topic/1, local-2/example-topic/3, local-3/example-topic/3
      reader 2: local-0/example-topic/0, local-1/example-topic/2, local-1/example-topic/3
      reader 3: local-0/example-topic/1, local-2/example-topic/0, local-3/example-topic/0
      reader 4: local-0/example-topic/2, local-2/example-topic/1, local-3/example-topic/1
   ```
    By contrast, in per-cluster mode the assignment would always be:
   ```
   reader 2: local-0/example-topic/0, local-1/example-topic/0, 
local-2/example-topic/0
   reader 3: local-0/example-topic/1, local-1/example-topic/1, 
local-2/example-topic/1
   reader 4: local-0/example-topic/2, local-1/example-topic/2, 
local-2/example-topic/2
   reader 5: local-0/example-topic/3, local-1/example-topic/3, 
local-2/example-topic/3
   ```
    with all other readers idle.
   


-- 
This is an automated message from the Apache Git Service.