This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new da1a90ff [FLINK-39012]Add global kafka enumerator support (#223)
da1a90ff is described below

commit da1a90ff3a8fd09f30b49015e9a35c8c00e2c8e1
Author: lnbest0707 <[email protected]>
AuthorDate: Wed Feb 11 10:04:28 2026 -0800

    [FLINK-39012]Add global kafka enumerator support (#223)
    
      Background
      ----------
      DynamicKafkaSource currently composes one KafkaSourceEnumerator per Kafka 
cluster.
      In the default (per_cluster) mode, each sub-enumerator assigns partition splits independently using the
      default KafkaSource owner function (topic-hash + partition). That gives 
per-cluster
      balance, but can produce global skew when multiple clusters are active 
because each
      cluster balances against the full reader set in isolation.
    
      This change introduces an optional `GLOBAL` assignment mode while keeping 
`PER_CLUSTER` as the
      default for backward compatibility.
    
      Checkpoint model (what is stored where)
      ---------------------------------------
      JobManager / Enumerator checkpoint state:
      - DynamicKafkaSourceEnumState:
        - latest Kafka stream metadata (kafkaStreams), i.e. discovered 
streams/clusters/topics
          and cluster metadata used to rebuild sub-enumerators.
        - per-cluster KafkaSourceEnumState (clusterEnumeratorStates).
      - KafkaSourceEnumState stores:
        - split inventory with assignment status (ASSIGNED / UNASSIGNED) as
          SplitAndAssignmentStatus(KafkaPartitionSplit, status).
        - initialDiscoveryFinished flag.
      - Even before this change, the **enumerator checkpoint does NOT persist the split->reader owner mapping**.
    
      TaskManager / Reader checkpoint state:
      - DynamicKafkaSourceReader snapshots List<DynamicKafkaSourceSplit> from 
all sub-readers.
      - Each KafkaPartitionSplit snapshot carries the current consumed offset 
as split start
        (via KafkaPartitionSplitState -> KafkaPartitionSplit), so record 
progress is restored
        exactly.
      - Pending splits (before metadata activation) are also preserved.
      - **The reader learns which splits to consume from the checkpoint.**
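      The two-level checkpoint layout described above can be sketched roughly as follows (a
      hedged illustration: class and field names follow this commit message, and the shapes are
      simplified, e.g. offsets and topic metadata are reduced to plain fields):

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

// A split plus its assignment status; note there is no reader/owner field,
// matching the point above that the owner mapping is not persisted.
record SplitAndAssignmentStatus(String splitId, long startingOffset, boolean assigned) {}

// Per-cluster enumerator state: split inventory + discovery flag.
record KafkaSourceEnumStateSketch(
        Set<SplitAndAssignmentStatus> splits, boolean initialDiscoveryFinished) {}

// Top-level enumerator state: latest stream metadata + per-cluster states.
record DynamicKafkaSourceEnumStateSketch(
        Map<String, Set<String>> kafkaStreams, // streamId -> clusters/topics (simplified)
        Map<String, KafkaSourceEnumStateSketch> clusterEnumeratorStates) {}

// Reader-side state is simply the list of splits, with consumed offsets as start.
record ReaderStateSketch(List<SplitAndAssignmentStatus> splits) {}
```

      The key design point visible in the sketch: correctness data (offsets, split inventory,
      assignment status) is checkpointed, while the split-to-reader mapping is not.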
    
      Global mode design
      ------------------
      New source option:
      - stream-enumerator-mode = `per_cluster` | `global`
      - the default remains `per_cluster`.
    
      Global mode behavior:
      - A single `GlobalSplitOwnerAssigner` is shared by all sub-enumerators.
      - For each cluster sub-enumerator, DynamicKafkaSourceEnumerator injects a
        `SplitOwnerSelector` callback into KafkaSourceEnumerator.
      - KafkaSourceEnumerator still owns discovery + pending assignment 
mechanics, but owner
        choice is delegated to the global selector.
    
      Global owner selection:
      - Split IDs are normalized as "<clusterId>-<partitionSplitId>" to make 
ownership global.
      - On new split assignment, the owner is chosen by a global round-robin cursor derived from
        the currently known active split count (size % parallelism); the split then becomes known.
      - On `addSplitsBack`, returned splits record a preferred owner hint; if it is still valid
        under the current parallelism it is honored once, otherwise the assigner falls back to
        round-robin.
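      The selection rules above can be sketched as follows (a hedged illustration of the
      `GlobalSplitOwnerAssigner` behavior described in this message, not its exact source;
      method names mirror the commit text):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class GlobalOwnerSketch {
    // Split IDs normalized as "<clusterId>-<partitionSplitId>".
    private final Set<String> knownActiveSplitIds = new HashSet<>();
    // One-shot preferred-owner hints recorded on addSplitsBack.
    private final Map<String, Integer> preferredOwners = new HashMap<>();

    int assignSplitOwner(String splitId, int numReaders) {
        Integer preferred = preferredOwners.remove(splitId);
        if (preferred != null && preferred >= 0 && preferred < numReaders) {
            knownActiveSplitIds.add(splitId);
            return preferred; // split-back affinity, honored once
        }
        // Round-robin cursor derived from the known active split count.
        int owner = knownActiveSplitIds.size() % numReaders;
        knownActiveSplitIds.add(splitId);
        return owner;
    }

    void onSplitsBack(List<String> splitIds, int subtaskId) {
        for (String id : splitIds) {
            knownActiveSplitIds.remove(id);
            preferredOwners.put(id, subtaskId); // hint for re-assignment
        }
    }
}
```

      With parallelism 2, three newly discovered splits land on readers 0, 1, 0; a split
      returned from reader 1 goes back to reader 1 while that subtask still exists, otherwise
      it re-enters the round-robin under the current parallelism.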
    
      Interaction with sub-enumerators
      --------------------------------
      No sub-enumerator lifecycle changes are made:
      - Sub-enumerators still discover partitions asynchronously and build
        pendingPartitionSplitAssignment.
      - The only behavioral change is owner computation (global selector vs. topic-name hash).
      - Existing assignPendingPartitionSplits/addReader/addSplitsBack paths are 
reused.
      - This preserves KafkaSourceEnumerator invariants while enabling 
cross-cluster balance.
    
      Recovery and rescaling semantics
      --------------------------------
      1) Normal checkpoint recovery (same parallelism)
      - Readers restore split state (with offsets) from operator state.
      - Enumerators restore split inventory/status from 
DynamicKafkaSourceEnumState.
      - Because owner IDs are not checkpointed, enumerators do not conflict with restored
        reader ownership; they only assign truly unassigned, newly discovered, or returned
        (splits-back) work.
    
      2) Recovery after metadata changes
      - Enumerator snapshots current sub-enumerator state, rebuilds only active 
clusters/topics,
        filters stale topics, and restores from filtered state.
      - Global strategy reseeds active split IDs from restored state, so new 
assignments remain
        globally balanced without duplicating already-restored splits.
      - Because **it only assigns new/pending splits, it does not require a full rebalance of
        all splits**.
    
      3) Rescale / autoscaling (parallelism change)
      - Flink repartitions reader operator state to new subtasks before readers 
start; split
        ownership may move across reader indices as part of standard 
operator-state restore.
      - Enumerator state remains valid because it tracks split status
        (ASSIGNED/UNASSIGNED), not reader IDs.
      - New assignments after restore use current parallelism via the global 
selector.
      - Splits-back preferred owner is applied only if the subtask id still 
exists; otherwise
        fallback is deterministic round-robin under new parallelism.
    
      Determinism: what changed and why it matters
      --------------------------------------------
      The default (per_cluster) owner mapping is a pure function of topic/partition/parallelism,
      so a split's owner is highly predictable.
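      That default owner function amounts to the following (a sketch; the formula matches the
      documentation added by this patch, and the class name is illustrative):

```java
final class PerClusterOwner {
    // Default per_cluster owner: a pure function of topic, partition, parallelism.
    static int owner(String topic, int partition, int numReaders) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numReaders;
        return (startIndex + partition) % numReaders;
    }
}
```

      Because the start index depends only on the topic hash, consecutive partitions of one
      topic rotate through all readers, which is why the per-cluster owner is predictable
      across runs.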
    
      Global mode is intentionally not fully deterministic across runs; it performs
      **forward-looking global balancing**:
      - Exact owner can depend on discovery/event ordering across clusters and 
metadata updates.
      - Async discovery timing and rescale/failover timing can alter assignment 
order.
      - **An increase in splits does not trigger redistribution of existing assignments, and
        global balancing is preserved.**
      - **A decrease in splits does not trigger redistribution either, so global balancing is
        no longer guaranteed. The "holes" left by removed splits remain until the next
        repartition, e.g. a parallelism change from auto-scaling.**
    
      What remains guaranteed:
      - No duplicate assignment of active splits.
      - Correct recovery from checkpoints.
      - Global balancing objective for newly assigned work (observed skew 
bounded in tests).
      - Optional split-back affinity when valid.
    
      Why this works:
      - Data correctness is anchored by reader split state (offset progress) + 
enumerator split
        inventory/status.
      - Although assignment is no longer fully deterministic, stable owner identity is not
        required for correctness; it is a scheduling choice.
    
      Trade-off:
      - We trade strict owner predictability for better cross-cluster balancing 
and lower global
        skew in multi-cluster dynamic deployments.
      - We do not guarantee a deterministic global assignment order, to avoid a full rebalance
        after stream metadata changes.
    
    
      Tests
      -----
      - Added/updated unit/integration coverage for:
        - global balancing across clusters,
        - cluster/topic expansion handling,
        - cluster/topic removal handling,
        - recovery + repartition behavior under global mode,
        - global owner assigner behavior (round-robin, split-back affinity, 
out-of-range fallback).
---
 .../docs/connectors/datastream/dynamic-kafka.md    |  65 +-
 .../docs/connectors/datastream/dynamic-kafka.md    |  51 ++
 .../kafka/dynamic/source/DynamicKafkaSource.java   |   8 +-
 .../dynamic/source/DynamicKafkaSourceBuilder.java  |  15 +
 .../dynamic/source/DynamicKafkaSourceOptions.java  |  53 +-
 .../enumerator/DynamicKafkaSourceEnumerator.java   | 116 +++-
 .../enumerator/GlobalSplitOwnerAssigner.java       |  63 ++
 .../source/enumerator/KafkaSourceEnumerator.java   |  58 +-
 .../kafka/table/DynamicKafkaTableFactory.java      |   8 +
 .../dynamic/source/DynamicKafkaSourceITTest.java   |  94 +++
 .../DynamicKafkaSourceEnumeratorTest.java          | 673 +++++++++++++++++++++
 .../enumerator/GlobalSplitOwnerAssignerTest.java   | 203 +++++++
 12 files changed, 1389 insertions(+), 18 deletions(-)

diff --git a/docs/content.zh/docs/connectors/datastream/dynamic-kafka.md 
b/docs/content.zh/docs/connectors/datastream/dynamic-kafka.md
index 2dc9d8ad..f5b2737f 100644
--- a/docs/content.zh/docs/connectors/datastream/dynamic-kafka.md
+++ b/docs/content.zh/docs/connectors/datastream/dynamic-kafka.md
@@ -60,14 +60,15 @@ corresponding to "input-stream".
 ```java
 
 DynamicKafkaSource<String> source = DynamicKafkaSource.<String>builder()
-        .setKafkaMetadataService(new MyKafkaMetadataService())
-        .setStreamIds(Collections.singleton("input-stream"))
-        .setStartingOffsets(OffsetsInitializer.earliest())
-        
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
-        .setProperties(properties)
-        .build();
-
-        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Dynamic 
Kafka Source");
+    .setKafkaMetadataService(new MyKafkaMetadataService())
+    .setStreamIds(Collections.singleton("input-stream"))
+    .setEnumeratorMode(DynamicKafkaSourceOptions.EnumeratorMode.PER_CLUSTER)
+    .setStartingOffsets(OffsetsInitializer.earliest())
+    
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
+    .setProperties(properties)
+    .build();
+
+env.fromSource(source, WatermarkStrategy.noWatermarks(), "Dynamic Kafka 
Source");
 ```
 {{< /tab >}}
 {{< tab "Python" >}}
@@ -169,6 +170,47 @@ The Dynamic Kafka Source provides 2 ways of subscribing to 
Kafka stream(s).
   {{< /tab >}}
   {{< /tabs >}}
 
+### Split 分配模式
+
+Dynamic Kafka Source 支持两种 split 分配模式:
+
+* `per_cluster`(默认):在每个 Kafka 集群内独立进行 split 分配。
+* `global`:在所有已发现的 Kafka 集群范围内统一做全局负载均衡分配。
+
+你可以通过 builder API 或 source properties 来配置该模式。
+
+新发现 split 的 owner 计算方式如下:
+
+* `per_cluster`:使用 `KafkaSourceEnumerator` 相同的 owner 逻辑。
+  对于 topic 分区 `(topic, partition)`,令 `numReaders = P`:
+  `startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % P`,
+  `owner = (startIndex + partition) % P`。
+* `global`:在所有集群共享一个全局 owner 游标:
+  `owner = knownActiveSplitIds.size() % numReaders`,
+  然后将该 split id 加入 `knownActiveSplitIds`。
+  (当 split 通过 `addSplitsBack` 返回时,若原 owner 仍然有效,则优先复用该 owner。)
+
+在 `global` 模式下,均衡策略是**前向增量(forward-only)**的:新发现 split 会尽量保证后续分配均衡,
+但不会仅为重平衡主动迁移已分配且仍在消费的 active split。
+
+如果因为缩容/移除导致 global 分配出现倾斜,enumerator 不会自行重排已在运行的 split。
+如需对已有 ownership 做重平衡,可通过并行度变化后的恢复流程(例如 savepoint/checkpoint + rescale restore),
+让 Flink Runtime 对 source reader 的 operator state 进行重分区。
+
+{{< tabs "DynamicKafkaSourceEnumeratorMode" >}}
+{{< tab "Java" >}}
+```java
+DynamicKafkaSource<String> source =
+    DynamicKafkaSource.<String>builder()
+        .setKafkaMetadataService(new MyKafkaMetadataService())
+        .setStreamIds(Set.of("input-stream"))
+        .setEnumeratorMode(DynamicKafkaSourceOptions.EnumeratorMode.GLOBAL)
+        
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
+        .build();
+```
+{{< /tab >}}
+{{< /tabs >}}
+
 ### Kafka Metadata Service
 
 An interface is provided to resolve the logical Kafka stream(s) into the 
corresponding physical
@@ -210,6 +252,13 @@ There are configuration options in 
DynamicKafkaSourceOptions that can be configu
       <td>Integer</td>
       <td>The number of consecutive failures before letting the exception from 
Kafka metadata service discovery trigger jobmanager failure and global 
failover. The default is one to at least catch startup failures.</td>
     </tr>
+    <tr>
+      <td><h5>stream-enumerator-mode</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">per_cluster</td>
+      <td>String</td>
+      <td>Dynamic Kafka split 分配所使用的 Enumerator 实现。支持 
<code>per_cluster</code>(集群内独立分配)和 <code>global</code>(跨集群全局均衡分配)。</td>
+    </tr>
     </tbody>
 </table>
 
diff --git a/docs/content/docs/connectors/datastream/dynamic-kafka.md 
b/docs/content/docs/connectors/datastream/dynamic-kafka.md
index 2d5c73f2..510a4576 100644
--- a/docs/content/docs/connectors/datastream/dynamic-kafka.md
+++ b/docs/content/docs/connectors/datastream/dynamic-kafka.md
@@ -62,6 +62,7 @@ corresponding to "input-stream".
 DynamicKafkaSource<String> source = DynamicKafkaSource.<String>builder()
     .setKafkaMetadataService(new MyKafkaMetadataService())
     .setStreamIds(Collections.singleton("input-stream"))
+    .setEnumeratorMode(DynamicKafkaSourceOptions.EnumeratorMode.PER_CLUSTER)
     .setStartingOffsets(OffsetsInitializer.earliest())
     
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
     .setProperties(properties)
@@ -143,6 +144,49 @@ DynamicKafkaSource<String> source =
 {{< /tab >}}
 {{< /tabs >}}
 
+### Split Assignment Mode
+
+Dynamic Kafka Source supports two split-assignment modes:
+
+* `per_cluster` (default): assigns splits within each Kafka cluster 
independently.
+* `global`: assigns splits across all discovered clusters using one global 
balancing strategy.
+
+You can configure the mode either with the builder API or with source 
properties.
+
+How next owner is chosen for a newly discovered split:
+
+* `per_cluster`: uses the same owner logic as `KafkaSourceEnumerator`.
+  For topic partition `(topic, partition)`, with `numReaders = P`:
+  `startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % P`,
+  `owner = (startIndex + partition) % P`.
+* `global`: uses one global owner cursor across all clusters:
+  `owner = knownActiveSplitIds.size() % numReaders`,
+  then adds the split id into `knownActiveSplitIds`.
+  (When a split is returned via `addSplitsBack`, the preferred previous owner 
is reused when valid.)
+
+In `global` mode, balancing is **forward-looking**: newly discovered splits 
are assigned to keep
+future distribution balanced, while already assigned active splits are not 
proactively migrated only
+for rebalancing.
+
+If global assignment becomes skewed due to shrink/removal, the enumerator does 
not rebalance already
+active splits by itself. To rebalance existing ownership, use a restore with 
parallelism change
+(for example, savepoint/checkpoint restore after rescale), so Flink runtime 
repartitions source
+reader operator state.
+
+{{< tabs "DynamicKafkaSourceEnumeratorMode" >}}
+{{< tab "Java" >}}
+```java
+DynamicKafkaSource<String> source =
+    DynamicKafkaSource.<String>builder()
+        .setKafkaMetadataService(new MyKafkaMetadataService())
+        .setStreamIds(Set.of("input-stream"))
+        .setEnumeratorMode(DynamicKafkaSourceOptions.EnumeratorMode.GLOBAL)
+        
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
+        .build();
+```
+{{< /tab >}}
+{{< /tabs >}}
+
 ### Kafka Stream Subscription
 The Dynamic Kafka Source provides 2 ways of subscribing to Kafka stream(s).
 * A set of Kafka stream ids. For example:
@@ -214,6 +258,13 @@ There are configuration options in 
DynamicKafkaSourceOptions that can be configu
       <td>Integer</td>
       <td>The number of consecutive failures before letting the exception from 
Kafka metadata service discovery trigger jobmanager failure and global 
failover. The default is one to at least catch startup failures.</td>
     </tr>
+    <tr>
+      <td><h5>stream-enumerator-mode</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">per_cluster</td>
+      <td>String</td>
+      <td>Enumerator implementation for dynamic Kafka split assignment. 
Supported values are <code>per_cluster</code> (cluster-local assignment) and 
<code>global</code> (globally balanced assignment across clusters).</td>
+    </tr>
     </tbody>
 </table>
 
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSource.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSource.java
index 9a93d7b8..18636c44 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSource.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSource.java
@@ -138,10 +138,10 @@ public class DynamicKafkaSource<T>
     }
 
     /**
-     * Create the {@link DynamicKafkaSourceEnumerator}.
+     * Create the configured split enumerator implementation.
      *
      * @param enumContext The {@link SplitEnumeratorContext context} for the 
split enumerator.
-     * @return the {@link DynamicKafkaSourceEnumerator}.
+     * @return the split enumerator.
      */
     @Internal
     @Override
@@ -159,12 +159,12 @@ public class DynamicKafkaSource<T>
     }
 
     /**
-     * Restore the {@link DynamicKafkaSourceEnumerator}.
+     * Restore the configured split enumerator implementation.
      *
      * @param enumContext The {@link SplitEnumeratorContext context} for the 
restored split
      *     enumerator.
      * @param checkpoint The checkpoint to restore the SplitEnumerator from.
-     * @return the {@link DynamicKafkaSourceEnumerator}.
+     * @return the split enumerator.
      */
     @Internal
     @Override
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceBuilder.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceBuilder.java
index 8e814afc..9b7c31ba 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceBuilder.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceBuilder.java
@@ -37,6 +37,7 @@ import 
org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Locale;
 import java.util.Properties;
 import java.util.Set;
 import java.util.regex.Pattern;
@@ -180,6 +181,20 @@ public class DynamicKafkaSourceBuilder<T> {
         return this;
     }
 
+    /**
+     * Set the enumerator mode used for split assignment.
+     *
+     * @param enumeratorMode split assignment mode.
+     * @return the builder.
+     */
+    public DynamicKafkaSourceBuilder<T> setEnumeratorMode(
+            DynamicKafkaSourceOptions.EnumeratorMode enumeratorMode) {
+        Preconditions.checkNotNull(enumeratorMode);
+        return setProperty(
+                DynamicKafkaSourceOptions.STREAM_ENUMERATOR_MODE.key(),
+                enumeratorMode.name().toLowerCase(Locale.ROOT));
+    }
+
     /**
      * Set the property for {@link CommonClientConfigs#GROUP_ID_CONFIG}. This 
will be applied to all
      * clusters.
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceOptions.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceOptions.java
index bdecaf39..56c6d201 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceOptions.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceOptions.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 
+import java.util.Locale;
 import java.util.Properties;
 import java.util.function.Function;
 
@@ -34,6 +35,16 @@ public class DynamicKafkaSourceOptions {
 
     private DynamicKafkaSourceOptions() {}
 
+    /**
+     * Enumerator mode determines how discovered Kafka splits are assigned to 
source readers:
+     * cluster-local assignment (per-cluster behavior) or globally balanced 
assignment across
+     * clusters.
+     */
+    public enum EnumeratorMode {
+        PER_CLUSTER,
+        GLOBAL
+    }
+
     public static final ConfigOption<Long> 
STREAM_METADATA_DISCOVERY_INTERVAL_MS =
             ConfigOptions.key("stream-metadata-discovery-interval-ms")
                     .longType()
@@ -51,10 +62,50 @@ public class DynamicKafkaSourceOptions {
                                     + "trigger jobmanager failure and global 
failover. The default is one to at least catch startup "
                                     + "failures.");
 
+    public static final ConfigOption<String> STREAM_ENUMERATOR_MODE =
+            ConfigOptions.key("stream-enumerator-mode")
+                    .stringType()
+                    
.defaultValue(EnumeratorMode.PER_CLUSTER.name().toLowerCase(Locale.ROOT))
+                    .withDescription(
+                            "Enumerator implementation for dynamic Kafka split 
assignment. "
+                                    + "'per_cluster' keeps per-cluster 
assignment behavior, while "
+                                    + "'global' enables global load-balanced 
assignment across clusters.");
+
     @Internal
     public static <T> T getOption(
             Properties props, ConfigOption<?> configOption, Function<String, 
T> parser) {
         String value = props.getProperty(configOption.key());
-        return (T) (value == null ? configOption.defaultValue() : 
parser.apply(value));
+        if (value != null) {
+            return parser.apply(value);
+        }
+
+        Object defaultValue = configOption.defaultValue();
+        return defaultValue == null ? null : 
parser.apply(String.valueOf(defaultValue));
+    }
+
+    @Internal
+    public static EnumeratorMode getEnumeratorMode(Properties props) {
+        return getOption(
+                props,
+                STREAM_ENUMERATOR_MODE,
+                value -> {
+                    final String normalizedValue =
+                            value.trim().toUpperCase(Locale.ROOT).replace('-', 
'_');
+                    try {
+                        return EnumeratorMode.valueOf(normalizedValue);
+                    } catch (IllegalArgumentException e) {
+                        throw new IllegalArgumentException(
+                                String.format(
+                                        "Invalid %s='%s'. Supported values 
are: %s.",
+                                        STREAM_ENUMERATOR_MODE.key(),
+                                        value,
+                                        
EnumeratorMode.PER_CLUSTER.name().toLowerCase(Locale.ROOT)
+                                                + ", "
+                                                + EnumeratorMode.GLOBAL
+                                                        .name()
+                                                        
.toLowerCase(Locale.ROOT)),
+                                e);
+                    }
+                });
     }
 }
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java
index c118b27f..73c0a143 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java
@@ -77,6 +77,7 @@ public class DynamicKafkaSourceEnumerator
 
     // The mapping that the split enumerator context needs to be able to 
forward certain requests.
     private final Map<String, StoppableKafkaEnumContextProxy> 
clusterEnumContextMap;
+    private final SplitAssignmentStrategy splitAssignmentStrategy;
     private final KafkaStreamSubscriber kafkaStreamSubscriber;
     private final SplitEnumeratorContext<DynamicKafkaSourceSplit> enumContext;
     private final KafkaMetadataService kafkaMetadataService;
@@ -155,6 +156,7 @@ public class DynamicKafkaSourceEnumerator
 
         this.kafkaMetadataService = kafkaMetadataService;
         this.stoppableKafkaEnumContextProxyFactory = 
stoppableKafkaEnumContextProxyFactory;
+        this.splitAssignmentStrategy = 
createSplitAssignmentStrategy(properties);
 
         // handle checkpoint state and rebuild contexts
         this.clusterEnumeratorMap = new HashMap<>();
@@ -181,6 +183,7 @@ public class DynamicKafkaSourceEnumerator
         }
 
         this.latestClusterTopicsMap = new HashMap<>();
+        Set<String> activeSplitIds = new HashSet<>();
         for (Entry<String, KafkaSourceEnumState> clusterEnumState :
                 
dynamicKafkaSourceEnumState.getClusterEnumeratorStates().entrySet()) {
             this.latestClusterTopicsMap.put(
@@ -188,6 +191,15 @@ public class DynamicKafkaSourceEnumerator
                     clusterEnumState.getValue().assignedSplits().stream()
                             .map(KafkaPartitionSplit::getTopic)
                             .collect(Collectors.toSet()));
+            clusterEnumState
+                    .getValue()
+                    .splits()
+                    .forEach(
+                            splitStatus ->
+                                    activeSplitIds.add(
+                                            toDynamicSplitId(
+                                                    clusterEnumState.getKey(),
+                                                    splitStatus.split())));
 
             createEnumeratorWithAssignedTopicPartitions(
                     clusterEnumState.getKey(),
@@ -197,6 +209,7 @@ public class DynamicKafkaSourceEnumerator
                     clusterStartingOffsets.get(clusterEnumState.getKey()),
                     clusterStoppingOffsets.get(clusterEnumState.getKey()));
         }
+        splitAssignmentStrategy.onMetadataRefresh(activeSplitIds);
     }
 
     /**
@@ -303,6 +316,7 @@ public class DynamicKafkaSourceEnumerator
         sendMetadataUpdateEventToAvailableReaders();
 
         // create enumerators
+        Set<String> activeSplitIds = new HashSet<>();
         for (Entry<String, Set<String>> activeClusterTopics : 
latestClusterTopicsMap.entrySet()) {
             KafkaSourceEnumState kafkaSourceEnumState =
                     dynamicKafkaSourceEnumState
@@ -318,6 +332,12 @@ public class DynamicKafkaSourceEnumerator
                         kafkaSourceEnumState.splits().stream()
                                 .filter(tp -> 
activeTopics.contains(tp.split().getTopic()))
                                 .collect(Collectors.toSet());
+                partitions.forEach(
+                        splitStatus ->
+                                activeSplitIds.add(
+                                        toDynamicSplitId(
+                                                activeClusterTopics.getKey(),
+                                                splitStatus.split())));
 
                 newKafkaSourceEnumState =
                         new KafkaSourceEnumState(
@@ -337,6 +357,7 @@ public class DynamicKafkaSourceEnumerator
                     clusterStoppingOffsets.get(activeClusterTopics.getKey()));
         }
 
+        splitAssignmentStrategy.onMetadataRefresh(activeSplitIds);
         startAllEnumerators();
     }
 
@@ -407,6 +428,10 @@ public class DynamicKafkaSourceEnumerator
                         kafkaClusterId,
                         kafkaMetadataService,
                         signalNoMoreSplitsCallback);
+        KafkaSourceEnumerator.SplitOwnerSelector splitOwnerSelector =
+                
splitAssignmentStrategy.createSplitOwnerSelector(kafkaClusterId);
+        SplitEnumeratorContext<KafkaPartitionSplit> assignmentContext =
+                
splitAssignmentStrategy.createAssignmentContext(kafkaClusterId, context);
 
         Properties consumerProps = new Properties();
         KafkaPropertiesUtil.copyProperties(fetchedProperties, consumerProps);
@@ -425,9 +450,10 @@ public class DynamicKafkaSourceEnumerator
                         effectiveStartingOffsetsInitializer,
                         effectiveStoppingOffsetsInitializer,
                         consumerProps,
-                        context,
+                        assignmentContext,
                         boundedness,
-                        kafkaSourceEnumState);
+                        kafkaSourceEnumState,
+                        splitOwnerSelector);
 
         clusterEnumContextMap.put(kafkaClusterId, context);
         clusterEnumeratorMap.put(kafkaClusterId, enumerator);
@@ -489,6 +515,8 @@ public class DynamicKafkaSourceEnumerator
     @Override
     public void addSplitsBack(List<DynamicKafkaSourceSplit> splits, int 
subtaskId) {
         logger.debug("Adding splits back for {}", subtaskId);
+        splitAssignmentStrategy.onSplitsBack(splits, subtaskId);
+
         // separate splits by cluster
         Map<String, List<KafkaPartitionSplit>> kafkaPartitionSplits = new 
HashMap<>();
         for (DynamicKafkaSourceSplit split : splits) {
@@ -518,6 +546,8 @@ public class DynamicKafkaSourceEnumerator
     @Override
     public void addReader(int subtaskId) {
         logger.debug("Adding reader {}", subtaskId);
+        splitAssignmentStrategy.onReaderAdded(subtaskId);
+
         // assign pending splits from the sub enumerator
         clusterEnumeratorMap.forEach(
                 (cluster, subEnumerator) -> 
subEnumerator.addReader(subtaskId));
@@ -579,4 +609,86 @@ public class DynamicKafkaSourceEnumerator
             throw new RuntimeException(e);
         }
     }
+
+    private SplitAssignmentStrategy createSplitAssignmentStrategy(Properties 
properties) {
+        DynamicKafkaSourceOptions.EnumeratorMode enumeratorMode =
+                DynamicKafkaSourceOptions.getEnumeratorMode(properties);
+        logger.info("Using dynamic Kafka split enumerator mode: {}", 
enumeratorMode);
+
+        switch (enumeratorMode) {
+            case GLOBAL:
+                return new GlobalSplitAssignmentStrategy();
+            case PER_CLUSTER:
+            default:
+                return new PerClusterSplitAssignmentStrategy();
+        }
+    }
+
+    private static String toDynamicSplitId(String kafkaClusterId, 
KafkaPartitionSplit split) {
+        return kafkaClusterId + "-" + split.splitId();
+    }
+
+    private interface SplitAssignmentStrategy {
+        @Nullable
+        default KafkaSourceEnumerator.SplitOwnerSelector 
createSplitOwnerSelector(
+                String kafkaClusterId) {
+            return null;
+        }
+
+        SplitEnumeratorContext<KafkaPartitionSplit> createAssignmentContext(
+                String kafkaClusterId, StoppableKafkaEnumContextProxy 
clusterContext);
+
+        default void onReaderAdded(int subtaskId) {}
+
+        default void onSplitsBack(List<DynamicKafkaSourceSplit> splits, int 
subtaskId) {}
+
+        default void onMetadataRefresh(Set<String> activeSplitIds) {}
+    }
+
+    private static class PerClusterSplitAssignmentStrategy implements 
SplitAssignmentStrategy {
+        @Override
+        public SplitEnumeratorContext<KafkaPartitionSplit> 
createAssignmentContext(
+                String kafkaClusterId, StoppableKafkaEnumContextProxy 
clusterContext) {
+            return clusterContext;
+        }
+    }
+
+    private static class GlobalSplitAssignmentStrategy implements 
SplitAssignmentStrategy {
+        private final GlobalSplitOwnerAssigner splitOwnerAssigner;
+
+        private GlobalSplitAssignmentStrategy() {
+            this.splitOwnerAssigner = new GlobalSplitOwnerAssigner();
+        }
+
+        @Override
+        public SplitEnumeratorContext<KafkaPartitionSplit> createAssignmentContext(
+                String kafkaClusterId, StoppableKafkaEnumContextProxy clusterContext) {
+            return clusterContext;
+        }
+
+        @Override
+        public KafkaSourceEnumerator.SplitOwnerSelector createSplitOwnerSelector(
+                String kafkaClusterId) {
+            return (split, numReaders) -> assignSplitOwner(kafkaClusterId, split, numReaders);
+        }
+
+        @Override
+        public void onReaderAdded(int subtaskId) {}
+
+        @Override
+        public void onSplitsBack(List<DynamicKafkaSourceSplit> splits, int subtaskId) {
+            splitOwnerAssigner.onSplitsBack(splits, subtaskId);
+        }
+
+        @Override
+        public void onMetadataRefresh(Set<String> activeSplitIds) {
+            splitOwnerAssigner.onMetadataRefresh(activeSplitIds);
+        }
+
+        private int assignSplitOwner(
+                String kafkaClusterId, KafkaPartitionSplit split, int numReaders) {
+            final String splitId = toDynamicSplitId(kafkaClusterId, split);
+            return splitOwnerAssigner.assignSplitOwner(splitId, numReaders);
+        }
+    }
 }
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/GlobalSplitOwnerAssigner.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/GlobalSplitOwnerAssigner.java
new file mode 100644
index 00000000..aa93a9a1
--- /dev/null
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/GlobalSplitOwnerAssigner.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.dynamic.source.enumerator;
+
+import org.apache.flink.connector.kafka.dynamic.source.split.DynamicKafkaSourceSplit;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Tracks globally known dynamic split ids and computes reader ownership using round-robin order.
+ */
+final class GlobalSplitOwnerAssigner {
+
+    private final Set<String> knownActiveSplitIds = new HashSet<>();
+    private final Map<String, Integer> preferredOwnerBySplitId = new HashMap<>();
+
+    void onMetadataRefresh(Set<String> activeSplitIds) {
+        knownActiveSplitIds.clear();
+        knownActiveSplitIds.addAll(activeSplitIds);
+        preferredOwnerBySplitId.keySet().retainAll(activeSplitIds);
+    }
+
+    void onSplitsBack(List<DynamicKafkaSourceSplit> splits, int subtaskId) {
+        for (DynamicKafkaSourceSplit split : splits) {
+            knownActiveSplitIds.add(split.splitId());
+            preferredOwnerBySplitId.put(split.splitId(), subtaskId);
+        }
+    }
+
+    int assignSplitOwner(String splitId, int numReaders) {
+        Preconditions.checkArgument(numReaders > 0, "numReaders must be > 0");
+
+        Integer preferredOwner = preferredOwnerBySplitId.remove(splitId);
+        if (preferredOwner != null && preferredOwner >= 0 && preferredOwner < numReaders) {
+            return preferredOwner;
+        }
+
+        int targetReader = Math.floorMod(knownActiveSplitIds.size(), numReaders);
+        knownActiveSplitIds.add(splitId);
+        return targetReader;
+    }
+}
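For readers skimming the patch, the ownership rule in GlobalSplitOwnerAssigner above can be exercised in isolation. The sketch below mirrors its round-robin-with-preferred-owner logic in plain Java; `OwnerSketch` and the split id strings are illustrative stand-ins, not connector API:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative stand-in for GlobalSplitOwnerAssigner: returned splits keep
// their previous reader; new splits are dealt out round-robin based on the
// number of split ids seen so far.
final class OwnerSketch {
    private final Set<String> knownSplitIds = new HashSet<>();
    private final Map<String, Integer> preferredOwner = new HashMap<>();

    void onSplitsBack(String splitId, int subtaskId) {
        knownSplitIds.add(splitId);
        preferredOwner.put(splitId, subtaskId);
    }

    int assign(String splitId, int numReaders) {
        Integer preferred = preferredOwner.remove(splitId);
        if (preferred != null && preferred >= 0 && preferred < numReaders) {
            return preferred; // sticky: a returned split goes back to its reader
        }
        int owner = Math.floorMod(knownSplitIds.size(), numReaders);
        knownSplitIds.add(splitId);
        return owner;
    }

    public static void main(String[] args) {
        OwnerSketch sketch = new OwnerSketch();
        // Two clusters, two readers: splits alternate readers globally,
        // regardless of which cluster they come from.
        System.out.println(sketch.assign("cluster0-topic-0", 2)); // 0
        System.out.println(sketch.assign("cluster0-topic-1", 2)); // 1
        System.out.println(sketch.assign("cluster1-topic-0", 2)); // 0
        System.out.println(sketch.assign("cluster1-topic-1", 2)); // 1
    }
}
```

This is why per-cluster skew disappears in global mode: the round-robin counter spans all clusters instead of restarting per sub-enumerator.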
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
index 61c64e12..34523a5b 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
@@ -46,6 +46,7 @@ import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -59,6 +60,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.flink.util.CollectionUtil.newLinkedHashMapWithExpectedSize;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * The enumerator class for Kafka source.
@@ -94,6 +96,17 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 public class KafkaSourceEnumerator
         implements SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState> {
     private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceEnumerator.class);
+
+    /**
+     * Selects the target reader for a split while it is enqueued in pending assignments.
+     *
+     * <p>The selector is invoked on the coordinator thread.
+     */
+    @FunctionalInterface
+    public interface SplitOwnerSelector {
+        int getSplitOwner(KafkaPartitionSplit split, int numReaders);
+    }
+
     private final KafkaSubscriber subscriber;
     private final OffsetsInitializer startingOffsetInitializer;
     private final OffsetsInitializer stoppingOffsetInitializer;
@@ -118,6 +131,8 @@ public class KafkaSourceEnumerator
     private final Map<Integer, Set<KafkaPartitionSplit>> pendingPartitionSplitAssignment =
             new HashMap<>();
 
+    private final SplitOwnerSelector splitOwnerSelector;
+
     /** The consumer group id used for this KafkaSource. */
     private final String consumerGroupId;
 
@@ -144,7 +159,8 @@ public class KafkaSourceEnumerator
                 properties,
                 context,
                 boundedness,
-                new KafkaSourceEnumState(Collections.emptySet(), false));
+                new KafkaSourceEnumState(Collections.emptySet(), false),
+                null);
     }
 
     public KafkaSourceEnumerator(
@@ -155,6 +171,26 @@ public class KafkaSourceEnumerator
             SplitEnumeratorContext<KafkaPartitionSplit> context,
             Boundedness boundedness,
             KafkaSourceEnumState kafkaSourceEnumState) {
+        this(
+                subscriber,
+                startingOffsetInitializer,
+                stoppingOffsetInitializer,
+                properties,
+                context,
+                boundedness,
+                kafkaSourceEnumState,
+                null);
+    }
+
+    public KafkaSourceEnumerator(
+            KafkaSubscriber subscriber,
+            OffsetsInitializer startingOffsetInitializer,
+            OffsetsInitializer stoppingOffsetInitializer,
+            Properties properties,
+            SplitEnumeratorContext<KafkaPartitionSplit> context,
+            Boundedness boundedness,
+            KafkaSourceEnumState kafkaSourceEnumState,
+            @Nullable SplitOwnerSelector splitOwnerSelector) {
         this.subscriber = subscriber;
         this.startingOffsetInitializer = startingOffsetInitializer;
         this.stoppingOffsetInitializer = stoppingOffsetInitializer;
@@ -162,6 +198,11 @@ public class KafkaSourceEnumerator
         this.properties = properties;
         this.context = context;
         this.boundedness = boundedness;
+        this.splitOwnerSelector =
+                splitOwnerSelector != null
+                        ? splitOwnerSelector
+                        : (split, numReaders) ->
+                                getSplitOwner(split.getTopicPartition(), numReaders);
 
         this.partitionDiscoveryIntervalMs =
                 KafkaSourceOptions.getOption(
@@ -403,8 +444,19 @@ public class KafkaSourceEnumerator
     private void addPartitionSplitChangeToPendingAssignments(
             Collection<KafkaPartitionSplit> newPartitionSplits) {
         int numReaders = context.currentParallelism();
-        for (KafkaPartitionSplit split : newPartitionSplits) {
-            int ownerReader = getSplitOwner(split.getTopicPartition(), numReaders);
+        List<KafkaPartitionSplit> sortedSplits = new ArrayList<>(newPartitionSplits);
+        sortedSplits.sort(
+                Comparator.comparing(
+                                (KafkaPartitionSplit split) -> split.getTopicPartition().topic())
+                        .thenComparingInt(split -> split.getTopicPartition().partition()));
+        for (KafkaPartitionSplit split : sortedSplits) {
+            int ownerReader = splitOwnerSelector.getSplitOwner(split, numReaders);
+            checkState(
+                    ownerReader >= 0 && ownerReader < numReaders,
+                    "Invalid split owner %s for split %s with current parallelism %s",
+                    ownerReader,
+                    split,
+                    numReaders);
             pendingPartitionSplitAssignment
                     .computeIfAbsent(ownerReader, r -> new HashSet<>())
                     .add(split);
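The hunk above sorts newly discovered splits by topic and then partition before consulting the owner selector, so a stateful selector (such as the global round-robin one) sees a stable sequence across discovery runs. A minimal illustration of that comparator, using a hypothetical `TopicPartition` record in place of Kafka's class:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
record TopicPartition(String topic, int partition) {}

class SplitSortSketch {
    // Same ordering as the patch: topic name first, then partition number.
    static List<TopicPartition> sortSplits(List<TopicPartition> splits) {
        List<TopicPartition> sorted = new ArrayList<>(splits);
        sorted.sort(
                Comparator.comparing(TopicPartition::topic)
                        .thenComparingInt(TopicPartition::partition));
        return sorted;
    }

    public static void main(String[] args) {
        List<TopicPartition> discovered =
                List.of(
                        new TopicPartition("orders", 1),
                        new TopicPartition("clicks", 2),
                        new TopicPartition("clicks", 0));
        // Deterministic order: clicks-0, clicks-2, orders-1.
        System.out.println(sortSplits(discovered));
    }
}
```

Without this sort, the counter-based global assigner could hand the same partitions to different readers on different discovery runs, since partition discovery order is not guaranteed.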
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableFactory.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableFactory.java
index fea44744..99304838 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableFactory.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableFactory.java
@@ -122,6 +122,7 @@ public class DynamicKafkaTableFactory implements DynamicTableSourceFactory {
         options.add(SCAN_PARALLELISM);
         options.add(DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_INTERVAL_MS);
         options.add(DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_FAILURE_THRESHOLD);
+        options.add(DynamicKafkaSourceOptions.STREAM_ENUMERATOR_MODE);
         return options;
     }
 
@@ -216,6 +217,13 @@ public class DynamicKafkaTableFactory implements DynamicTableSourceFactory {
                                                 .STREAM_METADATA_DISCOVERY_FAILURE_THRESHOLD
                                                 .key(),
                                         Integer.toString(value)));
+        tableOptions
+                .getOptional(DynamicKafkaSourceOptions.STREAM_ENUMERATOR_MODE)
+                .ifPresent(
+                        value ->
+                                properties.setProperty(
+                                        DynamicKafkaSourceOptions.STREAM_ENUMERATOR_MODE.key(),
+                                        value));
     }
 
     private static Optional<DecodingFormat<DeserializationSchema<RowData>>> getKeyDecodingFormat(
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceITTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceITTest.java
index a5107c90..c95f2f9a 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceITTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceITTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.connector.kafka.dynamic.source;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitsAssignment;
 import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
 import org.apache.flink.configuration.Configuration;
@@ -84,6 +85,7 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
@@ -213,6 +215,58 @@ class DynamicKafkaSourceITTest {
                                     .collect(Collectors.toList()));
         }
 
+        @Test
+        void testGlobalEnumeratorModeBalancesAssignments() throws Throwable {
+            // This verifies the global mode wiring from DynamicKafkaSource builder -> enumerator.
+            // In global mode, split ownership should be balanced across all readers (not per
+            // cluster).
+            final int numSubtasks = 4;
+            Properties properties = new Properties();
+            properties.setProperty(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(), "0");
+            properties.setProperty(
+                    DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_INTERVAL_MS.key(), "0");
+            properties.setProperty(
+                    DynamicKafkaSourceOptions.STREAM_ENUMERATOR_MODE.key(),
+                    DynamicKafkaSourceOptions.EnumeratorMode.GLOBAL.name().toLowerCase());
+
+            MockKafkaMetadataService metadataService =
+                    new MockKafkaMetadataService(
+                            Collections.singleton(
+                                    DynamicKafkaSourceTestHelper.getKafkaStream(TOPIC)));
+
+            DynamicKafkaSource<Integer> source =
+                    DynamicKafkaSource.<Integer>builder()
+                            .setStreamIds(
+                                    metadataService.getAllStreams().stream()
+                                            .map(KafkaStream::getStreamId)
+                                            .collect(Collectors.toSet()))
+                            .setKafkaMetadataService(metadataService)
+                            .setDeserializer(
+                                    KafkaRecordDeserializationSchema.valueOnly(
+                                            IntegerDeserializer.class))
+                            .setStartingOffsets(OffsetsInitializer.earliest())
+                            .setProperties(properties)
+                            .build();
+
+            try (MockSplitEnumeratorContext<DynamicKafkaSourceSplit> context =
+                            new MockSplitEnumeratorContext<>(numSubtasks);
+                    SplitEnumerator<DynamicKafkaSourceSplit, DynamicKafkaSourceEnumState>
+                            splitEnumerator = source.createEnumerator(context)) {
+                DynamicKafkaSourceEnumerator enumerator =
+                        (DynamicKafkaSourceEnumerator) splitEnumerator;
+                enumerator.start();
+
+                for (int readerId = 0; readerId < numSubtasks; readerId++) {
+                    registerReader(context, enumerator, readerId);
+                }
+                runAllOneTimeCallables(context);
+
+                verifyAllSplitsAssignedOnce(
+                        context.getSplitsAssignmentSequence(), metadataService.getAllStreams());
+                assertAssignmentsBalanced(context.getSplitsAssignmentSequence(), numSubtasks);
+            }
+        }
+
         @Test
         void testSingleClusterTopicMetadataService() throws Exception {
             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -1096,6 +1150,46 @@ class DynamicKafkaSourceITTest {
             }
         }
 
+        private void verifyAllSplitsAssignedOnce(
+                List<SplitsAssignment<DynamicKafkaSourceSplit>> assignments,
+                Set<KafkaStream> kafkaStreams) {
+            Map<String, Integer> assignmentFrequency = new HashMap<>();
+            for (SplitsAssignment<DynamicKafkaSourceSplit> step : assignments) {
+                for (List<DynamicKafkaSourceSplit> splits : step.assignment().values()) {
+                    for (DynamicKafkaSourceSplit split : splits) {
+                        assignmentFrequency.merge(split.splitId(), 1, Integer::sum);
+                    }
+                }
+            }
+
+            int expectedSplits =
+                    kafkaStreams.stream()
+                            .flatMap(stream -> stream.getClusterMetadataMap().entrySet().stream())
+                            .mapToInt(entry -> entry.getValue().getTopics().size() * NUM_PARTITIONS)
+                            .sum();
+            assertThat(assignmentFrequency).hasSize(expectedSplits);
+            assertThat(assignmentFrequency.values()).allMatch(count -> count == 1);
+        }
+
+        private void assertAssignmentsBalanced(
+                List<SplitsAssignment<DynamicKafkaSourceSplit>> assignments, int numReaders) {
+            Map<Integer, Integer> assignedSplitCountByReader = new HashMap<>();
+            for (int readerId = 0; readerId < numReaders; readerId++) {
+                assignedSplitCountByReader.put(readerId, 0);
+            }
+            for (SplitsAssignment<DynamicKafkaSourceSplit> assignment : assignments) {
+                for (Map.Entry<Integer, List<DynamicKafkaSourceSplit>> entry :
+                        assignment.assignment().entrySet()) {
+                    assignedSplitCountByReader.merge(
+                            entry.getKey(), entry.getValue().size(), Integer::sum);
+                }
+            }
+
+            int minAssignedSplits = Collections.min(assignedSplitCountByReader.values());
+            int maxAssignedSplits = Collections.max(assignedSplitCountByReader.values());
+            assertThat(maxAssignedSplits - minAssignedSplits).isLessThanOrEqualTo(1);
+        }
+
         private Set<KafkaStream> getKafkaStreams(
                 String kafkaClusterId, Properties properties, Collection<String> topics) {
             return topics.stream()
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumeratorTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumeratorTest.java
index f974b6ff..21eb4399 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumeratorTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumeratorTest.java
@@ -33,10 +33,16 @@ import org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber.Kaf
 import org.apache.flink.connector.kafka.dynamic.source.split.DynamicKafkaSourceSplit;
 import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
 import org.apache.flink.connector.kafka.source.enumerator.AssignmentStatus;
+import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
 import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer;
 import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
 import org.apache.flink.connector.kafka.testutils.MockKafkaMetadataService;
 import org.apache.flink.mock.Whitebox;
+import org.apache.flink.runtime.checkpoint.RoundRobinOperatorStateRepartitioner;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.OperatorStreamStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.streaming.connectors.kafka.DynamicKafkaSourceTestHelper;
 
 import com.google.common.collect.ImmutableSet;
@@ -47,6 +53,7 @@ import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -72,6 +79,7 @@ public class DynamicKafkaSourceEnumeratorTest {
     private static final String TOPIC = "DynamicKafkaSourceEnumeratorTest";
     private static final int NUM_SPLITS_PER_CLUSTER = 3;
     private static final int NUM_RECORDS_PER_SPLIT = 5;
+    private static final String SOURCE_READER_SPLIT_STATE_NAME = "source-reader-splits";
 
     @BeforeAll
     public static void beforeAll() throws Throwable {
@@ -688,6 +696,511 @@ public class DynamicKafkaSourceEnumeratorTest {
         }
     }
 
+    @Test
+    public void testGlobalEnumeratorModeBalancesSplitsAcrossClusters() throws Throwable {
+        final int numSubtasks = 2;
+        try (MockSplitEnumeratorContext<DynamicKafkaSourceSplit> context =
+                        new MockSplitEnumeratorContext<>(numSubtasks);
+                DynamicKafkaSourceEnumerator enumerator =
+                        createEnumerator(
+                                context,
+                                properties ->
+                                        properties.setProperty(
+                                                DynamicKafkaSourceOptions.STREAM_ENUMERATOR_MODE
+                                                        .key(),
+                                                DynamicKafkaSourceOptions.EnumeratorMode.GLOBAL
+                                                        .name()
+                                                        .toLowerCase()))) {
+            enumerator.start();
+
+            for (int i = 0; i < numSubtasks; i++) {
+                mockRegisterReaderAndSendReaderStartupEvent(context, enumerator, i);
+            }
+
+            runAllOneTimeCallables(context);
+
+            verifyAllSplitsHaveBeenAssigned(
+                    context.getSplitsAssignmentSequence(),
+                    DynamicKafkaSourceTestHelper.getKafkaStream(TOPIC));
+
+            Map<Integer, Integer> assignedSplitCountByReader = new HashMap<>();
+            for (int readerId = 0; readerId < numSubtasks; readerId++) {
+                assignedSplitCountByReader.put(readerId, 0);
+            }
+            for (SplitsAssignment<DynamicKafkaSourceSplit> assignment :
+                    context.getSplitsAssignmentSequence()) {
+                for (Entry<Integer, List<DynamicKafkaSourceSplit>> entry :
+                        assignment.assignment().entrySet()) {
+                    assignedSplitCountByReader.merge(
+                            entry.getKey(), entry.getValue().size(), Integer::sum);
+                }
+            }
+            int minAssignedSplits = Collections.min(assignedSplitCountByReader.values());
+            int maxAssignedSplits = Collections.max(assignedSplitCountByReader.values());
+            assertThat(maxAssignedSplits - minAssignedSplits)
+                    .as(
+                            "global mode should keep per-reader split counts 
balanced, current assigned split count is "
+                                    + assignedSplitCountByReader.values())
+                    .isLessThanOrEqualTo(1);
+        }
+    }
+
+    @Test
+    public void testGlobalModeBalancesAssignmentsWithClusterAndPartitionExpansion()
+            throws Throwable {
+        final int numSubtasks = 4;
+        final String expandedTopic = TOPIC + "_expanded";
+        final int expandedTopicPartitions = NUM_SPLITS_PER_CLUSTER * 2;
+        DynamicKafkaSourceTestHelper.createTopic(expandedTopic, expandedTopicPartitions, 1);
+        DynamicKafkaSourceTestHelper.produceToKafka(
+                expandedTopic, expandedTopicPartitions, NUM_RECORDS_PER_SPLIT);
+
+        KafkaStream initialStream =
+                new KafkaStream(
+                        TOPIC, DynamicKafkaSourceTestHelper.getClusterMetadataMap(0, TOPIC));
+
+        Map<String, ClusterMetadata> expandedClusterMetadata = new HashMap<>();
+        expandedClusterMetadata.putAll(
+                DynamicKafkaSourceTestHelper.getClusterMetadataMap(0, TOPIC, expandedTopic));
+        expandedClusterMetadata.putAll(
+                DynamicKafkaSourceTestHelper.getClusterMetadataMap(1, TOPIC, expandedTopic));
+        KafkaStream expandedStream = new KafkaStream(TOPIC, expandedClusterMetadata);
+
+        MockKafkaMetadataService metadataService =
+                new MockKafkaMetadataService(Collections.singleton(initialStream));
+        try (MockSplitEnumeratorContext<DynamicKafkaSourceSplit> context =
+                        new MockSplitEnumeratorContext<>(numSubtasks);
+                DynamicKafkaSourceEnumerator enumerator =
+                        createEnumerator(
+                                context,
+                                metadataService,
+                                properties -> {
+                                    properties.setProperty(
+                                            DynamicKafkaSourceOptions.STREAM_ENUMERATOR_MODE.key(),
+                                            DynamicKafkaSourceOptions.EnumeratorMode.GLOBAL
+                                                    .name()
+                                                    .toLowerCase());
+                                    properties.setProperty(
+                                            DynamicKafkaSourceOptions
+                                                    .STREAM_METADATA_DISCOVERY_INTERVAL_MS
+                                                    .key(),
+                                            "1");
+                                })) {
+            enumerator.start();
+            context.runPeriodicCallable(0);
+            runAllOneTimeCallables(context);
+
+            for (int i = 0; i < numSubtasks; i++) {
+                mockRegisterReaderAndSendReaderStartupEvent(context, enumerator, i);
+            }
+
+            verifyAllSplitsHaveBeenAssigned(context.getSplitsAssignmentSequence(), initialStream);
+
+            int assignmentsBeforeUpdate = context.getSplitsAssignmentSequence().size();
+
+            metadataService.setKafkaStreams(Collections.singleton(expandedStream));
+            context.runPeriodicCallable(0);
+            runAllOneTimeCallables(context);
+
+            Map<String, Map<String, Integer>> 
expectedPartitionsByClusterAndTopic = new HashMap<>();
+            Map<String, Integer> expectedTopicPartitions = new HashMap<>();
+            expectedTopicPartitions.put(TOPIC, NUM_SPLITS_PER_CLUSTER);
+            expectedTopicPartitions.put(expandedTopic, expandedTopicPartitions);
+            expectedPartitionsByClusterAndTopic.put(
+                    DynamicKafkaSourceTestHelper.getKafkaClusterId(0),
+                    new HashMap<>(expectedTopicPartitions));
+            expectedPartitionsByClusterAndTopic.put(
+                    DynamicKafkaSourceTestHelper.getKafkaClusterId(1),
+                    new HashMap<>(expectedTopicPartitions));
+            verifyExpectedTopicPartitions(
+                    context.getSplitsAssignmentSequence(), expectedPartitionsByClusterAndTopic);
+
+            List<SplitsAssignment<DynamicKafkaSourceSplit>> incrementalAssignments =
+                    context.getSplitsAssignmentSequence()
+                            .subList(
+                                    assignmentsBeforeUpdate,
+                                    context.getSplitsAssignmentSequence().size());
+            assertThat(incrementalAssignments)
+                    .as("metadata expansion should produce new split 
assignments")
+                    .isNotEmpty();
+            assertAssignmentsBalanced(incrementalAssignments, numSubtasks);
+        }
+    }
+
+    @Test
+    public void testGlobalModeClusterAndTopicShrinkThenRescaleRecovery() throws Throwable {
+        final int initialParallelism = 8;
+        final int restoredParallelism = 8;
+        final String expandedTopic = TOPIC + "_shrink_expand";
+        final int expandedTopicPartitions = NUM_SPLITS_PER_CLUSTER + 1;
+
+        DynamicKafkaSourceTestHelper.createTopic(expandedTopic, expandedTopicPartitions, 1);
+        DynamicKafkaSourceTestHelper.produceToKafka(
+                expandedTopic, expandedTopicPartitions, NUM_RECORDS_PER_SPLIT);
+
+        KafkaStream shrunkStream =
+                new KafkaStream(
+                        TOPIC, DynamicKafkaSourceTestHelper.getClusterMetadataMap(0, TOPIC));
+
+        Map<String, ClusterMetadata> expandedClusterMetadata = new HashMap<>();
+        expandedClusterMetadata.putAll(
+                DynamicKafkaSourceTestHelper.getClusterMetadataMap(0, TOPIC, expandedTopic));
+        expandedClusterMetadata.putAll(
+                DynamicKafkaSourceTestHelper.getClusterMetadataMap(1, TOPIC, expandedTopic));
+        KafkaStream expandedStream = new KafkaStream(TOPIC, expandedClusterMetadata);
+
+        DynamicKafkaSourceEnumState shrunkCheckpoint;
+        MockKafkaMetadataService metadataService =
+                new MockKafkaMetadataService(Collections.singleton(expandedStream));
+        try (MockSplitEnumeratorContext<DynamicKafkaSourceSplit> context =
+                        new MockSplitEnumeratorContext<>(initialParallelism);
+                DynamicKafkaSourceEnumerator enumerator =
+                        createEnumerator(
+                                context,
+                                metadataService,
+                                properties -> {
+                                    properties.setProperty(
+                                            DynamicKafkaSourceOptions.STREAM_ENUMERATOR_MODE.key(),
+                                            DynamicKafkaSourceOptions.EnumeratorMode.GLOBAL
+                                                    .name()
+                                                    .toLowerCase());
+                                    properties.setProperty(
+                                            DynamicKafkaSourceOptions
+                                                    .STREAM_METADATA_DISCOVERY_INTERVAL_MS
+                                                    .key(),
+                                            "1");
+                                })) {
+            enumerator.start();
+            context.runPeriodicCallable(0);
+            runAllOneTimeCallables(context);
+
+            for (int i = 0; i < initialParallelism; i++) {
+                mockRegisterReaderAndSendReaderStartupEvent(context, enumerator, i);
+            }
+
+            Map<String, Map<String, Integer>> expandedExpectedPartitions = new 
HashMap<>();
+            Map<String, Integer> expandedExpectedTopicPartitions = new 
HashMap<>();
+            expandedExpectedTopicPartitions.put(TOPIC, NUM_SPLITS_PER_CLUSTER);
+            expandedExpectedTopicPartitions.put(expandedTopic, 
expandedTopicPartitions);
+            expandedExpectedPartitions.put(
+                    DynamicKafkaSourceTestHelper.getKafkaClusterId(0),
+                    new HashMap<>(expandedExpectedTopicPartitions));
+            expandedExpectedPartitions.put(
+                    DynamicKafkaSourceTestHelper.getKafkaClusterId(1),
+                    new HashMap<>(expandedExpectedTopicPartitions));
+            verifyExpectedTopicPartitions(
+                    context.getSplitsAssignmentSequence(), expandedExpectedPartitions);
+
+            Map<Integer, Set<String>> fullAssignmentsBeforeShrink =
+                    getReaderAssignmentsBySplitId(
+                            context.getSplitsAssignmentSequence(), initialParallelism);
+
+            int assignmentsBeforeShrink = 
context.getSplitsAssignmentSequence().size();
+            
metadataService.setKafkaStreams(Collections.singleton(shrunkStream));
+            context.runPeriodicCallable(0);
+            runAllOneTimeCallables(context);
+            assertThat(context.getSplitsAssignmentSequence().size())
+                    .as("metadata shrink should not create new split 
assignments")
+                    .isEqualTo(assignmentsBeforeShrink);
+
+            shrunkCheckpoint = enumerator.snapshotState(-1);
+            Map<String, KafkaSourceEnumState> shrunkClusterStates =
+                    shrunkCheckpoint.getClusterEnumeratorStates();
+            String cluster0 = DynamicKafkaSourceTestHelper.getKafkaClusterId(0);
+            String cluster1 = DynamicKafkaSourceTestHelper.getKafkaClusterId(1);
+            assertThat(shrunkClusterStates)
+                    .as("checkpoint should only retain the still-active cluster")
+                    .containsOnlyKeys(cluster0);
+            assertThat(shrunkClusterStates.get(cluster0).assignedSplits())
+                    .as("only active topic partitions should remain after shrink")
+                    .hasSize(NUM_SPLITS_PER_CLUSTER)
+                    .allSatisfy(
+                            split ->
+                                    assertThat(split.getTopic())
+                                            .as("removed topics should not remain in checkpoint")
+                                            .isEqualTo(TOPIC));
+            assertThat(shrunkClusterStates).doesNotContainKey(cluster1);
+
+            Set<String> activeSplitIdsAfterShrink =
+                    shrunkClusterStates.get(cluster0).assignedSplits().stream()
+                            .map(split -> dynamicSplitId(cluster0, split))
+                            .collect(Collectors.toSet());
+            Map<Integer, Set<String>> activeAssignmentsAfterShrink =
+                    retainOnlyActiveSplits(fullAssignmentsBeforeShrink, activeSplitIdsAfterShrink);
+            long idleReadersAfterShrink =
+                    activeAssignmentsAfterShrink.values().stream().filter(Set::isEmpty).count();
+            assertThat(idleReadersAfterShrink)
+                    .as("shrink should leave reader holes without rebalancing existing assignments")
+                    .isGreaterThanOrEqualTo(initialParallelism - NUM_SPLITS_PER_CLUSTER);
+        }
+
+        MockKafkaMetadataService restoredMetadataService =
+                new MockKafkaMetadataService(Collections.singleton(shrunkStream));
+        Properties properties = new Properties();
+        properties.setProperty(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(), "0");
+        properties.setProperty(
+                DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_INTERVAL_MS.key(), "1");
+        properties.setProperty(
+                DynamicKafkaSourceOptions.STREAM_ENUMERATOR_MODE.key(),
+                DynamicKafkaSourceOptions.EnumeratorMode.GLOBAL.name().toLowerCase());
+
+        try (MockSplitEnumeratorContext<DynamicKafkaSourceSplit> restoredContext =
+                        new MockSplitEnumeratorContext<>(restoredParallelism);
+                DynamicKafkaSourceEnumerator restoredEnumerator =
+                        new DynamicKafkaSourceEnumerator(
+                                new KafkaStreamSetSubscriber(Collections.singleton(TOPIC)),
+                                restoredMetadataService,
+                                restoredContext,
+                                OffsetsInitializer.earliest(),
+                                new NoStoppingOffsetsInitializer(),
+                                properties,
+                                Boundedness.CONTINUOUS_UNBOUNDED,
+                                shrunkCheckpoint,
+                                new TestKafkaEnumContextProxyFactory())) {
+            restoredEnumerator.start();
+            for (int i = 0; i < restoredParallelism; i++) {
+                mockRegisterReaderAndSendReaderStartupEvent(restoredContext, restoredEnumerator, i);
+            }
+
+            restoredContext.runPeriodicCallable(0);
+            runAllOneTimeCallables(restoredContext);
+            int assignmentsBeforeRegrowth = restoredContext.getSplitsAssignmentSequence().size();
+
+            restoredMetadataService.setKafkaStreams(Collections.singleton(expandedStream));
+            restoredContext.runPeriodicCallable(0);
+            runAllOneTimeCallables(restoredContext);
+
+            List<SplitsAssignment<DynamicKafkaSourceSplit>> incrementalAssignments =
+                    restoredContext
+                            .getSplitsAssignmentSequence()
+                            .subList(
+                                    assignmentsBeforeRegrowth,
+                                    restoredContext.getSplitsAssignmentSequence().size());
+            assertThat(incrementalAssignments)
+                    .as("regrowth should assign only newly reintroduced cluster/topic splits")
+                    .isNotEmpty();
+            assertAssignmentsBalanced(incrementalAssignments, restoredParallelism);
+
+            DynamicKafkaSourceEnumState restoredSnapshot = restoredEnumerator.snapshotState(-1);
+            String cluster0 = DynamicKafkaSourceTestHelper.getKafkaClusterId(0);
+            String cluster1 = DynamicKafkaSourceTestHelper.getKafkaClusterId(1);
+            assertThat(restoredSnapshot.getClusterEnumeratorStates())
+                    .as("regrowth should restore both clusters in enumerator state")
+                    .containsKeys(cluster0, cluster1);
+
+            for (String clusterId : ImmutableSet.of(cluster0, cluster1)) {
+                KafkaSourceEnumState clusterState =
+                        restoredSnapshot.getClusterEnumeratorStates().get(clusterId);
+                assertThat(clusterState.assignedSplits())
+                        .as("each cluster should track expected assignments after regrowth")
+                        .hasSize(NUM_SPLITS_PER_CLUSTER + expandedTopicPartitions);
+
+                Set<TopicPartition> assignedTopicPartitions =
+                        clusterState.assignedSplits().stream()
+                                .map(KafkaPartitionSplit::getTopicPartition)
+                                .collect(Collectors.toSet());
+                for (int partition = 0; partition < NUM_SPLITS_PER_CLUSTER; partition++) {
+                    assertThat(assignedTopicPartitions)
+                            .contains(new TopicPartition(TOPIC, partition));
+                }
+                for (int partition = 0; partition < expandedTopicPartitions; partition++) {
+                    assertThat(assignedTopicPartitions)
+                            .contains(new TopicPartition(expandedTopic, partition));
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testGlobalModeShrinkRestoreAndRuntimeRepartitionAfterDownscale() throws Throwable {
+        // Expect runtime repartition to rebalance 10 active splits over p=4 as 3,3,2,2.
+        runGlobalModeShrinkRestoreAndRuntimeRepartitionScenario(
+                "downscale", 4, List.of(3, 3, 2, 2));
+    }
+
+    @Test
+    public void testGlobalModeShrinkRestoreAndRuntimeRepartitionAfterUpscale() throws Throwable {
+        // Expect runtime repartition to rebalance 10 active splits over p=8 as 2,2,1,1,1,1,1,1.
+        runGlobalModeShrinkRestoreAndRuntimeRepartitionScenario(
+                "upscale", 8, List.of(2, 2, 1, 1, 1, 1, 1, 1));
+    }
+
+    private void runGlobalModeShrinkRestoreAndRuntimeRepartitionScenario(
+            String scenarioName, int restoredParallelism, List<Integer> expectedRepartitionCounts)
+            throws Throwable {
+        // Scenario summary:
+        // 1) Start in global mode with 13 splits over p=5 and verify near-balance.
+        // 2) Shrink metadata so active splits become 10 and verify no immediate rebalance.
+        // 3) Simulate runtime restore repartition for new parallelism and verify balanced counts.
+        // 4) Restore enumerator from checkpoint and regrow metadata to verify incremental
+        // assignment.
+        final int initialParallelism = 5;
+        final String removedTopic = TOPIC + "_removed3_" + scenarioName;
+        final String retainedTopic = TOPIC + "_retained4_" + scenarioName;
+
+        DynamicKafkaSourceTestHelper.createTopic(removedTopic, NUM_SPLITS_PER_CLUSTER, 1);
+        DynamicKafkaSourceTestHelper.produceToKafka(
+                removedTopic, NUM_SPLITS_PER_CLUSTER, NUM_RECORDS_PER_SPLIT);
+        DynamicKafkaSourceTestHelper.createTopic(retainedTopic, NUM_SPLITS_PER_CLUSTER + 1, 1);
+        DynamicKafkaSourceTestHelper.produceToKafka(
+                retainedTopic, NUM_SPLITS_PER_CLUSTER + 1, NUM_RECORDS_PER_SPLIT);
+
+        Map<String, ClusterMetadata> initialClusterMetadata = new HashMap<>();
+        initialClusterMetadata.putAll(
+                DynamicKafkaSourceTestHelper.getClusterMetadataMap(0, TOPIC, removedTopic));
+        initialClusterMetadata.putAll(
+                DynamicKafkaSourceTestHelper.getClusterMetadataMap(1, TOPIC, retainedTopic));
+        KafkaStream initialStream = new KafkaStream(TOPIC, initialClusterMetadata);
+
+        Map<String, ClusterMetadata> shrunkClusterMetadata = new HashMap<>();
+        shrunkClusterMetadata.putAll(DynamicKafkaSourceTestHelper.getClusterMetadataMap(0, TOPIC));
+        shrunkClusterMetadata.putAll(
+                DynamicKafkaSourceTestHelper.getClusterMetadataMap(1, TOPIC, retainedTopic));
+        KafkaStream shrunkStream = new KafkaStream(TOPIC, shrunkClusterMetadata);
+
+        DynamicKafkaSourceEnumState shrunkCheckpoint;
+        MockKafkaMetadataService metadataService =
+                new MockKafkaMetadataService(Collections.singleton(initialStream));
+        try (MockSplitEnumeratorContext<DynamicKafkaSourceSplit> context =
+                        new MockSplitEnumeratorContext<>(initialParallelism);
+                DynamicKafkaSourceEnumerator enumerator =
+                        createEnumerator(
+                                context,
+                                metadataService,
+                                properties -> {
+                                    properties.setProperty(
+                                            DynamicKafkaSourceOptions.STREAM_ENUMERATOR_MODE.key(),
+                                            DynamicKafkaSourceOptions.EnumeratorMode.GLOBAL
+                                                    .name()
+                                                    .toLowerCase());
+                                    properties.setProperty(
+                                            DynamicKafkaSourceOptions
+                                                    .STREAM_METADATA_DISCOVERY_INTERVAL_MS
+                                                    .key(),
+                                            "1");
+                                })) {
+            enumerator.start();
+            context.runPeriodicCallable(0);
+            runAllOneTimeCallables(context);
+
+            for (int i = 0; i < initialParallelism; i++) {
+                mockRegisterReaderAndSendReaderStartupEvent(context, enumerator, i);
+            }
+
+            Map<String, Map<String, Integer>> expectedInitialPartitions = new HashMap<>();
+            Map<String, Integer> cluster0TopicPartitions = new HashMap<>();
+            cluster0TopicPartitions.put(TOPIC, NUM_SPLITS_PER_CLUSTER);
+            cluster0TopicPartitions.put(removedTopic, NUM_SPLITS_PER_CLUSTER);
+            expectedInitialPartitions.put(
+                    DynamicKafkaSourceTestHelper.getKafkaClusterId(0), cluster0TopicPartitions);
+            Map<String, Integer> cluster1TopicPartitions = new HashMap<>();
+            cluster1TopicPartitions.put(TOPIC, NUM_SPLITS_PER_CLUSTER);
+            cluster1TopicPartitions.put(retainedTopic, NUM_SPLITS_PER_CLUSTER + 1);
+            expectedInitialPartitions.put(
+                    DynamicKafkaSourceTestHelper.getKafkaClusterId(1), cluster1TopicPartitions);
+            verifyExpectedTopicPartitions(
+                    context.getSplitsAssignmentSequence(), expectedInitialPartitions);
+
+            Map<Integer, Set<String>> fullAssignmentsBeforeShrink =
+                    getReaderAssignmentsBySplitId(
+                            context.getSplitsAssignmentSequence(), initialParallelism);
+            Map<Integer, Integer> countsBeforeShrink =
+                    getReaderSplitCounts(fullAssignmentsBeforeShrink, initialParallelism);
+            assertThat(
+                            Collections.max(countsBeforeShrink.values())
+                                    - Collections.min(countsBeforeShrink.values()))
+                    .as("initial global assignment should be near-balanced")
+                    .isLessThanOrEqualTo(1);
+
+            int assignmentsBeforeShrink = context.getSplitsAssignmentSequence().size();
+            metadataService.setKafkaStreams(Collections.singleton(shrunkStream));
+            context.runPeriodicCallable(0);
+            runAllOneTimeCallables(context);
+            assertThat(context.getSplitsAssignmentSequence().size())
+                    .as("metadata shrink should not create new split assignments")
+                    .isEqualTo(assignmentsBeforeShrink);
+
+            shrunkCheckpoint = enumerator.snapshotState(-1);
+            Set<String> activeSplitIdsAfterShrink =
+                    shrunkCheckpoint.getClusterEnumeratorStates().entrySet().stream()
+                            .flatMap(
+                                    entry ->
+                                            entry.getValue().assignedSplits().stream()
+                                                    .map(
+                                                            split ->
+                                                                    dynamicSplitId(
+                                                                            entry.getKey(), split)))
+                            .collect(Collectors.toSet());
+            assertThat(activeSplitIdsAfterShrink)
+                    .as("shrink scenario keeps exactly 10 active splits")
+                    .hasSize(10);
+
+            Map<Integer, Set<String>> activeAssignmentsAfterShrink =
+                    retainOnlyActiveSplits(fullAssignmentsBeforeShrink, activeSplitIdsAfterShrink);
+            Map<Integer, Integer> countsAfterShrink =
+                    getReaderSplitCounts(activeAssignmentsAfterShrink, initialParallelism);
+
+            List<Integer> countsAfterRuntimeRepartition =
+                    repartitionReaderStateCountsForRestore(countsAfterShrink, restoredParallelism);
+            assertThat(countsAfterRuntimeRepartition)
+                    .as("runtime repartition on restore should match expected balanced counts")
+                    .containsExactlyElementsOf(expectedRepartitionCounts);
+            assertThat(
+                            Collections.max(countsAfterRuntimeRepartition)
+                                    - Collections.min(countsAfterRuntimeRepartition))
+                    .as("runtime repartition keeps split-count skew within 1")
+                    .isLessThanOrEqualTo(1);
+        }
+
+        MockKafkaMetadataService restoredMetadataService =
+                new MockKafkaMetadataService(Collections.singleton(shrunkStream));
+        Properties properties = new Properties();
+        properties.setProperty(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(), "0");
+        properties.setProperty(
+                DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_INTERVAL_MS.key(), "1");
+        properties.setProperty(
+                DynamicKafkaSourceOptions.STREAM_ENUMERATOR_MODE.key(),
+                DynamicKafkaSourceOptions.EnumeratorMode.GLOBAL.name().toLowerCase());
+
+        try (MockSplitEnumeratorContext<DynamicKafkaSourceSplit> restoredContext =
+                        new MockSplitEnumeratorContext<>(restoredParallelism);
+                DynamicKafkaSourceEnumerator restoredEnumerator =
+                        new DynamicKafkaSourceEnumerator(
+                                new KafkaStreamSetSubscriber(Collections.singleton(TOPIC)),
+                                restoredMetadataService,
+                                restoredContext,
+                                OffsetsInitializer.earliest(),
+                                new NoStoppingOffsetsInitializer(),
+                                properties,
+                                Boundedness.CONTINUOUS_UNBOUNDED,
+                                shrunkCheckpoint,
+                                new TestKafkaEnumContextProxyFactory())) {
+            restoredEnumerator.start();
+            for (int i = 0; i < restoredParallelism; i++) {
+                mockRegisterReaderAndSendReaderStartupEvent(restoredContext, restoredEnumerator, i);
+            }
+
+            restoredContext.runPeriodicCallable(0);
+            runAllOneTimeCallables(restoredContext);
+            int assignmentsBeforeRegrowth = restoredContext.getSplitsAssignmentSequence().size();
+
+            restoredMetadataService.setKafkaStreams(Collections.singleton(initialStream));
+            restoredContext.runPeriodicCallable(0);
+            runAllOneTimeCallables(restoredContext);
+
+            List<SplitsAssignment<DynamicKafkaSourceSplit>> incrementalAssignments =
+                    restoredContext
+                            .getSplitsAssignmentSequence()
+                            .subList(
+                                    assignmentsBeforeRegrowth,
+                                    restoredContext.getSplitsAssignmentSequence().size());
+            assertThat(incrementalAssignments)
+                    .as("regrowth should assign only newly reintroduced splits")
+                    .isNotEmpty();
+            assertAssignmentsBalanced(incrementalAssignments, restoredParallelism);
+        }
+    }
+
     @Test
     public void testEnumeratorDoesNotAssignDuplicateSplitsInMetadataUpdate() throws Throwable {
         KafkaStream kafkaStreamWithOneCluster = DynamicKafkaSourceTestHelper.getKafkaStream(TOPIC);
@@ -949,6 +1462,166 @@ public class DynamicKafkaSourceEnumeratorTest {
         return readerToSplits;
     }
 
+    private Map<Integer, Set<String>> getReaderAssignmentsBySplitId(
+            List<SplitsAssignment<DynamicKafkaSourceSplit>> splitsAssignmentSequence,
+            int numReaders) {
+        Map<Integer, Set<String>> readerToSplitIds = new HashMap<>();
+        for (int readerId = 0; readerId < numReaders; readerId++) {
+            readerToSplitIds.put(readerId, new HashSet<>());
+        }
+        for (SplitsAssignment<DynamicKafkaSourceSplit> split : splitsAssignmentSequence) {
+            for (Entry<Integer, List<DynamicKafkaSourceSplit>> assignments :
+                    split.assignment().entrySet()) {
+                readerToSplitIds
+                        .computeIfAbsent(assignments.getKey(), ignored -> new HashSet<>())
+                        .addAll(
+                                assignments.getValue().stream()
+                                        .map(DynamicKafkaSourceSplit::splitId)
+                                        .collect(Collectors.toSet()));
+            }
+        }
+        return readerToSplitIds;
+    }
+
+    private Map<Integer, Set<String>> retainOnlyActiveSplits(
+            Map<Integer, Set<String>> readerToSplitIds, Set<String> activeSplitIds) {
+        Map<Integer, Set<String>> filtered = new HashMap<>();
+        for (Entry<Integer, Set<String>> entry : readerToSplitIds.entrySet()) {
+            Set<String> activeForReader =
+                    entry.getValue().stream()
+                            .filter(activeSplitIds::contains)
+                            .collect(Collectors.toCollection(HashSet::new));
+            filtered.put(entry.getKey(), activeForReader);
+        }
+        return filtered;
+    }
+
+    private static String dynamicSplitId(String clusterId, KafkaPartitionSplit split) {
+        return clusterId + "-" + split.splitId();
+    }
+
+    private void verifyExpectedTopicPartitions(
+            List<SplitsAssignment<DynamicKafkaSourceSplit>> splitsAssignmentSequence,
+            Map<String, Map<String, Integer>> expectedPartitionsByClusterAndTopic) {
+        Map<String, Map<String, Set<Integer>>> observedPartitionsByClusterAndTopic =
+                new HashMap<>();
+        for (SplitsAssignment<DynamicKafkaSourceSplit> split : splitsAssignmentSequence) {
+            for (Entry<Integer, List<DynamicKafkaSourceSplit>> assignments :
+                    split.assignment().entrySet()) {
+                for (DynamicKafkaSourceSplit assignment : assignments.getValue()) {
+                    observedPartitionsByClusterAndTopic
+                            .computeIfAbsent(
+                                    assignment.getKafkaClusterId(), ignored -> new HashMap<>())
+                            .computeIfAbsent(
+                                    assignment.getKafkaPartitionSplit().getTopic(),
+                                    ignored -> new HashSet<>())
+                            .add(assignment.getKafkaPartitionSplit().getPartition());
+                }
+            }
+        }
+
+        for (Entry<String, Map<String, Integer>> clusterExpected :
+                expectedPartitionsByClusterAndTopic.entrySet()) {
+            assertThat(observedPartitionsByClusterAndTopic)
+                    .as("expected cluster must have assigned splits")
+                    .containsKey(clusterExpected.getKey());
+            for (Entry<String, Integer> topicExpected : clusterExpected.getValue().entrySet()) {
+                Set<Integer> observedTopicPartitions =
+                        observedPartitionsByClusterAndTopic
+                                .get(clusterExpected.getKey())
+                                .get(topicExpected.getKey());
+                assertThat(observedTopicPartitions)
+                        .as("expected topic must have assigned splits")
+                        .isNotNull();
+                assertThat(observedTopicPartitions).hasSize(topicExpected.getValue());
+                for (int partition = 0; partition < topicExpected.getValue(); partition++) {
+                    assertThat(observedTopicPartitions).contains(partition);
+                }
+            }
+        }
+    }
+
+    private void assertAssignmentsBalanced(
+            List<SplitsAssignment<DynamicKafkaSourceSplit>> assignmentSequence, int numReaders) {
+        Map<Integer, Integer> assignedSplitCountByReader = new HashMap<>();
+        for (int readerId = 0; readerId < numReaders; readerId++) {
+            assignedSplitCountByReader.put(readerId, 0);
+        }
+        for (SplitsAssignment<DynamicKafkaSourceSplit> assignment : assignmentSequence) {
+            for (Entry<Integer, List<DynamicKafkaSourceSplit>> entry :
+                    assignment.assignment().entrySet()) {
+                assignedSplitCountByReader.merge(
+                        entry.getKey(), entry.getValue().size(), Integer::sum);
+            }
+        }
+
+        int minAssignedSplits = Collections.min(assignedSplitCountByReader.values());
+        int maxAssignedSplits = Collections.max(assignedSplitCountByReader.values());
+        assertThat(maxAssignedSplits - minAssignedSplits)
+                .as("global mode should keep per-reader split counts balanced")
+                .isLessThanOrEqualTo(1);
+    }
+
+    private Map<Integer, Integer> getReaderSplitCounts(
+            Map<Integer, Set<String>> assignmentsByReader, int numReaders) {
+        Map<Integer, Integer> splitCounts = new HashMap<>();
+        for (int readerId = 0; readerId < numReaders; readerId++) {
+            splitCounts.put(readerId, 0);
+        }
+        assignmentsByReader.forEach(
+                (readerId, splitIds) -> splitCounts.put(readerId, splitIds.size()));
+        return splitCounts;
+    }
+
+    private List<Integer> repartitionReaderStateCountsForRestore(
+            Map<Integer, Integer> splitCountsByOldSubtask, int restoredParallelism) {
+        final int oldParallelism = splitCountsByOldSubtask.size();
+        List<List<OperatorStateHandle>> previousState = new ArrayList<>();
+
+        long offsetSeed = 0L;
+        for (int subtask = 0; subtask < oldParallelism; subtask++) {
+            int splitCount = splitCountsByOldSubtask.getOrDefault(subtask, 0);
+            long[] offsets = new long[splitCount];
+            for (int i = 0; i < splitCount; i++) {
+                offsets[i] = offsetSeed++;
+            }
+
+            Map<String, OperatorStateHandle.StateMetaInfo> stateNameToMeta = new HashMap<>();
+            stateNameToMeta.put(
+                    SOURCE_READER_SPLIT_STATE_NAME,
+                    new OperatorStateHandle.StateMetaInfo(
+                            offsets, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
+
+            previousState.add(
+                    Collections.singletonList(
+                            new OperatorStreamStateHandle(
+                                    stateNameToMeta,
+                                    new ByteStreamStateHandle(
+                                            "reader-state-" + subtask, new byte[0]))));
+        }
+        }
+
+        List<List<OperatorStateHandle>> repartitionedState =
+                RoundRobinOperatorStateRepartitioner.INSTANCE.repartitionState(
+                        previousState, oldParallelism, restoredParallelism);
+
+        List<Integer> splitCountsAfterRepartition = new ArrayList<>(repartitionedState.size());
+        for (List<OperatorStateHandle> subtaskStateHandles : repartitionedState) {
+            int splitCount = 0;
+            for (OperatorStateHandle stateHandle : subtaskStateHandles) {
+                splitCount +=
+                        stateHandle.getStateNameToPartitionOffsets().entrySet().stream()
+                                .filter(
+                                        entry ->
+                                                SOURCE_READER_SPLIT_STATE_NAME.equals(
+                                                        entry.getKey()))
+                                .mapToInt(entry -> entry.getValue().getOffsets().length)
+                                .sum();
+            }
+            splitCountsAfterRepartition.add(splitCount);
+        }
+        return splitCountsAfterRepartition;
+    }
+
     private List<TopicPartition> getFilteredTopicPartitions(
             DynamicKafkaSourceEnumState state, String topic, AssignmentStatus assignmentStatus) {
         return state.getClusterEnumeratorStates().values().stream()
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/GlobalSplitOwnerAssignerTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/GlobalSplitOwnerAssignerTest.java
new file mode 100644
index 00000000..d8dd6681
--- /dev/null
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/GlobalSplitOwnerAssignerTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.dynamic.source.enumerator;
+
+import org.apache.flink.connector.kafka.dynamic.source.split.DynamicKafkaSourceSplit;
+import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
+
+import org.apache.kafka.common.TopicPartition;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link GlobalSplitOwnerAssigner}. */
+class GlobalSplitOwnerAssignerTest {
+
+    @Test
+    void testRoundRobinAssignmentAcrossClustersWithMetadataChanges() {
+        GlobalSplitOwnerAssigner assigner = new GlobalSplitOwnerAssigner();
+
+        Set<String> activeSplitIds =
+                new HashSet<>(
+                        Arrays.asList(
+                                split("cluster-a", "topic-a", 0).splitId(),
+                                split("cluster-a", "topic-a", 1).splitId(),
+                                split("cluster-b", "topic-a", 0).splitId(),
+                                split("cluster-b", "topic-a", 1).splitId()));
+        assigner.onMetadataRefresh(activeSplitIds);
+
+        assertThat(assigner.assignSplitOwner(split("cluster-a", "topic-a", 2).splitId(), 3))
+                .as("new split after 4 active splits should use 4 %% 3")
+                .isEqualTo(1);
+        assertThat(assigner.assignSplitOwner(split("cluster-b", "topic-a", 2).splitId(), 3))
+                .as("next split should continue global round-robin order")
+                .isEqualTo(2);
+
+        Set<String> updatedSplitIds =
+                new HashSet<>(
+                        Arrays.asList(
+                                split("cluster-a", "topic-a", 0).splitId(),
+                                split("cluster-a", "topic-a", 1).splitId(),
+                                split("cluster-c", "topic-z", 0).splitId()));
+        assigner.onMetadataRefresh(updatedSplitIds);
+
+        assertThat(assigner.assignSplitOwner(split("cluster-c", "topic-z", 1).splitId(), 3))
+                .as("metadata refresh should reseed round-robin by current active split count")
+                .isEqualTo(0);
+    }
+
+    @Test
+    void testFailureRecoveryPrefersSplitBackOwner() {
+        GlobalSplitOwnerAssigner assigner = new GlobalSplitOwnerAssigner();
+
+        Set<String> activeSplitIds =
+                new HashSet<>(
+                        Arrays.asList(
+                                split("cluster-a", "topic-a", 0).splitId(),
+                                split("cluster-a", "topic-a", 1).splitId(),
+                                split("cluster-b", "topic-a", 0).splitId(),
+                                split("cluster-b", "topic-a", 1).splitId()));
+        assigner.onMetadataRefresh(activeSplitIds);
+
+        DynamicKafkaSourceSplit returnedSplit = split("cluster-b", "topic-a", 2);
+        assigner.onSplitsBack(Collections.singletonList(returnedSplit), 2);
+
+        assertThat(assigner.assignSplitOwner(returnedSplit.splitId(), 4))
+                .as("split-back should preserve owner affinity when subtask id is valid")
+                .isEqualTo(2);
+    }
+
+    @Test
+    void testRepartitionFallsBackToRoundRobinWhenSplitBackOwnerOutOfRange() {
+        GlobalSplitOwnerAssigner assigner = new GlobalSplitOwnerAssigner();
+
+        Set<String> activeSplitIds =
+                new HashSet<>(
+                        Arrays.asList(
+                                split("cluster-a", "topic-a", 0).splitId(),
+                                split("cluster-a", "topic-a", 1).splitId(),
+                                split("cluster-a", "topic-a", 2).splitId(),
+                                split("cluster-b", "topic-b", 0).splitId(),
+                                split("cluster-b", "topic-b", 1).splitId()));
+        assigner.onMetadataRefresh(activeSplitIds);
+
+        DynamicKafkaSourceSplit returnedSplit = split("cluster-b", "topic-b", 2);
+        assigner.onSplitsBack(Collections.singletonList(returnedSplit), 4);
+
+        assertThat(assigner.assignSplitOwner(returnedSplit.splitId(), 3))
+                .as("after downscale, invalid split-back owner should fall back to RR")
+                .isEqualTo(0);
+
+        Set<String> restoredSplitIds = new HashSet<>(activeSplitIds);
+        restoredSplitIds.add(returnedSplit.splitId());
+        assigner.onMetadataRefresh(restoredSplitIds);
+
+        assertThat(assigner.assignSplitOwner(split("cluster-c", "topic-c", 0).splitId(), 2))
+                .as("after restore/repartition, RR should seed from restored active split count")
+                .isEqualTo(0);
+    }
+
+    @Test
+    void testForwardLookingBalanceStrategy() {
+        GlobalSplitOwnerAssigner assigner = new GlobalSplitOwnerAssigner();
+        final int parallelism = 5;
+
+        // Step 1: verify baseline RR shape with 13 discovered splits over 5 readers.
+        // This yields a near-balanced 3,3,3,2,2 distribution.
+        Map<Integer, List<String>> assignedBeforeShrink = initAssignments(parallelism);
+        Map<Integer, Integer> countsBeforeShrink = counts(assignedBeforeShrink);
+        assertThat(countsBeforeShrink).containsEntry(0, 3).containsEntry(1, 3).containsEntry(2, 3);
+        assertThat(countsBeforeShrink).containsEntry(3, 2).containsEntry(4, 2);
+
+        // Step 2: emulate metadata shrink by removing 3 active splits.
+        // We intentionally do NOT "move" existing assignments, mirroring forward-looking behavior.
+        Set<String> activeAfterShrink =
+                assignedBeforeShrink.values().stream()
+                        .flatMap(List::stream)
+                        .collect(Collectors.toCollection(HashSet::new));
+        activeAfterShrink.remove(assignedBeforeShrink.get(2).get(2));
+        activeAfterShrink.remove(assignedBeforeShrink.get(3).get(1));
+        activeAfterShrink.remove(assignedBeforeShrink.get(4).get(1));
+        assigner.onMetadataRefresh(activeAfterShrink);
+
+        Map<Integer, List<String>> activeAssignmentsAfterShrink = new LinkedHashMap<>();
+        for (Map.Entry<Integer, List<String>> entry : assignedBeforeShrink.entrySet()) {
+            activeAssignmentsAfterShrink.put(
+                    entry.getKey(),
+                    entry.getValue().stream()
+                            .filter(activeAfterShrink::contains)
+                            .collect(Collectors.toList()));
+        }
+        Map<Integer, Integer> countsAfterShrink = counts(activeAssignmentsAfterShrink);
+        assertThat(countsAfterShrink)
+                .containsEntry(0, 3)
+                .containsEntry(1, 3)
+                .containsEntry(2, 2)
+                .containsEntry(3, 1)
+                .containsEntry(4, 1);
+
+        // Step 3: new assignments start from activeCount % parallelism after shrink.
+        // This proves the strategy reseeds from the current active inventory rather than
+        // preserving an old "next owner" cursor from before the shrink.
+        int ownerAfterShrink = assigner.assignSplitOwner("split-after-shrink-0", parallelism);
+        int nextOwnerAfterShrink = assigner.assignSplitOwner("split-after-shrink-1", parallelism);
+        assertThat(ownerAfterShrink).isEqualTo(0);
+        assertThat(nextOwnerAfterShrink).isEqualTo(1);
+
+        // If nothing had been removed, the next owner after 13 initial splits would have been 13 % 5 = 3.
+        assertThat(13 % parallelism).isEqualTo(3);
+    }
+
+    private static Map<Integer, List<String>> initAssignments(int parallelism) {
+        GlobalSplitOwnerAssigner assigner = new GlobalSplitOwnerAssigner();
+        Map<Integer, List<String>> assignments = new LinkedHashMap<>();
+        for (int i = 0; i < parallelism; i++) {
+            assignments.put(i, new ArrayList<>());
+        }
+        for (int i = 0; i < 13; i++) {
+            String splitId = "split-" + i;
+            int owner = assigner.assignSplitOwner(splitId, parallelism);
+            assignments.get(owner).add(splitId);
+        }
+        return assignments;
+    }
+
+    private static Map<Integer, Integer> counts(Map<Integer, List<String>> assignments) {
+        Map<Integer, Integer> counts = new LinkedHashMap<>();
+        for (Map.Entry<Integer, List<String>> entry : assignments.entrySet()) {
+            counts.put(entry.getKey(), entry.getValue().size());
+        }
+        return counts;
+    }
+
+    private static DynamicKafkaSourceSplit split(String clusterId, String topic, int partition) {
+        return new DynamicKafkaSourceSplit(
+                clusterId, new KafkaPartitionSplit(new TopicPartition(topic, partition), 0L));
+    }
+}