This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
The following commit(s) were added to refs/heads/main by this push:
new da1a90ff [FLINK-39012] Add global kafka enumerator support (#223)
da1a90ff is described below
commit da1a90ff3a8fd09f30b49015e9a35c8c00e2c8e1
Author: lnbest0707 <[email protected]>
AuthorDate: Wed Feb 11 10:04:28 2026 -0800
[FLINK-39012] Add global kafka enumerator support (#223)
Background
----------
DynamicKafkaSource currently composes one KafkaSourceEnumerator per Kafka
cluster. In the default (per_cluster) mode, each sub-enumerator assigns
partition splits independently using the default KafkaSource owner function
(topic hash + partition). That gives per-cluster balance, but it can produce
global skew when multiple clusters are active, because each cluster balances
against the full reader set in isolation.

This change introduces an optional `GLOBAL` assignment mode while keeping
`PER_CLUSTER` as the default for backward compatibility.
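The per-cluster skew can be made concrete with a small standalone sketch (illustrative, not connector code) of the default owner function quoted in the docs for this change: when two active clusters host a topic with the same name, both clusters map their partitions onto the same readers while the rest stay idle.

```java
import java.util.Arrays;

public class PerClusterOwnerSketch {

    // Default KafkaSource owner function, as documented for this change:
    // startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % P
    // owner      = (startIndex + partition) % P
    static int owner(String topic, int partition, int numReaders) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numReaders;
        return (startIndex + partition) % numReaders;
    }

    public static void main(String[] args) {
        int numReaders = 4;
        int[] load = new int[numReaders];
        // Two clusters, each with a 2-partition topic named "orders": each
        // cluster is balanced in isolation, but both pick the same owners,
        // so two readers carry everything and two readers stay empty.
        for (int cluster = 0; cluster < 2; cluster++) {
            for (int partition = 0; partition < 2; partition++) {
                load[owner("orders", partition, numReaders)]++;
            }
        }
        System.out.println(Arrays.toString(load)); // two readers at 2, two at 0
    }
}
```

With the global mode introduced here, the same four splits would instead cycle over all four readers.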
Checkpoint model (what is stored where)
---------------------------------------

JobManager / Enumerator checkpoint state:
- DynamicKafkaSourceEnumState:
  - latest Kafka stream metadata (kafkaStreams), i.e. the discovered
    streams/clusters/topics and the cluster metadata used to rebuild
    sub-enumerators.
  - per-cluster KafkaSourceEnumState (clusterEnumeratorStates).
- KafkaSourceEnumState stores:
  - the split inventory with assignment status (ASSIGNED / UNASSIGNED) as
    SplitAndAssignmentStatus(KafkaPartitionSplit, status).
  - the initialDiscoveryFinished flag.
- Even before this change, **the enumerator checkpoint does NOT persist the
  split->reader owner mapping**.

TaskManager / Reader checkpoint state:
- DynamicKafkaSourceReader snapshots List<DynamicKafkaSourceSplit> from all
  sub-readers.
- Each KafkaPartitionSplit snapshot carries the current consumed offset as
  the split start (via KafkaPartitionSplitState -> KafkaPartitionSplit), so
  record progress is restored exactly.
- Pending splits (before metadata activation) are also preserved.
- **Readers learn which splits to consume from their own checkpoint state.**
Global mode design
------------------

New source option:
- stream-enumerator-mode = `per_cluster` | `global` (the parser also
  accepts `-` in place of `_`).
- The default remains per_cluster.
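The option-value parsing can be sketched as follows (mirroring the normalization in DynamicKafkaSourceOptions.getEnumeratorMode added by this change; the enum is reproduced here to keep the sketch self-contained):

```java
import java.util.Locale;

public class EnumeratorModeParse {

    // Reproduced from DynamicKafkaSourceOptions for a self-contained sketch.
    enum EnumeratorMode { PER_CLUSTER, GLOBAL }

    // Trims, upper-cases, and maps '-' to '_' so that "per-cluster",
    // "per_cluster", and "Per_Cluster" all resolve to PER_CLUSTER.
    static EnumeratorMode parse(String value) {
        String normalized = value.trim().toUpperCase(Locale.ROOT).replace('-', '_');
        return EnumeratorMode.valueOf(normalized); // throws on unknown values
    }

    public static void main(String[] args) {
        System.out.println(parse("per-cluster")); // PER_CLUSTER
        System.out.println(parse("global"));      // GLOBAL
    }
}
```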
Global mode behavior:
- A single `GlobalSplitOwnerAssigner` is shared by all sub-enumerators.
- For each cluster sub-enumerator, DynamicKafkaSourceEnumerator injects a
  `SplitOwnerSelector` callback into KafkaSourceEnumerator.
- KafkaSourceEnumerator still owns the discovery and pending-assignment
  mechanics, but the owner choice is delegated to the global selector.

Global owner selection:
- Split IDs are normalized as "<clusterId>-<partitionSplitId>" to make
  ownership global.
- On new split assignment, the owner is chosen by a global round-robin
  cursor derived from the currently known active split count
  (size % parallelism); the split then becomes known.
- On `addSplitsBack`, returned splits record a preferred owner hint; if it
  is still valid under the current parallelism, it is honored once;
  otherwise assignment falls back to round-robin.
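The selection rules above can be sketched as a standalone class (a simplified model of the GlobalSplitOwnerAssigner introduced by this change; real split ids and bookkeeping are richer):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class GlobalOwnerSketch {

    private final Set<String> knownActiveSplitIds = new HashSet<>();
    private final Map<String, Integer> preferredOwnerBySplitId = new HashMap<>();

    // Splits returned via addSplitsBack record a one-shot owner hint.
    void onSplitsBack(String splitId, int subtaskId) {
        knownActiveSplitIds.add(splitId);
        preferredOwnerBySplitId.put(splitId, subtaskId);
    }

    int assignSplitOwner(String splitId, int numReaders) {
        // Honor the hint once, but only while it is valid under the
        // current parallelism; otherwise fall back to the cursor.
        Integer preferred = preferredOwnerBySplitId.remove(splitId);
        if (preferred != null && preferred >= 0 && preferred < numReaders) {
            return preferred;
        }
        // Round-robin cursor derived from the known active split count.
        int owner = knownActiveSplitIds.size() % numReaders;
        knownActiveSplitIds.add(splitId);
        return owner;
    }

    public static void main(String[] args) {
        GlobalOwnerSketch assigner = new GlobalOwnerSketch();
        // One shared cursor across clusters: owners cycle 0, 1, 2, 0, ...
        System.out.println(assigner.assignSplitOwner("cluster1-topicA-0", 3)); // 0
        System.out.println(assigner.assignSplitOwner("cluster2-topicB-0", 3)); // 1
        System.out.println(assigner.assignSplitOwner("cluster1-topicA-1", 3)); // 2
        // A split given back by subtask 1 goes back to subtask 1.
        assigner.onSplitsBack("cluster2-topicB-0", 1);
        System.out.println(assigner.assignSplitOwner("cluster2-topicB-0", 3)); // 1
    }
}
```

Because the cursor is shared across clusters, new splits interleave over the full reader set instead of each cluster balancing in isolation.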
Interaction with sub-enumerators
--------------------------------

No sub-enumerator lifecycle changes are made:
- Sub-enumerators still discover partitions asynchronously and build
  pendingPartitionSplitAssignment.
- The only behavioral change is the owner computation (global selector
  instead of the topic-name hash).
- The existing assignPendingPartitionSplits/addReader/addSplitsBack paths
  are reused.
- This preserves the KafkaSourceEnumerator invariants while enabling
  cross-cluster balance.
Recovery and rescaling semantics
--------------------------------

1) Normal checkpoint recovery (same parallelism)
- Readers restore split state (with offsets) from operator state.
- Enumerators restore the split inventory/status from
  DynamicKafkaSourceEnumState.
- Because owner IDs are not checkpointed, enumerators do not conflict with
  restored reader ownership; they only assign truly unassigned, new, or
  returned (splits-back) work.

2) Recovery after metadata changes
- The enumerator snapshots the current sub-enumerator state, rebuilds only
  the active clusters/topics, filters stale topics, and restores from the
  filtered state.
- The global strategy reseeds active split IDs from the restored state, so
  new assignments remain globally balanced without duplicating
  already-restored splits.
- Because **it only assigns new/pending splits, it does not require a full
  rebalance of all splits**.

3) Rescale / autoscaling (parallelism change)
- Flink repartitions reader operator state to the new subtasks before the
  readers start; split ownership may move across reader indices as part of
  the standard operator-state restore.
- The enumerator state remains valid because it tracks split status
  (ASSIGNED/UNASSIGNED), not reader IDs.
- New assignments after restore use the current parallelism via the global
  selector.
- The splits-back preferred owner is applied only if the subtask id still
  exists; otherwise the fallback is deterministic round-robin under the
  new parallelism.
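The rescale fallback reduces to a small pure function (illustrative names; in the actual change the hint bookkeeping lives in GlobalSplitOwnerAssigner):

```java
public class RescaleFallbackSketch {

    // Chooses a split owner after restore: the preferred-owner hint is
    // honored only if that subtask still exists under the new parallelism;
    // otherwise the choice is deterministic round-robin over the known
    // active split count.
    static int chooseOwner(Integer preferredOwner, int knownActiveSplits, int numReaders) {
        if (preferredOwner != null && preferredOwner >= 0 && preferredOwner < numReaders) {
            return preferredOwner;
        }
        return Math.floorMod(knownActiveSplits, numReaders);
    }

    public static void main(String[] args) {
        // Scale-down from 6 to 4 readers: a hint of subtask 5 is stale.
        System.out.println(chooseOwner(5, 7, 4)); // 3 (= 7 % 4, round-robin fallback)
        // A hint of subtask 2 is still valid and is honored.
        System.out.println(chooseOwner(2, 7, 4)); // 2
    }
}
```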
Determinism: what changed and why it matters
--------------------------------------------

The default (per_cluster) owner mapping is a pure function of
topic/partition/parallelism, so a split's owner is highly predictable.
Global mode is intentionally not fully deterministic across runs; it is
**forward-looking global balancing**:
- The exact owner can depend on discovery/event ordering across clusters
  and on metadata updates.
- Async discovery timing and rescale/failover timing can alter the
  assignment order.
- **An increase in splits does not trigger an assignment redistribution,
  and global balancing is maintained.**
- **A decrease in splits does not trigger an assignment redistribution,
  and global balancing is no longer guaranteed. The "holes" left by the
  removed splits remain until the next repartition, e.g. a parallelism
  change from auto-scaling.**

What remains guaranteed:
- No duplicate assignment of active splits.
- Correct recovery from checkpoints.
- The global-balancing objective for newly assigned work (observed skew is
  bounded in tests).
- Optional split-back affinity when valid.

Why this works:
- Data correctness is anchored by the reader split state (offset progress)
  plus the enumerator split inventory/status.
- Although assignment is no longer fully deterministic, a stable owner
  identity is not required for correctness; it is a scheduling choice.

Trade-off:
- We trade strict owner predictability for better cross-cluster balancing
  and lower global skew in multi-cluster dynamic deployments.
- We do not guarantee a deterministic global assignment order, to avoid
  full rebalancing after stream metadata changes.
Tests
-----

- Added/updated unit/integration coverage for:
  - global balancing across clusters,
  - cluster/topic expansion handling,
  - cluster/topic removal handling,
  - recovery + repartition behavior under global mode,
  - global owner assigner behavior (round-robin, split-back affinity,
    out-of-range fallback).
---
.../docs/connectors/datastream/dynamic-kafka.md | 65 +-
.../docs/connectors/datastream/dynamic-kafka.md | 51 ++
.../kafka/dynamic/source/DynamicKafkaSource.java | 8 +-
.../dynamic/source/DynamicKafkaSourceBuilder.java | 15 +
.../dynamic/source/DynamicKafkaSourceOptions.java | 53 +-
.../enumerator/DynamicKafkaSourceEnumerator.java | 116 +++-
.../enumerator/GlobalSplitOwnerAssigner.java | 63 ++
.../source/enumerator/KafkaSourceEnumerator.java | 58 +-
.../kafka/table/DynamicKafkaTableFactory.java | 8 +
.../dynamic/source/DynamicKafkaSourceITTest.java | 94 +++
.../DynamicKafkaSourceEnumeratorTest.java | 673 +++++++++++++++++++++
.../enumerator/GlobalSplitOwnerAssignerTest.java | 203 +++++++
12 files changed, 1389 insertions(+), 18 deletions(-)
diff --git a/docs/content.zh/docs/connectors/datastream/dynamic-kafka.md
b/docs/content.zh/docs/connectors/datastream/dynamic-kafka.md
index 2dc9d8ad..f5b2737f 100644
--- a/docs/content.zh/docs/connectors/datastream/dynamic-kafka.md
+++ b/docs/content.zh/docs/connectors/datastream/dynamic-kafka.md
@@ -60,14 +60,15 @@ corresponding to "input-stream".
```java
DynamicKafkaSource<String> source = DynamicKafkaSource.<String>builder()
- .setKafkaMetadataService(new MyKafkaMetadataService())
- .setStreamIds(Collections.singleton("input-stream"))
- .setStartingOffsets(OffsetsInitializer.earliest())
-
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
- .setProperties(properties)
- .build();
-
- env.fromSource(source, WatermarkStrategy.noWatermarks(), "Dynamic
Kafka Source");
+ .setKafkaMetadataService(new MyKafkaMetadataService())
+ .setStreamIds(Collections.singleton("input-stream"))
+ .setEnumeratorMode(DynamicKafkaSourceOptions.EnumeratorMode.PER_CLUSTER)
+ .setStartingOffsets(OffsetsInitializer.earliest())
+
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
+ .setProperties(properties)
+ .build();
+
+env.fromSource(source, WatermarkStrategy.noWatermarks(), "Dynamic Kafka
Source");
```
{{< /tab >}}
{{< tab "Python" >}}
@@ -169,6 +170,47 @@ The Dynamic Kafka Source provides 2 ways of subscribing to
Kafka stream(s).
{{< /tab >}}
{{< /tabs >}}
+### Split 分配模式
+
+Dynamic Kafka Source 支持两种 split 分配模式:
+
+* `per_cluster`(默认):在每个 Kafka 集群内独立进行 split 分配。
+* `global`:在所有已发现的 Kafka 集群范围内统一做全局负载均衡分配。
+
+你可以通过 builder API 或 source properties 来配置该模式。
+
+新发现 split 的 owner 计算方式如下:
+
+* `per_cluster`:使用 `KafkaSourceEnumerator` 相同的 owner 逻辑。
+ 对于 topic 分区 `(topic, partition)`,令 `numReaders = P`:
+ `startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % P`,
+ `owner = (startIndex + partition) % P`。
+* `global`:在所有集群共享一个全局 owner 游标:
+ `owner = knownActiveSplitIds.size() % numReaders`,
+ 然后将该 split id 加入 `knownActiveSplitIds`。
+ (当 split 通过 `addSplitsBack` 返回时,若原 owner 仍然有效,则优先复用该 owner。)
+
+在 `global` 模式下,均衡策略是**前向增量(forward-only)**的:新发现 split 会尽量保证后续分配均衡,
+但不会仅为重平衡主动迁移已分配且仍在消费的 active split。
+
+如果因为缩容/移除导致 global 分配出现倾斜,enumerator 不会自行重排已在运行的 split。
+如需对已有 ownership 做重平衡,可通过并行度变化后的恢复流程(例如 savepoint/checkpoint + rescale restore),
+让 Flink Runtime 对 source reader 的 operator state 进行重分区。
+
+{{< tabs "DynamicKafkaSourceEnumeratorMode" >}}
+{{< tab "Java" >}}
+```java
+DynamicKafkaSource<String> source =
+ DynamicKafkaSource.<String>builder()
+ .setKafkaMetadataService(new MyKafkaMetadataService())
+ .setStreamIds(Set.of("input-stream"))
+ .setEnumeratorMode(DynamicKafkaSourceOptions.EnumeratorMode.GLOBAL)
+
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
+ .build();
+```
+{{< /tab >}}
+{{< /tabs >}}
+
### Kafka Metadata Service
An interface is provided to resolve the logical Kafka stream(s) into the
corresponding physical
@@ -210,6 +252,13 @@ There are configuration options in
DynamicKafkaSourceOptions that can be configu
<td>Integer</td>
<td>The number of consecutive failures before letting the exception from
Kafka metadata service discovery trigger jobmanager failure and global
failover. The default is one to at least catch startup failures.</td>
</tr>
+ <tr>
+ <td><h5>stream-enumerator-mode</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">per_cluster</td>
+ <td>String</td>
+ <td>Dynamic Kafka split 分配所使用的 Enumerator 实现。支持
<code>per_cluster</code>(集群内独立分配)和 <code>global</code>(跨集群全局均衡分配)。</td>
+ </tr>
</tbody>
</table>
diff --git a/docs/content/docs/connectors/datastream/dynamic-kafka.md
b/docs/content/docs/connectors/datastream/dynamic-kafka.md
index 2d5c73f2..510a4576 100644
--- a/docs/content/docs/connectors/datastream/dynamic-kafka.md
+++ b/docs/content/docs/connectors/datastream/dynamic-kafka.md
@@ -62,6 +62,7 @@ corresponding to "input-stream".
DynamicKafkaSource<String> source = DynamicKafkaSource.<String>builder()
.setKafkaMetadataService(new MyKafkaMetadataService())
.setStreamIds(Collections.singleton("input-stream"))
+ .setEnumeratorMode(DynamicKafkaSourceOptions.EnumeratorMode.PER_CLUSTER)
.setStartingOffsets(OffsetsInitializer.earliest())
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.setProperties(properties)
@@ -143,6 +144,49 @@ DynamicKafkaSource<String> source =
{{< /tab >}}
{{< /tabs >}}
+### Split Assignment Mode
+
+Dynamic Kafka Source supports two split-assignment modes:
+
+* `per_cluster` (default): assigns splits within each Kafka cluster
independently.
+* `global`: assigns splits across all discovered clusters using one global
balancing strategy.
+
+You can configure the mode either with the builder API or with source
properties.
+
+How next owner is chosen for a newly discovered split:
+
+* `per_cluster`: uses the same owner logic as `KafkaSourceEnumerator`.
+ For topic partition `(topic, partition)`, with `numReaders = P`:
+ `startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % P`,
+ `owner = (startIndex + partition) % P`.
+* `global`: uses one global owner cursor across all clusters:
+ `owner = knownActiveSplitIds.size() % numReaders`,
+ then adds the split id into `knownActiveSplitIds`.
+ (When a split is returned via `addSplitsBack`, the preferred previous owner
is reused when valid.)
+
+In `global` mode, balancing is **forward-looking**: newly discovered splits
are assigned to keep
+future distribution balanced, while already assigned active splits are not
proactively migrated only
+for rebalancing.
+
+If global assignment becomes skewed due to shrink/removal, the enumerator does
not rebalance already
+active splits by itself. To rebalance existing ownership, use a restore with
parallelism change
+(for example, savepoint/checkpoint restore after rescale), so Flink runtime
repartitions source
+reader operator state.
+
+{{< tabs "DynamicKafkaSourceEnumeratorMode" >}}
+{{< tab "Java" >}}
+```java
+DynamicKafkaSource<String> source =
+ DynamicKafkaSource.<String>builder()
+ .setKafkaMetadataService(new MyKafkaMetadataService())
+ .setStreamIds(Set.of("input-stream"))
+ .setEnumeratorMode(DynamicKafkaSourceOptions.EnumeratorMode.GLOBAL)
+
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
+ .build();
+```
+{{< /tab >}}
+{{< /tabs >}}
+
### Kafka Stream Subscription
The Dynamic Kafka Source provides 2 ways of subscribing to Kafka stream(s).
* A set of Kafka stream ids. For example:
@@ -214,6 +258,13 @@ There are configuration options in
DynamicKafkaSourceOptions that can be configu
<td>Integer</td>
<td>The number of consecutive failures before letting the exception from
Kafka metadata service discovery trigger jobmanager failure and global
failover. The default is one to at least catch startup failures.</td>
</tr>
+ <tr>
+ <td><h5>stream-enumerator-mode</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">per_cluster</td>
+ <td>String</td>
+ <td>Enumerator implementation for dynamic Kafka split assignment.
Supported values are <code>per_cluster</code> (cluster-local assignment) and
<code>global</code> (globally balanced assignment across clusters).</td>
+ </tr>
</tbody>
</table>
diff --git
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSource.java
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSource.java
index 9a93d7b8..18636c44 100644
---
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSource.java
+++
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSource.java
@@ -138,10 +138,10 @@ public class DynamicKafkaSource<T>
}
/**
- * Create the {@link DynamicKafkaSourceEnumerator}.
+ * Create the configured split enumerator implementation.
*
* @param enumContext The {@link SplitEnumeratorContext context} for the
split enumerator.
- * @return the {@link DynamicKafkaSourceEnumerator}.
+ * @return the split enumerator.
*/
@Internal
@Override
@@ -159,12 +159,12 @@ public class DynamicKafkaSource<T>
}
/**
- * Restore the {@link DynamicKafkaSourceEnumerator}.
+ * Restore the configured split enumerator implementation.
*
* @param enumContext The {@link SplitEnumeratorContext context} for the
restored split
* enumerator.
* @param checkpoint The checkpoint to restore the SplitEnumerator from.
- * @return the {@link DynamicKafkaSourceEnumerator}.
+ * @return the split enumerator.
*/
@Internal
@Override
diff --git
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceBuilder.java
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceBuilder.java
index 8e814afc..9b7c31ba 100644
---
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceBuilder.java
+++
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceBuilder.java
@@ -37,6 +37,7 @@ import
org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Locale;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
@@ -180,6 +181,20 @@ public class DynamicKafkaSourceBuilder<T> {
return this;
}
+ /**
+ * Set the enumerator mode used for split assignment.
+ *
+ * @param enumeratorMode split assignment mode.
+ * @return the builder.
+ */
+ public DynamicKafkaSourceBuilder<T> setEnumeratorMode(
+ DynamicKafkaSourceOptions.EnumeratorMode enumeratorMode) {
+ Preconditions.checkNotNull(enumeratorMode);
+ return setProperty(
+ DynamicKafkaSourceOptions.STREAM_ENUMERATOR_MODE.key(),
+ enumeratorMode.name().toLowerCase(Locale.ROOT));
+ }
+
/**
* Set the property for {@link CommonClientConfigs#GROUP_ID_CONFIG}. This
will be applied to all
* clusters.
diff --git
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceOptions.java
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceOptions.java
index bdecaf39..56c6d201 100644
---
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceOptions.java
+++
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceOptions.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
+import java.util.Locale;
import java.util.Properties;
import java.util.function.Function;
@@ -34,6 +35,16 @@ public class DynamicKafkaSourceOptions {
private DynamicKafkaSourceOptions() {}
+ /**
+ * Enumerator mode determines how discovered Kafka splits are assigned to
source readers:
+ * cluster-local assignment (per-cluster behavior) or globally balanced
assignment across
+ * clusters.
+ */
+ public enum EnumeratorMode {
+ PER_CLUSTER,
+ GLOBAL
+ }
+
public static final ConfigOption<Long>
STREAM_METADATA_DISCOVERY_INTERVAL_MS =
ConfigOptions.key("stream-metadata-discovery-interval-ms")
.longType()
@@ -51,10 +62,50 @@ public class DynamicKafkaSourceOptions {
+ "trigger jobmanager failure and global
failover. The default is one to at least catch startup "
+ "failures.");
+ public static final ConfigOption<String> STREAM_ENUMERATOR_MODE =
+ ConfigOptions.key("stream-enumerator-mode")
+ .stringType()
+
.defaultValue(EnumeratorMode.PER_CLUSTER.name().toLowerCase(Locale.ROOT))
+ .withDescription(
+ "Enumerator implementation for dynamic Kafka split
assignment. "
+ + "'per_cluster' keeps per-cluster
assignment behavior, while "
+ + "'global' enables global load-balanced
assignment across clusters.");
+
@Internal
public static <T> T getOption(
Properties props, ConfigOption<?> configOption, Function<String,
T> parser) {
String value = props.getProperty(configOption.key());
- return (T) (value == null ? configOption.defaultValue() :
parser.apply(value));
+ if (value != null) {
+ return parser.apply(value);
+ }
+
+ Object defaultValue = configOption.defaultValue();
+ return defaultValue == null ? null :
parser.apply(String.valueOf(defaultValue));
+ }
+
+ @Internal
+ public static EnumeratorMode getEnumeratorMode(Properties props) {
+ return getOption(
+ props,
+ STREAM_ENUMERATOR_MODE,
+ value -> {
+ final String normalizedValue =
+ value.trim().toUpperCase(Locale.ROOT).replace('-',
'_');
+ try {
+ return EnumeratorMode.valueOf(normalizedValue);
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Invalid %s='%s'. Supported values
are: %s.",
+ STREAM_ENUMERATOR_MODE.key(),
+ value,
+
EnumeratorMode.PER_CLUSTER.name().toLowerCase(Locale.ROOT)
+ + ", "
+ + EnumeratorMode.GLOBAL
+ .name()
+
.toLowerCase(Locale.ROOT)),
+ e);
+ }
+ });
}
}
diff --git
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java
index c118b27f..73c0a143 100644
---
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java
+++
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java
@@ -77,6 +77,7 @@ public class DynamicKafkaSourceEnumerator
// The mapping that the split enumerator context needs to be able to
forward certain requests.
private final Map<String, StoppableKafkaEnumContextProxy>
clusterEnumContextMap;
+ private final SplitAssignmentStrategy splitAssignmentStrategy;
private final KafkaStreamSubscriber kafkaStreamSubscriber;
private final SplitEnumeratorContext<DynamicKafkaSourceSplit> enumContext;
private final KafkaMetadataService kafkaMetadataService;
@@ -155,6 +156,7 @@ public class DynamicKafkaSourceEnumerator
this.kafkaMetadataService = kafkaMetadataService;
this.stoppableKafkaEnumContextProxyFactory =
stoppableKafkaEnumContextProxyFactory;
+ this.splitAssignmentStrategy =
createSplitAssignmentStrategy(properties);
// handle checkpoint state and rebuild contexts
this.clusterEnumeratorMap = new HashMap<>();
@@ -181,6 +183,7 @@ public class DynamicKafkaSourceEnumerator
}
this.latestClusterTopicsMap = new HashMap<>();
+ Set<String> activeSplitIds = new HashSet<>();
for (Entry<String, KafkaSourceEnumState> clusterEnumState :
dynamicKafkaSourceEnumState.getClusterEnumeratorStates().entrySet()) {
this.latestClusterTopicsMap.put(
@@ -188,6 +191,15 @@ public class DynamicKafkaSourceEnumerator
clusterEnumState.getValue().assignedSplits().stream()
.map(KafkaPartitionSplit::getTopic)
.collect(Collectors.toSet()));
+ clusterEnumState
+ .getValue()
+ .splits()
+ .forEach(
+ splitStatus ->
+ activeSplitIds.add(
+ toDynamicSplitId(
+ clusterEnumState.getKey(),
+ splitStatus.split())));
createEnumeratorWithAssignedTopicPartitions(
clusterEnumState.getKey(),
@@ -197,6 +209,7 @@ public class DynamicKafkaSourceEnumerator
clusterStartingOffsets.get(clusterEnumState.getKey()),
clusterStoppingOffsets.get(clusterEnumState.getKey()));
}
+ splitAssignmentStrategy.onMetadataRefresh(activeSplitIds);
}
/**
@@ -303,6 +316,7 @@ public class DynamicKafkaSourceEnumerator
sendMetadataUpdateEventToAvailableReaders();
// create enumerators
+ Set<String> activeSplitIds = new HashSet<>();
for (Entry<String, Set<String>> activeClusterTopics :
latestClusterTopicsMap.entrySet()) {
KafkaSourceEnumState kafkaSourceEnumState =
dynamicKafkaSourceEnumState
@@ -318,6 +332,12 @@ public class DynamicKafkaSourceEnumerator
kafkaSourceEnumState.splits().stream()
.filter(tp ->
activeTopics.contains(tp.split().getTopic()))
.collect(Collectors.toSet());
+ partitions.forEach(
+ splitStatus ->
+ activeSplitIds.add(
+ toDynamicSplitId(
+ activeClusterTopics.getKey(),
+ splitStatus.split())));
newKafkaSourceEnumState =
new KafkaSourceEnumState(
@@ -337,6 +357,7 @@ public class DynamicKafkaSourceEnumerator
clusterStoppingOffsets.get(activeClusterTopics.getKey()));
}
+ splitAssignmentStrategy.onMetadataRefresh(activeSplitIds);
startAllEnumerators();
}
@@ -407,6 +428,10 @@ public class DynamicKafkaSourceEnumerator
kafkaClusterId,
kafkaMetadataService,
signalNoMoreSplitsCallback);
+ KafkaSourceEnumerator.SplitOwnerSelector splitOwnerSelector =
+
splitAssignmentStrategy.createSplitOwnerSelector(kafkaClusterId);
+ SplitEnumeratorContext<KafkaPartitionSplit> assignmentContext =
+
splitAssignmentStrategy.createAssignmentContext(kafkaClusterId, context);
Properties consumerProps = new Properties();
KafkaPropertiesUtil.copyProperties(fetchedProperties, consumerProps);
@@ -425,9 +450,10 @@ public class DynamicKafkaSourceEnumerator
effectiveStartingOffsetsInitializer,
effectiveStoppingOffsetsInitializer,
consumerProps,
- context,
+ assignmentContext,
boundedness,
- kafkaSourceEnumState);
+ kafkaSourceEnumState,
+ splitOwnerSelector);
clusterEnumContextMap.put(kafkaClusterId, context);
clusterEnumeratorMap.put(kafkaClusterId, enumerator);
@@ -489,6 +515,8 @@ public class DynamicKafkaSourceEnumerator
@Override
public void addSplitsBack(List<DynamicKafkaSourceSplit> splits, int
subtaskId) {
logger.debug("Adding splits back for {}", subtaskId);
+ splitAssignmentStrategy.onSplitsBack(splits, subtaskId);
+
// separate splits by cluster
Map<String, List<KafkaPartitionSplit>> kafkaPartitionSplits = new
HashMap<>();
for (DynamicKafkaSourceSplit split : splits) {
@@ -518,6 +546,8 @@ public class DynamicKafkaSourceEnumerator
@Override
public void addReader(int subtaskId) {
logger.debug("Adding reader {}", subtaskId);
+ splitAssignmentStrategy.onReaderAdded(subtaskId);
+
// assign pending splits from the sub enumerator
clusterEnumeratorMap.forEach(
(cluster, subEnumerator) ->
subEnumerator.addReader(subtaskId));
@@ -579,4 +609,86 @@ public class DynamicKafkaSourceEnumerator
throw new RuntimeException(e);
}
}
+
+ private SplitAssignmentStrategy createSplitAssignmentStrategy(Properties
properties) {
+ DynamicKafkaSourceOptions.EnumeratorMode enumeratorMode =
+ DynamicKafkaSourceOptions.getEnumeratorMode(properties);
+ logger.info("Using dynamic Kafka split enumerator mode: {}",
enumeratorMode);
+
+ switch (enumeratorMode) {
+ case GLOBAL:
+ return new GlobalSplitAssignmentStrategy();
+ case PER_CLUSTER:
+ default:
+ return new PerClusterSplitAssignmentStrategy();
+ }
+ }
+
+ private static String toDynamicSplitId(String kafkaClusterId,
KafkaPartitionSplit split) {
+ return kafkaClusterId + "-" + split.splitId();
+ }
+
+ private interface SplitAssignmentStrategy {
+ @Nullable
+ default KafkaSourceEnumerator.SplitOwnerSelector
createSplitOwnerSelector(
+ String kafkaClusterId) {
+ return null;
+ }
+
+ SplitEnumeratorContext<KafkaPartitionSplit> createAssignmentContext(
+ String kafkaClusterId, StoppableKafkaEnumContextProxy
clusterContext);
+
+ default void onReaderAdded(int subtaskId) {}
+
+ default void onSplitsBack(List<DynamicKafkaSourceSplit> splits, int
subtaskId) {}
+
+ default void onMetadataRefresh(Set<String> activeSplitIds) {}
+ }
+
+ private static class PerClusterSplitAssignmentStrategy implements
SplitAssignmentStrategy {
+ @Override
+ public SplitEnumeratorContext<KafkaPartitionSplit>
createAssignmentContext(
+ String kafkaClusterId, StoppableKafkaEnumContextProxy
clusterContext) {
+ return clusterContext;
+ }
+ }
+
+ private static class GlobalSplitAssignmentStrategy implements
SplitAssignmentStrategy {
+ private final GlobalSplitOwnerAssigner splitOwnerAssigner;
+
+ private GlobalSplitAssignmentStrategy() {
+ this.splitOwnerAssigner = new GlobalSplitOwnerAssigner();
+ }
+
+ @Override
+ public SplitEnumeratorContext<KafkaPartitionSplit>
createAssignmentContext(
+ String kafkaClusterId, StoppableKafkaEnumContextProxy
clusterContext) {
+ return clusterContext;
+ }
+
+ @Override
+ public KafkaSourceEnumerator.SplitOwnerSelector
createSplitOwnerSelector(
+ String kafkaClusterId) {
+ return (split, numReaders) -> assignSplitOwner(kafkaClusterId,
split, numReaders);
+ }
+
+ @Override
+ public void onReaderAdded(int subtaskId) {}
+
+ @Override
+ public void onSplitsBack(List<DynamicKafkaSourceSplit> splits, int
subtaskId) {
+ splitOwnerAssigner.onSplitsBack(splits, subtaskId);
+ }
+
+ @Override
+ public void onMetadataRefresh(Set<String> activeSplitIds) {
+ splitOwnerAssigner.onMetadataRefresh(activeSplitIds);
+ }
+
+ private int assignSplitOwner(
+ String kafkaClusterId, KafkaPartitionSplit split, int
numReaders) {
+ final String splitId = toDynamicSplitId(kafkaClusterId, split);
+ return splitOwnerAssigner.assignSplitOwner(splitId, numReaders);
+ }
+ }
}
diff --git
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/GlobalSplitOwnerAssigner.java
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/GlobalSplitOwnerAssigner.java
new file mode 100644
index 00000000..aa93a9a1
--- /dev/null
+++
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/GlobalSplitOwnerAssigner.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.dynamic.source.enumerator;
+
+import
org.apache.flink.connector.kafka.dynamic.source.split.DynamicKafkaSourceSplit;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Tracks globally known dynamic split ids and computes reader ownership using
round-robin order.
+ */
+final class GlobalSplitOwnerAssigner {
+
+ private final Set<String> knownActiveSplitIds = new HashSet<>();
+ private final Map<String, Integer> preferredOwnerBySplitId = new
HashMap<>();
+
+ void onMetadataRefresh(Set<String> activeSplitIds) {
+ knownActiveSplitIds.clear();
+ knownActiveSplitIds.addAll(activeSplitIds);
+ preferredOwnerBySplitId.keySet().retainAll(activeSplitIds);
+ }
+
+ void onSplitsBack(List<DynamicKafkaSourceSplit> splits, int subtaskId) {
+ for (DynamicKafkaSourceSplit split : splits) {
+ knownActiveSplitIds.add(split.splitId());
+ preferredOwnerBySplitId.put(split.splitId(), subtaskId);
+ }
+ }
+
+ int assignSplitOwner(String splitId, int numReaders) {
+ Preconditions.checkArgument(numReaders > 0, "numReaders must be > 0");
+
+ Integer preferredOwner = preferredOwnerBySplitId.remove(splitId);
+ if (preferredOwner != null && preferredOwner >= 0 && preferredOwner <
numReaders) {
+ return preferredOwner;
+ }
+
+ int targetReader = Math.floorMod(knownActiveSplitIds.size(),
numReaders);
+ knownActiveSplitIds.add(splitId);
+ return targetReader;
+ }
+}
diff --git
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
index 61c64e12..34523a5b 100644
---
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
+++
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
@@ -46,6 +46,7 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -59,6 +60,7 @@ import java.util.stream.Collectors;
import static org.apache.flink.util.CollectionUtil.newLinkedHashMapWithExpectedSize;
import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
/**
* The enumerator class for Kafka source.
@@ -94,6 +96,17 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
public class KafkaSourceEnumerator
implements SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceEnumerator.class);
+
+ /**
+     * Selects the target reader for a split while it is enqueued in pending assignments.
+ *
+ * <p>The selector is invoked on the coordinator thread.
+ */
+ @FunctionalInterface
+ public interface SplitOwnerSelector {
+ int getSplitOwner(KafkaPartitionSplit split, int numReaders);
+ }
+
private final KafkaSubscriber subscriber;
private final OffsetsInitializer startingOffsetInitializer;
private final OffsetsInitializer stoppingOffsetInitializer;
@@ -118,6 +131,8 @@ public class KafkaSourceEnumerator
    private final Map<Integer, Set<KafkaPartitionSplit>> pendingPartitionSplitAssignment =
            new HashMap<>();
+ private final SplitOwnerSelector splitOwnerSelector;
+
/** The consumer group id used for this KafkaSource. */
private final String consumerGroupId;
@@ -144,7 +159,8 @@ public class KafkaSourceEnumerator
properties,
context,
boundedness,
- new KafkaSourceEnumState(Collections.emptySet(), false));
+ new KafkaSourceEnumState(Collections.emptySet(), false),
+ null);
}
public KafkaSourceEnumerator(
@@ -155,6 +171,26 @@ public class KafkaSourceEnumerator
SplitEnumeratorContext<KafkaPartitionSplit> context,
Boundedness boundedness,
KafkaSourceEnumState kafkaSourceEnumState) {
+ this(
+ subscriber,
+ startingOffsetInitializer,
+ stoppingOffsetInitializer,
+ properties,
+ context,
+ boundedness,
+ kafkaSourceEnumState,
+ null);
+ }
+
+ public KafkaSourceEnumerator(
+ KafkaSubscriber subscriber,
+ OffsetsInitializer startingOffsetInitializer,
+ OffsetsInitializer stoppingOffsetInitializer,
+ Properties properties,
+ SplitEnumeratorContext<KafkaPartitionSplit> context,
+ Boundedness boundedness,
+ KafkaSourceEnumState kafkaSourceEnumState,
+ @Nullable SplitOwnerSelector splitOwnerSelector) {
this.subscriber = subscriber;
this.startingOffsetInitializer = startingOffsetInitializer;
this.stoppingOffsetInitializer = stoppingOffsetInitializer;
@@ -162,6 +198,11 @@ public class KafkaSourceEnumerator
this.properties = properties;
this.context = context;
this.boundedness = boundedness;
+        this.splitOwnerSelector =
+                splitOwnerSelector != null
+                        ? splitOwnerSelector
+                        : (split, numReaders) ->
+                                getSplitOwner(split.getTopicPartition(), numReaders);
this.partitionDiscoveryIntervalMs =
KafkaSourceOptions.getOption(
@@ -403,8 +444,19 @@ public class KafkaSourceEnumerator
private void addPartitionSplitChangeToPendingAssignments(
Collection<KafkaPartitionSplit> newPartitionSplits) {
int numReaders = context.currentParallelism();
-        for (KafkaPartitionSplit split : newPartitionSplits) {
-            int ownerReader = getSplitOwner(split.getTopicPartition(), numReaders);
+        List<KafkaPartitionSplit> sortedSplits = new ArrayList<>(newPartitionSplits);
+        sortedSplits.sort(
+                Comparator.comparing(
+                                (KafkaPartitionSplit split) -> split.getTopicPartition().topic())
+                        .thenComparingInt(split -> split.getTopicPartition().partition()));
+        for (KafkaPartitionSplit split : sortedSplits) {
+            int ownerReader = splitOwnerSelector.getSplitOwner(split, numReaders);
+            checkState(
+                    ownerReader >= 0 && ownerReader < numReaders,
+                    "Invalid split owner %s for split %s with current parallelism %s",
+                    ownerReader,
+                    split,
+                    numReaders);
pendingPartitionSplitAssignment
.computeIfAbsent(ownerReader, r -> new HashSet<>())
.add(split);
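The fallback the new `SplitOwnerSelector` defaults to is the connector's long-standing topic-hash + partition owner function. A standalone sketch of both selector styles (the interface and class names here are hypothetical; the hash formula mirrors the well-known `KafkaSourceEnumerator.getSplitOwner` scheme but is restated here as an assumption, not the connector's exact code):

```java
// Sketch: a pluggable owner selector, with a default resembling topic-hash + partition.
@FunctionalInterface
interface OwnerSelector {
    int getOwner(String topic, int partition, int numReaders);
}

class OwnerSelectorDemo {
    // Default-style selector: hash the topic to a start index, then offset by partition,
    // which keeps partitions of one topic spread over consecutive readers.
    static final OwnerSelector DEFAULT =
            (topic, partition, numReaders) -> {
                int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numReaders;
                return (startIndex + partition) % numReaders;
            };

    static int owner(OwnerSelector selector, String topic, int partition, int numReaders) {
        return selector.getOwner(topic, partition, numReaders);
    }
}
```

A global-mode selector would replace `DEFAULT` with a function that consults cross-cluster state (as `GlobalSplitOwnerAssigner` does) instead of hashing per topic.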
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableFactory.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableFactory.java
index fea44744..99304838 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableFactory.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableFactory.java
@@ -122,6 +122,7 @@ public class DynamicKafkaTableFactory implements DynamicTableSourceFactory {
options.add(SCAN_PARALLELISM);
options.add(DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_INTERVAL_MS);
options.add(DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_FAILURE_THRESHOLD);
+ options.add(DynamicKafkaSourceOptions.STREAM_ENUMERATOR_MODE);
return options;
}
@@ -216,6 +217,13 @@ public class DynamicKafkaTableFactory implements DynamicTableSourceFactory {
.STREAM_METADATA_DISCOVERY_FAILURE_THRESHOLD
.key(),
Integer.toString(value)));
+        tableOptions
+                .getOptional(DynamicKafkaSourceOptions.STREAM_ENUMERATOR_MODE)
+                .ifPresent(
+                        value ->
+                                properties.setProperty(
+                                        DynamicKafkaSourceOptions.STREAM_ENUMERATOR_MODE.key(),
+                                        value));
}
    private static Optional<DecodingFormat<DeserializationSchema<RowData>>> getKeyDecodingFormat(
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceITTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceITTest.java
index a5107c90..c95f2f9a 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceITTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceITTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.connector.kafka.dynamic.source;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
import org.apache.flink.configuration.Configuration;
@@ -84,6 +85,7 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
@@ -213,6 +215,58 @@ class DynamicKafkaSourceITTest {
.collect(Collectors.toList()));
}
+    @Test
+    void testGlobalEnumeratorModeBalancesAssignments() throws Throwable {
+        // This verifies the global mode wiring from DynamicKafkaSource builder -> enumerator.
+        // In global mode, split ownership should be balanced across all readers (not per
+        // cluster).
+        final int numSubtasks = 4;
+        Properties properties = new Properties();
+        properties.setProperty(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(), "0");
+        properties.setProperty(
+                DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_INTERVAL_MS.key(), "0");
+        properties.setProperty(
+                DynamicKafkaSourceOptions.STREAM_ENUMERATOR_MODE.key(),
+                DynamicKafkaSourceOptions.EnumeratorMode.GLOBAL.name().toLowerCase());
+
+        MockKafkaMetadataService metadataService =
+                new MockKafkaMetadataService(
+                        Collections.singleton(
+                                DynamicKafkaSourceTestHelper.getKafkaStream(TOPIC)));
+
+        DynamicKafkaSource<Integer> source =
+                DynamicKafkaSource.<Integer>builder()
+                        .setStreamIds(
+                                metadataService.getAllStreams().stream()
+                                        .map(KafkaStream::getStreamId)
+                                        .collect(Collectors.toSet()))
+                        .setKafkaMetadataService(metadataService)
+                        .setDeserializer(
+                                KafkaRecordDeserializationSchema.valueOnly(
+                                        IntegerDeserializer.class))
+                        .setStartingOffsets(OffsetsInitializer.earliest())
+                        .setProperties(properties)
+                        .build();
+
+        try (MockSplitEnumeratorContext<DynamicKafkaSourceSplit> context =
+                        new MockSplitEnumeratorContext<>(numSubtasks);
+                SplitEnumerator<DynamicKafkaSourceSplit, DynamicKafkaSourceEnumState>
+                        splitEnumerator = source.createEnumerator(context)) {
+            DynamicKafkaSourceEnumerator enumerator =
+                    (DynamicKafkaSourceEnumerator) splitEnumerator;
+            enumerator.start();
+
+            for (int readerId = 0; readerId < numSubtasks; readerId++) {
+                registerReader(context, enumerator, readerId);
+            }
+            runAllOneTimeCallables(context);
+
+            verifyAllSplitsAssignedOnce(
+                    context.getSplitsAssignmentSequence(), metadataService.getAllStreams());
+            assertAssignmentsBalanced(context.getSplitsAssignmentSequence(), numSubtasks);
+        }
+    }
+
@Test
void testSingleClusterTopicMetadataService() throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -1096,6 +1150,46 @@ class DynamicKafkaSourceITTest {
}
}
+    private void verifyAllSplitsAssignedOnce(
+            List<SplitsAssignment<DynamicKafkaSourceSplit>> assignments,
+            Set<KafkaStream> kafkaStreams) {
+        Map<String, Integer> assignmentFrequency = new HashMap<>();
+        for (SplitsAssignment<DynamicKafkaSourceSplit> step : assignments) {
+            for (List<DynamicKafkaSourceSplit> splits : step.assignment().values()) {
+                for (DynamicKafkaSourceSplit split : splits) {
+                    assignmentFrequency.merge(split.splitId(), 1, Integer::sum);
+                }
+            }
+        }
+
+        int expectedSplits =
+                kafkaStreams.stream()
+                        .flatMap(stream -> stream.getClusterMetadataMap().entrySet().stream())
+                        .mapToInt(entry -> entry.getValue().getTopics().size() * NUM_PARTITIONS)
+                        .sum();
+        assertThat(assignmentFrequency).hasSize(expectedSplits);
+        assertThat(assignmentFrequency.values()).allMatch(count -> count == 1);
+    }
+
+    private void assertAssignmentsBalanced(
+            List<SplitsAssignment<DynamicKafkaSourceSplit>> assignments, int numReaders) {
+        Map<Integer, Integer> assignedSplitCountByReader = new HashMap<>();
+        for (int readerId = 0; readerId < numReaders; readerId++) {
+            assignedSplitCountByReader.put(readerId, 0);
+        }
+        for (SplitsAssignment<DynamicKafkaSourceSplit> assignment : assignments) {
+            for (Map.Entry<Integer, List<DynamicKafkaSourceSplit>> entry :
+                    assignment.assignment().entrySet()) {
+                assignedSplitCountByReader.merge(
+                        entry.getKey(), entry.getValue().size(), Integer::sum);
+            }
+        }
+
+        int minAssignedSplits = Collections.min(assignedSplitCountByReader.values());
+        int maxAssignedSplits = Collections.max(assignedSplitCountByReader.values());
+        assertThat(maxAssignedSplits - minAssignedSplits).isLessThanOrEqualTo(1);
+    }
+
    private Set<KafkaStream> getKafkaStreams(
            String kafkaClusterId, Properties properties, Collection<String> topics) {
return topics.stream()
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumeratorTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumeratorTest.java
index f974b6ff..21eb4399 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumeratorTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumeratorTest.java
@@ -33,10 +33,16 @@ import org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber.Kaf
import org.apache.flink.connector.kafka.dynamic.source.split.DynamicKafkaSourceSplit;
import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
import org.apache.flink.connector.kafka.source.enumerator.AssignmentStatus;
+import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.flink.connector.kafka.testutils.MockKafkaMetadataService;
import org.apache.flink.mock.Whitebox;
+import org.apache.flink.runtime.checkpoint.RoundRobinOperatorStateRepartitioner;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.OperatorStreamStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.streaming.connectors.kafka.DynamicKafkaSourceTestHelper;
import com.google.common.collect.ImmutableSet;
@@ -47,6 +53,7 @@ import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -72,6 +79,7 @@ public class DynamicKafkaSourceEnumeratorTest {
private static final String TOPIC = "DynamicKafkaSourceEnumeratorTest";
private static final int NUM_SPLITS_PER_CLUSTER = 3;
private static final int NUM_RECORDS_PER_SPLIT = 5;
+    private static final String SOURCE_READER_SPLIT_STATE_NAME = "source-reader-splits";
@BeforeAll
public static void beforeAll() throws Throwable {
@@ -688,6 +696,511 @@ public class DynamicKafkaSourceEnumeratorTest {
}
}
+    @Test
+    public void testGlobalEnumeratorModeBalancesSplitsAcrossClusters() throws Throwable {
+        final int numSubtasks = 2;
+        try (MockSplitEnumeratorContext<DynamicKafkaSourceSplit> context =
+                        new MockSplitEnumeratorContext<>(numSubtasks);
+                DynamicKafkaSourceEnumerator enumerator =
+                        createEnumerator(
+                                context,
+                                properties ->
+                                        properties.setProperty(
+                                                DynamicKafkaSourceOptions.STREAM_ENUMERATOR_MODE
+                                                        .key(),
+                                                DynamicKafkaSourceOptions.EnumeratorMode.GLOBAL
+                                                        .name()
+                                                        .toLowerCase()))) {
+            enumerator.start();
+
+            for (int i = 0; i < numSubtasks; i++) {
+                mockRegisterReaderAndSendReaderStartupEvent(context, enumerator, i);
+            }
+
+            runAllOneTimeCallables(context);
+
+            verifyAllSplitsHaveBeenAssigned(
+                    context.getSplitsAssignmentSequence(),
+                    DynamicKafkaSourceTestHelper.getKafkaStream(TOPIC));
+
+            Map<Integer, Integer> assignedSplitCountByReader = new HashMap<>();
+            for (int readerId = 0; readerId < numSubtasks; readerId++) {
+                assignedSplitCountByReader.put(readerId, 0);
+            }
+            for (SplitsAssignment<DynamicKafkaSourceSplit> assignment :
+                    context.getSplitsAssignmentSequence()) {
+                for (Entry<Integer, List<DynamicKafkaSourceSplit>> entry :
+                        assignment.assignment().entrySet()) {
+                    assignedSplitCountByReader.merge(
+                            entry.getKey(), entry.getValue().size(), Integer::sum);
+                }
+            }
+            int minAssignedSplits = Collections.min(assignedSplitCountByReader.values());
+            int maxAssignedSplits = Collections.max(assignedSplitCountByReader.values());
+            assertThat(maxAssignedSplits - minAssignedSplits)
+                    .as(
+                            "global mode should keep per-reader split counts balanced, current assigned split count is "
+                                    + assignedSplitCountByReader.values())
+                    .isLessThanOrEqualTo(1);
+        }
+    }
+
+    @Test
+    public void testGlobalModeBalancesAssignmentsWithClusterAndPartitionExpansion()
+            throws Throwable {
+        final int numSubtasks = 4;
+        final String expandedTopic = TOPIC + "_expanded";
+        final int expandedTopicPartitions = NUM_SPLITS_PER_CLUSTER * 2;
+        DynamicKafkaSourceTestHelper.createTopic(expandedTopic, expandedTopicPartitions, 1);
+        DynamicKafkaSourceTestHelper.produceToKafka(
+                expandedTopic, expandedTopicPartitions, NUM_RECORDS_PER_SPLIT);
+
+        KafkaStream initialStream =
+                new KafkaStream(
+                        TOPIC, DynamicKafkaSourceTestHelper.getClusterMetadataMap(0, TOPIC));
+
+        Map<String, ClusterMetadata> expandedClusterMetadata = new HashMap<>();
+        expandedClusterMetadata.putAll(
+                DynamicKafkaSourceTestHelper.getClusterMetadataMap(0, TOPIC, expandedTopic));
+        expandedClusterMetadata.putAll(
+                DynamicKafkaSourceTestHelper.getClusterMetadataMap(1, TOPIC, expandedTopic));
+        KafkaStream expandedStream = new KafkaStream(TOPIC, expandedClusterMetadata);
+
+        MockKafkaMetadataService metadataService =
+                new MockKafkaMetadataService(Collections.singleton(initialStream));
+        try (MockSplitEnumeratorContext<DynamicKafkaSourceSplit> context =
+                        new MockSplitEnumeratorContext<>(numSubtasks);
+                DynamicKafkaSourceEnumerator enumerator =
+                        createEnumerator(
+                                context,
+                                metadataService,
+                                properties -> {
+                                    properties.setProperty(
+                                            DynamicKafkaSourceOptions.STREAM_ENUMERATOR_MODE.key(),
+                                            DynamicKafkaSourceOptions.EnumeratorMode.GLOBAL
+                                                    .name()
+                                                    .toLowerCase());
+                                    properties.setProperty(
+                                            DynamicKafkaSourceOptions
+                                                    .STREAM_METADATA_DISCOVERY_INTERVAL_MS
+                                                    .key(),
+                                            "1");
+                                })) {
+            enumerator.start();
+            context.runPeriodicCallable(0);
+            runAllOneTimeCallables(context);
+
+            for (int i = 0; i < numSubtasks; i++) {
+                mockRegisterReaderAndSendReaderStartupEvent(context, enumerator, i);
+            }
+
+            verifyAllSplitsHaveBeenAssigned(context.getSplitsAssignmentSequence(), initialStream);
+
+            int assignmentsBeforeUpdate = context.getSplitsAssignmentSequence().size();
+
+            metadataService.setKafkaStreams(Collections.singleton(expandedStream));
+            context.runPeriodicCallable(0);
+            runAllOneTimeCallables(context);
+
+            Map<String, Map<String, Integer>> expectedPartitionsByClusterAndTopic = new HashMap<>();
+            Map<String, Integer> expectedTopicPartitions = new HashMap<>();
+            expectedTopicPartitions.put(TOPIC, NUM_SPLITS_PER_CLUSTER);
+            expectedTopicPartitions.put(expandedTopic, expandedTopicPartitions);
+            expectedPartitionsByClusterAndTopic.put(
+                    DynamicKafkaSourceTestHelper.getKafkaClusterId(0),
+                    new HashMap<>(expectedTopicPartitions));
+            expectedPartitionsByClusterAndTopic.put(
+                    DynamicKafkaSourceTestHelper.getKafkaClusterId(1),
+                    new HashMap<>(expectedTopicPartitions));
+            verifyExpectedTopicPartitions(
+                    context.getSplitsAssignmentSequence(), expectedPartitionsByClusterAndTopic);
+
+            List<SplitsAssignment<DynamicKafkaSourceSplit>> incrementalAssignments =
+                    context.getSplitsAssignmentSequence()
+                            .subList(
+                                    assignmentsBeforeUpdate,
+                                    context.getSplitsAssignmentSequence().size());
+            assertThat(incrementalAssignments)
+                    .as("metadata expansion should produce new split assignments")
+                    .isNotEmpty();
+            assertAssignmentsBalanced(incrementalAssignments, numSubtasks);
+        }
+    }
+
+ @Test
+    public void testGlobalModeClusterAndTopicShrinkThenRescaleRecovery() throws Throwable {
+ final int initialParallelism = 8;
+ final int restoredParallelism = 8;
+ final String expandedTopic = TOPIC + "_shrink_expand";
+ final int expandedTopicPartitions = NUM_SPLITS_PER_CLUSTER + 1;
+
+        DynamicKafkaSourceTestHelper.createTopic(expandedTopic, expandedTopicPartitions, 1);
+        DynamicKafkaSourceTestHelper.produceToKafka(
+                expandedTopic, expandedTopicPartitions, NUM_RECORDS_PER_SPLIT);
+
+        KafkaStream shrunkStream =
+                new KafkaStream(
+                        TOPIC, DynamicKafkaSourceTestHelper.getClusterMetadataMap(0, TOPIC));
+
+        Map<String, ClusterMetadata> expandedClusterMetadata = new HashMap<>();
+        expandedClusterMetadata.putAll(
+                DynamicKafkaSourceTestHelper.getClusterMetadataMap(0, TOPIC, expandedTopic));
+        expandedClusterMetadata.putAll(
+                DynamicKafkaSourceTestHelper.getClusterMetadataMap(1, TOPIC, expandedTopic));
+        KafkaStream expandedStream = new KafkaStream(TOPIC, expandedClusterMetadata);
+
+ DynamicKafkaSourceEnumState shrunkCheckpoint;
+        MockKafkaMetadataService metadataService =
+                new MockKafkaMetadataService(Collections.singleton(expandedStream));
+        try (MockSplitEnumeratorContext<DynamicKafkaSourceSplit> context =
+                        new MockSplitEnumeratorContext<>(initialParallelism);
+                DynamicKafkaSourceEnumerator enumerator =
+                        createEnumerator(
+                                context,
+                                metadataService,
+                                properties -> {
+                                    properties.setProperty(
+                                            DynamicKafkaSourceOptions.STREAM_ENUMERATOR_MODE.key(),
+                                            DynamicKafkaSourceOptions.EnumeratorMode.GLOBAL
+                                                    .name()
+                                                    .toLowerCase());
+                                    properties.setProperty(
+                                            DynamicKafkaSourceOptions
+                                                    .STREAM_METADATA_DISCOVERY_INTERVAL_MS
+                                                    .key(),
+                                            "1");
+                                })) {
+ enumerator.start();
+ context.runPeriodicCallable(0);
+ runAllOneTimeCallables(context);
+
+ for (int i = 0; i < initialParallelism; i++) {
+                mockRegisterReaderAndSendReaderStartupEvent(context, enumerator, i);
+ }
+
+            Map<String, Map<String, Integer>> expandedExpectedPartitions = new HashMap<>();
+            Map<String, Integer> expandedExpectedTopicPartitions = new HashMap<>();
+            expandedExpectedTopicPartitions.put(TOPIC, NUM_SPLITS_PER_CLUSTER);
+            expandedExpectedTopicPartitions.put(expandedTopic, expandedTopicPartitions);
+            expandedExpectedPartitions.put(
+                    DynamicKafkaSourceTestHelper.getKafkaClusterId(0),
+                    new HashMap<>(expandedExpectedTopicPartitions));
+            expandedExpectedPartitions.put(
+                    DynamicKafkaSourceTestHelper.getKafkaClusterId(1),
+                    new HashMap<>(expandedExpectedTopicPartitions));
+            verifyExpectedTopicPartitions(
+                    context.getSplitsAssignmentSequence(), expandedExpectedPartitions);
+
+            Map<Integer, Set<String>> fullAssignmentsBeforeShrink =
+                    getReaderAssignmentsBySplitId(
+                            context.getSplitsAssignmentSequence(), initialParallelism);
+
+            int assignmentsBeforeShrink = context.getSplitsAssignmentSequence().size();
+            metadataService.setKafkaStreams(Collections.singleton(shrunkStream));
+            context.runPeriodicCallable(0);
+            runAllOneTimeCallables(context);
+            assertThat(context.getSplitsAssignmentSequence().size())
+                    .as("metadata shrink should not create new split assignments")
+                    .isEqualTo(assignmentsBeforeShrink);
+
+            shrunkCheckpoint = enumerator.snapshotState(-1);
+            Map<String, KafkaSourceEnumState> shrunkClusterStates =
+                    shrunkCheckpoint.getClusterEnumeratorStates();
+            String cluster0 = DynamicKafkaSourceTestHelper.getKafkaClusterId(0);
+            String cluster1 = DynamicKafkaSourceTestHelper.getKafkaClusterId(1);
+            assertThat(shrunkClusterStates)
+                    .as("checkpoint should only retain the still-active cluster")
+                    .containsOnlyKeys(cluster0);
+            assertThat(shrunkClusterStates.get(cluster0).assignedSplits())
+                    .as("only active topic partitions should remain after shrink")
+                    .hasSize(NUM_SPLITS_PER_CLUSTER)
+                    .allSatisfy(
+                            split ->
+                                    assertThat(split.getTopic())
+                                            .as("removed topics should not remain in checkpoint")
+                                            .isEqualTo(TOPIC));
+            assertThat(shrunkClusterStates).doesNotContainKey(cluster1);
+
+            Set<String> activeSplitIdsAfterShrink =
+                    shrunkClusterStates.get(cluster0).assignedSplits().stream()
+                            .map(split -> dynamicSplitId(cluster0, split))
+                            .collect(Collectors.toSet());
+            Map<Integer, Set<String>> activeAssignmentsAfterShrink =
+                    retainOnlyActiveSplits(fullAssignmentsBeforeShrink, activeSplitIdsAfterShrink);
+            long idleReadersAfterShrink =
+                    activeAssignmentsAfterShrink.values().stream().filter(Set::isEmpty).count();
+            assertThat(idleReadersAfterShrink)
+                    .as("shrink should leave reader holes without rebalancing existing assignments")
+                    .isGreaterThanOrEqualTo(initialParallelism - NUM_SPLITS_PER_CLUSTER);
+        }
+
+        MockKafkaMetadataService restoredMetadataService =
+                new MockKafkaMetadataService(Collections.singleton(shrunkStream));
+        Properties properties = new Properties();
+        properties.setProperty(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(), "0");
+        properties.setProperty(
+                DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_INTERVAL_MS.key(), "1");
+        properties.setProperty(
+                DynamicKafkaSourceOptions.STREAM_ENUMERATOR_MODE.key(),
+                DynamicKafkaSourceOptions.EnumeratorMode.GLOBAL.name().toLowerCase());
+
+        try (MockSplitEnumeratorContext<DynamicKafkaSourceSplit> restoredContext =
+                        new MockSplitEnumeratorContext<>(restoredParallelism);
+                DynamicKafkaSourceEnumerator restoredEnumerator =
+                        new DynamicKafkaSourceEnumerator(
+                                new KafkaStreamSetSubscriber(Collections.singleton(TOPIC)),
+                                restoredMetadataService,
+                                restoredContext,
+                                OffsetsInitializer.earliest(),
+                                new NoStoppingOffsetsInitializer(),
+                                properties,
+                                Boundedness.CONTINUOUS_UNBOUNDED,
+                                shrunkCheckpoint,
+                                new TestKafkaEnumContextProxyFactory())) {
+            restoredEnumerator.start();
+            for (int i = 0; i < restoredParallelism; i++) {
+                mockRegisterReaderAndSendReaderStartupEvent(restoredContext, restoredEnumerator, i);
+            }
+
+            restoredContext.runPeriodicCallable(0);
+            runAllOneTimeCallables(restoredContext);
+            int assignmentsBeforeRegrowth = restoredContext.getSplitsAssignmentSequence().size();
+
+            restoredMetadataService.setKafkaStreams(Collections.singleton(expandedStream));
+            restoredContext.runPeriodicCallable(0);
+            runAllOneTimeCallables(restoredContext);
+
+            List<SplitsAssignment<DynamicKafkaSourceSplit>> incrementalAssignments =
+                    restoredContext
+                            .getSplitsAssignmentSequence()
+                            .subList(
+                                    assignmentsBeforeRegrowth,
+                                    restoredContext.getSplitsAssignmentSequence().size());
+            assertThat(incrementalAssignments)
+                    .as("regrowth should assign only newly reintroduced cluster/topic splits")
+                    .isNotEmpty();
+            assertAssignmentsBalanced(incrementalAssignments, restoredParallelism);
+
+            DynamicKafkaSourceEnumState restoredSnapshot = restoredEnumerator.snapshotState(-1);
+            String cluster0 = DynamicKafkaSourceTestHelper.getKafkaClusterId(0);
+            String cluster1 = DynamicKafkaSourceTestHelper.getKafkaClusterId(1);
+            assertThat(restoredSnapshot.getClusterEnumeratorStates())
+                    .as("regrowth should restore both clusters in enumerator state")
+                    .containsKeys(cluster0, cluster1);
+
+            for (String clusterId : ImmutableSet.of(cluster0, cluster1)) {
+                KafkaSourceEnumState clusterState =
+                        restoredSnapshot.getClusterEnumeratorStates().get(clusterId);
+                assertThat(clusterState.assignedSplits())
+                        .as("each cluster should track expected assignments after regrowth")
+                        .hasSize(NUM_SPLITS_PER_CLUSTER + expandedTopicPartitions);
+
+                Set<TopicPartition> assignedTopicPartitions =
+                        clusterState.assignedSplits().stream()
+                                .map(KafkaPartitionSplit::getTopicPartition)
+                                .collect(Collectors.toSet());
+                for (int partition = 0; partition < NUM_SPLITS_PER_CLUSTER; partition++) {
+                    assertThat(assignedTopicPartitions)
+                            .contains(new TopicPartition(TOPIC, partition));
+                }
+                for (int partition = 0; partition < expandedTopicPartitions; partition++) {
+                    assertThat(assignedTopicPartitions)
+                            .contains(new TopicPartition(expandedTopic, partition));
+                }
+            }
+ }
+ }
+
+    @Test
+    public void testGlobalModeShrinkRestoreAndRuntimeRepartitionAfterDownscale()
+            throws Throwable {
+        // Expect runtime repartition to rebalance 10 active splits over p=4 as 3,3,2,2.
+        runGlobalModeShrinkRestoreAndRuntimeRepartitionScenario(
+                "downscale", 4, List.of(3, 3, 2, 2));
+    }
+
+    @Test
+    public void testGlobalModeShrinkRestoreAndRuntimeRepartitionAfterUpscale() throws Throwable {
+        // Expect runtime repartition to rebalance 10 active splits over p=8 as 2,2,1,1,1,1,1,1.
+        runGlobalModeShrinkRestoreAndRuntimeRepartitionScenario(
+                "upscale", 8, List.of(2, 2, 1, 1, 1, 1, 1, 1));
+    }
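The expected counts in these two scenarios follow from simple round-robin arithmetic: n splits over p readers yield `ceil(n/p)` splits for the first `n mod p` readers and `floor(n/p)` for the rest. A sketch of that arithmetic (class name `RepartitionArithmetic` is hypothetical; how the runtime actually orders readers is not shown here):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: expected per-reader split counts after an even round-robin spread.
class RepartitionArithmetic {
    static List<Integer> expectedCounts(int numSplits, int parallelism) {
        List<Integer> counts = new ArrayList<>();
        int base = numSplits / parallelism;       // floor(n/p)
        int remainder = numSplits % parallelism;  // readers that get one extra split
        for (int reader = 0; reader < parallelism; reader++) {
            counts.add(reader < remainder ? base + 1 : base);
        }
        return counts;
    }
}
```

For 10 splits this gives 3,3,2,2 at p=4 and 2,2,1,1,1,1,1,1 at p=8, matching the expectations passed to the scenarios above.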
+
+    private void runGlobalModeShrinkRestoreAndRuntimeRepartitionScenario(
+            String scenarioName, int restoredParallelism, List<Integer> expectedRepartitionCounts)
+            throws Throwable {
+        // Scenario summary:
+        // 1) Start in global mode with 13 splits over p=5 and verify near-balance.
+        // 2) Shrink metadata so active splits become 10 and verify no immediate rebalance.
+        // 3) Simulate runtime restore repartition for new parallelism and verify balanced counts.
+        // 4) Restore enumerator from checkpoint and regrow metadata to verify incremental
+        //    assignment.
+ final int initialParallelism = 5;
+ final String removedTopic = TOPIC + "_removed3_" + scenarioName;
+ final String retainedTopic = TOPIC + "_retained4_" + scenarioName;
+
+        DynamicKafkaSourceTestHelper.createTopic(removedTopic, NUM_SPLITS_PER_CLUSTER, 1);
+        DynamicKafkaSourceTestHelper.produceToKafka(
+                removedTopic, NUM_SPLITS_PER_CLUSTER, NUM_RECORDS_PER_SPLIT);
+        DynamicKafkaSourceTestHelper.createTopic(retainedTopic, NUM_SPLITS_PER_CLUSTER + 1, 1);
+        DynamicKafkaSourceTestHelper.produceToKafka(
+                retainedTopic, NUM_SPLITS_PER_CLUSTER + 1, NUM_RECORDS_PER_SPLIT);
+
+        Map<String, ClusterMetadata> initialClusterMetadata = new HashMap<>();
+        initialClusterMetadata.putAll(
+                DynamicKafkaSourceTestHelper.getClusterMetadataMap(0, TOPIC, removedTopic));
+        initialClusterMetadata.putAll(
+                DynamicKafkaSourceTestHelper.getClusterMetadataMap(1, TOPIC, retainedTopic));
+        KafkaStream initialStream = new KafkaStream(TOPIC, initialClusterMetadata);
+
+        Map<String, ClusterMetadata> shrunkClusterMetadata = new HashMap<>();
+        shrunkClusterMetadata.putAll(DynamicKafkaSourceTestHelper.getClusterMetadataMap(0, TOPIC));
+        shrunkClusterMetadata.putAll(
+                DynamicKafkaSourceTestHelper.getClusterMetadataMap(1, TOPIC, retainedTopic));
+        KafkaStream shrunkStream = new KafkaStream(TOPIC, shrunkClusterMetadata);
+
+ DynamicKafkaSourceEnumState shrunkCheckpoint;
+        MockKafkaMetadataService metadataService =
+                new MockKafkaMetadataService(Collections.singleton(initialStream));
+        try (MockSplitEnumeratorContext<DynamicKafkaSourceSplit> context =
+                        new MockSplitEnumeratorContext<>(initialParallelism);
+                DynamicKafkaSourceEnumerator enumerator =
+                        createEnumerator(
+                                context,
+                                metadataService,
+                                properties -> {
+                                    properties.setProperty(
+                                            DynamicKafkaSourceOptions.STREAM_ENUMERATOR_MODE.key(),
+                                            DynamicKafkaSourceOptions.EnumeratorMode.GLOBAL
+                                                    .name()
+                                                    .toLowerCase());
+                                    properties.setProperty(
+                                            DynamicKafkaSourceOptions
+                                                    .STREAM_METADATA_DISCOVERY_INTERVAL_MS
+                                                    .key(),
+                                            "1");
+                                })) {
+ enumerator.start();
+ context.runPeriodicCallable(0);
+ runAllOneTimeCallables(context);
+
+ for (int i = 0; i < initialParallelism; i++) {
+                mockRegisterReaderAndSendReaderStartupEvent(context, enumerator, i);
+ }
+
+            Map<String, Map<String, Integer>> expectedInitialPartitions = new HashMap<>();
+            Map<String, Integer> cluster0TopicPartitions = new HashMap<>();
+            cluster0TopicPartitions.put(TOPIC, NUM_SPLITS_PER_CLUSTER);
+            cluster0TopicPartitions.put(removedTopic, NUM_SPLITS_PER_CLUSTER);
+            expectedInitialPartitions.put(
+                    DynamicKafkaSourceTestHelper.getKafkaClusterId(0), cluster0TopicPartitions);
+            Map<String, Integer> cluster1TopicPartitions = new HashMap<>();
+            cluster1TopicPartitions.put(TOPIC, NUM_SPLITS_PER_CLUSTER);
+            cluster1TopicPartitions.put(retainedTopic, NUM_SPLITS_PER_CLUSTER + 1);
+            expectedInitialPartitions.put(
+                    DynamicKafkaSourceTestHelper.getKafkaClusterId(1), cluster1TopicPartitions);
+            verifyExpectedTopicPartitions(
+                    context.getSplitsAssignmentSequence(), expectedInitialPartitions);
+
+            Map<Integer, Set<String>> fullAssignmentsBeforeShrink =
+                    getReaderAssignmentsBySplitId(
+                            context.getSplitsAssignmentSequence(), initialParallelism);
+            Map<Integer, Integer> countsBeforeShrink =
+                    getReaderSplitCounts(fullAssignmentsBeforeShrink, initialParallelism);
+            assertThat(
+                            Collections.max(countsBeforeShrink.values())
+                                    - Collections.min(countsBeforeShrink.values()))
+                    .as("initial global assignment should be near-balanced")
+                    .isLessThanOrEqualTo(1);
+
+            int assignmentsBeforeShrink = context.getSplitsAssignmentSequence().size();
+            metadataService.setKafkaStreams(Collections.singleton(shrunkStream));
+            context.runPeriodicCallable(0);
+            runAllOneTimeCallables(context);
+            assertThat(context.getSplitsAssignmentSequence().size())
+                    .as("metadata shrink should not create new split assignments")
+                    .isEqualTo(assignmentsBeforeShrink);
+
+            shrunkCheckpoint = enumerator.snapshotState(-1);
+            Set<String> activeSplitIdsAfterShrink =
+                    shrunkCheckpoint.getClusterEnumeratorStates().entrySet().stream()
+                            .flatMap(
+                                    entry ->
+                                            entry.getValue().assignedSplits().stream()
+                                                    .map(
+                                                            split ->
+                                                                    dynamicSplitId(
+                                                                            entry.getKey(), split)))
+                            .collect(Collectors.toSet());
+            assertThat(activeSplitIdsAfterShrink)
+                    .as("shrink scenario keeps exactly 10 active splits")
+                    .hasSize(10);
+
+            Map<Integer, Set<String>> activeAssignmentsAfterShrink =
+                    retainOnlyActiveSplits(fullAssignmentsBeforeShrink, activeSplitIdsAfterShrink);
+            Map<Integer, Integer> countsAfterShrink =
+                    getReaderSplitCounts(activeAssignmentsAfterShrink, initialParallelism);
+
+            List<Integer> countsAfterRuntimeRepartition =
+                    repartitionReaderStateCountsForRestore(countsAfterShrink, restoredParallelism);
+            assertThat(countsAfterRuntimeRepartition)
+                    .as("runtime repartition on restore should match expected balanced counts")
+                    .containsExactlyElementsOf(expectedRepartitionCounts);
+            assertThat(
+                            Collections.max(countsAfterRuntimeRepartition)
+                                    - Collections.min(countsAfterRuntimeRepartition))
+                    .as("runtime repartition keeps split-count skew within 1")
+                    .isLessThanOrEqualTo(1);
+        }
+
+        MockKafkaMetadataService restoredMetadataService =
+                new MockKafkaMetadataService(Collections.singleton(shrunkStream));
+        Properties properties = new Properties();
+        properties.setProperty(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(), "0");
+        properties.setProperty(
+                DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_INTERVAL_MS.key(), "1");
+        properties.setProperty(
+                DynamicKafkaSourceOptions.STREAM_ENUMERATOR_MODE.key(),
+                DynamicKafkaSourceOptions.EnumeratorMode.GLOBAL.name().toLowerCase());
+
+        try (MockSplitEnumeratorContext<DynamicKafkaSourceSplit> restoredContext =
+                        new MockSplitEnumeratorContext<>(restoredParallelism);
+                DynamicKafkaSourceEnumerator restoredEnumerator =
+                        new DynamicKafkaSourceEnumerator(
+                                new KafkaStreamSetSubscriber(Collections.singleton(TOPIC)),
+                                restoredMetadataService,
+                                restoredContext,
+                                OffsetsInitializer.earliest(),
+                                new NoStoppingOffsetsInitializer(),
+                                properties,
+                                Boundedness.CONTINUOUS_UNBOUNDED,
+                                shrunkCheckpoint,
+                                new TestKafkaEnumContextProxyFactory())) {
+            restoredEnumerator.start();
+            for (int i = 0; i < restoredParallelism; i++) {
+                mockRegisterReaderAndSendReaderStartupEvent(restoredContext, restoredEnumerator, i);
+            }
+
+            restoredContext.runPeriodicCallable(0);
+            runAllOneTimeCallables(restoredContext);
+            int assignmentsBeforeRegrowth = restoredContext.getSplitsAssignmentSequence().size();
+
+            restoredMetadataService.setKafkaStreams(Collections.singleton(initialStream));
+            restoredContext.runPeriodicCallable(0);
+            runAllOneTimeCallables(restoredContext);
+
+            List<SplitsAssignment<DynamicKafkaSourceSplit>> incrementalAssignments =
+                    restoredContext
+                            .getSplitsAssignmentSequence()
+                            .subList(
+                                    assignmentsBeforeRegrowth,
+                                    restoredContext.getSplitsAssignmentSequence().size());
+            assertThat(incrementalAssignments)
+                    .as("regrowth should assign only newly reintroduced splits")
+                    .isNotEmpty();
+            assertAssignmentsBalanced(incrementalAssignments, restoredParallelism);
+        }
+    }
+
     @Test
     public void testEnumeratorDoesNotAssignDuplicateSplitsInMetadataUpdate() throws Throwable {
         KafkaStream kafkaStreamWithOneCluster =
                 DynamicKafkaSourceTestHelper.getKafkaStream(TOPIC);
@@ -949,6 +1462,166 @@ public class DynamicKafkaSourceEnumeratorTest {
return readerToSplits;
}
+    private Map<Integer, Set<String>> getReaderAssignmentsBySplitId(
+            List<SplitsAssignment<DynamicKafkaSourceSplit>> splitsAssignmentSequence,
+            int numReaders) {
+        Map<Integer, Set<String>> readerToSplitIds = new HashMap<>();
+        for (int readerId = 0; readerId < numReaders; readerId++) {
+            readerToSplitIds.put(readerId, new HashSet<>());
+        }
+        for (SplitsAssignment<DynamicKafkaSourceSplit> split : splitsAssignmentSequence) {
+            for (Entry<Integer, List<DynamicKafkaSourceSplit>> assignments :
+                    split.assignment().entrySet()) {
+                readerToSplitIds
+                        .computeIfAbsent(assignments.getKey(), ignored -> new HashSet<>())
+                        .addAll(
+                                assignments.getValue().stream()
+                                        .map(DynamicKafkaSourceSplit::splitId)
+                                        .collect(Collectors.toSet()));
+            }
+        }
+        return readerToSplitIds;
+    }
+
+    private Map<Integer, Set<String>> retainOnlyActiveSplits(
+            Map<Integer, Set<String>> readerToSplitIds, Set<String> activeSplitIds) {
+        Map<Integer, Set<String>> filtered = new HashMap<>();
+        for (Entry<Integer, Set<String>> entry : readerToSplitIds.entrySet()) {
+            Set<String> activeForReader =
+                    entry.getValue().stream()
+                            .filter(activeSplitIds::contains)
+                            .collect(Collectors.toCollection(HashSet::new));
+            filtered.put(entry.getKey(), activeForReader);
+        }
+        return filtered;
+    }
+
+    private static String dynamicSplitId(String clusterId, KafkaPartitionSplit split) {
+        return clusterId + "-" + split.splitId();
+    }
+
+    private void verifyExpectedTopicPartitions(
+            List<SplitsAssignment<DynamicKafkaSourceSplit>> splitsAssignmentSequence,
+            Map<String, Map<String, Integer>> expectedPartitionsByClusterAndTopic) {
+        Map<String, Map<String, Set<Integer>>> observedPartitionsByClusterAndTopic =
+                new HashMap<>();
+        for (SplitsAssignment<DynamicKafkaSourceSplit> split : splitsAssignmentSequence) {
+            for (Entry<Integer, List<DynamicKafkaSourceSplit>> assignments :
+                    split.assignment().entrySet()) {
+                for (DynamicKafkaSourceSplit assignment : assignments.getValue()) {
+                    observedPartitionsByClusterAndTopic
+                            .computeIfAbsent(
+                                    assignment.getKafkaClusterId(), ignored -> new HashMap<>())
+                            .computeIfAbsent(
+                                    assignment.getKafkaPartitionSplit().getTopic(),
+                                    ignored -> new HashSet<>())
+                            .add(assignment.getKafkaPartitionSplit().getPartition());
+                }
+            }
+        }
+
+        for (Entry<String, Map<String, Integer>> clusterExpected :
+                expectedPartitionsByClusterAndTopic.entrySet()) {
+            assertThat(observedPartitionsByClusterAndTopic)
+                    .as("expected cluster must have assigned splits")
+                    .containsKey(clusterExpected.getKey());
+            for (Entry<String, Integer> topicExpected : clusterExpected.getValue().entrySet()) {
+                Set<Integer> observedTopicPartitions =
+                        observedPartitionsByClusterAndTopic
+                                .get(clusterExpected.getKey())
+                                .get(topicExpected.getKey());
+                assertThat(observedTopicPartitions)
+                        .as("expected topic must have assigned splits")
+                        .isNotNull();
+                assertThat(observedTopicPartitions).hasSize(topicExpected.getValue());
+                for (int partition = 0; partition < topicExpected.getValue(); partition++) {
+                    assertThat(observedTopicPartitions).contains(partition);
+                }
+            }
+        }
+    }
+
+    private void assertAssignmentsBalanced(
+            List<SplitsAssignment<DynamicKafkaSourceSplit>> assignmentSequence, int numReaders) {
+        Map<Integer, Integer> assignedSplitCountByReader = new HashMap<>();
+        for (int readerId = 0; readerId < numReaders; readerId++) {
+            assignedSplitCountByReader.put(readerId, 0);
+        }
+        for (SplitsAssignment<DynamicKafkaSourceSplit> assignment : assignmentSequence) {
+            for (Entry<Integer, List<DynamicKafkaSourceSplit>> entry :
+                    assignment.assignment().entrySet()) {
+                assignedSplitCountByReader.merge(
+                        entry.getKey(), entry.getValue().size(), Integer::sum);
+            }
+        }
+
+        int minAssignedSplits = Collections.min(assignedSplitCountByReader.values());
+        int maxAssignedSplits = Collections.max(assignedSplitCountByReader.values());
+        assertThat(maxAssignedSplits - minAssignedSplits)
+                .as("global mode should keep per-reader split counts balanced")
+                .isLessThanOrEqualTo(1);
+    }
+
+    private Map<Integer, Integer> getReaderSplitCounts(
+            Map<Integer, Set<String>> assignmentsByReader, int numReaders) {
+        Map<Integer, Integer> splitCounts = new HashMap<>();
+        for (int readerId = 0; readerId < numReaders; readerId++) {
+            splitCounts.put(readerId, 0);
+        }
+        assignmentsByReader.forEach(
+                (readerId, splitIds) -> splitCounts.put(readerId, splitIds.size()));
+        return splitCounts;
+    }
+
+    private List<Integer> repartitionReaderStateCountsForRestore(
+            Map<Integer, Integer> splitCountsByOldSubtask, int restoredParallelism) {
+        final int oldParallelism = splitCountsByOldSubtask.size();
+        List<List<OperatorStateHandle>> previousState = new ArrayList<>();
+
+        long offsetSeed = 0L;
+        for (int subtask = 0; subtask < oldParallelism; subtask++) {
+            int splitCount = splitCountsByOldSubtask.getOrDefault(subtask, 0);
+            long[] offsets = new long[splitCount];
+            for (int i = 0; i < splitCount; i++) {
+                offsets[i] = offsetSeed++;
+            }
+
+            Map<String, OperatorStateHandle.StateMetaInfo> stateNameToMeta = new HashMap<>();
+            stateNameToMeta.put(
+                    SOURCE_READER_SPLIT_STATE_NAME,
+                    new OperatorStateHandle.StateMetaInfo(
+                            offsets, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
+
+            previousState.add(
+                    Collections.singletonList(
+                            new OperatorStreamStateHandle(
+                                    stateNameToMeta,
+                                    new ByteStreamStateHandle(
+                                            "reader-state-" + subtask, new byte[0]))));
+        }
+
+        List<List<OperatorStateHandle>> repartitionedState =
+                RoundRobinOperatorStateRepartitioner.INSTANCE.repartitionState(
+                        previousState, oldParallelism, restoredParallelism);
+
+        List<Integer> splitCountsAfterRepartition = new ArrayList<>(repartitionedState.size());
+        for (List<OperatorStateHandle> subtaskStateHandles : repartitionedState) {
+            int splitCount = 0;
+            for (OperatorStateHandle stateHandle : subtaskStateHandles) {
+                splitCount +=
+                        stateHandle.getStateNameToPartitionOffsets().entrySet().stream()
+                                .filter(entry -> SOURCE_READER_SPLIT_STATE_NAME.equals(entry.getKey()))
+                                .mapToInt(entry -> entry.getValue().getOffsets().length)
+                                .sum();
+            }
+            splitCountsAfterRepartition.add(splitCount);
+        }
+        return splitCountsAfterRepartition;
+    }
+
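The helper above models reader-side rescaling: each old subtask's split count becomes one `SPLIT_DISTRIBUTE` state partition per split, and Flink's `RoundRobinOperatorStateRepartitioner` deals those partitions out across the restored parallelism. As a dependency-free approximation of that redistribution (a sketch only; the real ordering is internal to Flink, but the resulting per-subtask counts are near-balanced in the same way the test asserts):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Approximation of round-robin SPLIT_DISTRIBUTE rescaling: every state
// partition (here: one per split) is dealt to the new subtasks in turn.
final class SplitDistributeSketch {
    static List<Integer> repartitionCounts(List<Integer> oldCounts, int newParallelism) {
        List<Integer> newCounts = new ArrayList<>(Collections.nCopies(newParallelism, 0));
        int cursor = 0;
        for (int count : oldCounts) {
            for (int i = 0; i < count; i++) {
                int target = cursor++ % newParallelism;
                newCounts.set(target, newCounts.get(target) + 1);
            }
        }
        return newCounts;
    }
}
```

For example, dealing the post-shrink counts 3,3,2,1,1 (10 splits) onto 3 subtasks yields a 4,3,3 shape with skew of 1, which is the balance property the test checks rather than an exact partition placement.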
private List<TopicPartition> getFilteredTopicPartitions(
DynamicKafkaSourceEnumState state, String topic, AssignmentStatus
assignmentStatus) {
return state.getClusterEnumeratorStates().values().stream()
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/GlobalSplitOwnerAssignerTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/GlobalSplitOwnerAssignerTest.java
new file mode 100644
index 00000000..d8dd6681
--- /dev/null
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/GlobalSplitOwnerAssignerTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.dynamic.source.enumerator;
+
+import org.apache.flink.connector.kafka.dynamic.source.split.DynamicKafkaSourceSplit;
+import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
+
+import org.apache.kafka.common.TopicPartition;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link GlobalSplitOwnerAssigner}. */
+class GlobalSplitOwnerAssignerTest {
+
+ @Test
+ void testRoundRobinAssignmentAcrossClustersWithMetadataChanges() {
+ GlobalSplitOwnerAssigner assigner = new GlobalSplitOwnerAssigner();
+
+ Set<String> activeSplitIds =
+ new HashSet<>(
+ Arrays.asList(
+ split("cluster-a", "topic-a", 0).splitId(),
+ split("cluster-a", "topic-a", 1).splitId(),
+ split("cluster-b", "topic-a", 0).splitId(),
+ split("cluster-b", "topic-a", 1).splitId()));
+ assigner.onMetadataRefresh(activeSplitIds);
+
+        assertThat(assigner.assignSplitOwner(split("cluster-a", "topic-a", 2).splitId(), 3))
+                .as("new split after 4 active splits should use 4 %% 3")
+                .isEqualTo(1);
+        assertThat(assigner.assignSplitOwner(split("cluster-b", "topic-a", 2).splitId(), 3))
+                .as("next split should continue global round-robin order")
+                .isEqualTo(2);
+
+ Set<String> updatedSplitIds =
+ new HashSet<>(
+ Arrays.asList(
+ split("cluster-a", "topic-a", 0).splitId(),
+ split("cluster-a", "topic-a", 1).splitId(),
+ split("cluster-c", "topic-z", 0).splitId()));
+ assigner.onMetadataRefresh(updatedSplitIds);
+
+        assertThat(assigner.assignSplitOwner(split("cluster-c", "topic-z", 1).splitId(), 3))
+                .as("metadata refresh should reseed round-robin by current active split count")
+                .isEqualTo(0);
+ }
+
+ @Test
+ void testFailureRecoveryPrefersSplitBackOwner() {
+ GlobalSplitOwnerAssigner assigner = new GlobalSplitOwnerAssigner();
+
+ Set<String> activeSplitIds =
+ new HashSet<>(
+ Arrays.asList(
+ split("cluster-a", "topic-a", 0).splitId(),
+ split("cluster-a", "topic-a", 1).splitId(),
+ split("cluster-b", "topic-a", 0).splitId(),
+ split("cluster-b", "topic-a", 1).splitId()));
+ assigner.onMetadataRefresh(activeSplitIds);
+
+        DynamicKafkaSourceSplit returnedSplit = split("cluster-b", "topic-a", 2);
+        assigner.onSplitsBack(Collections.singletonList(returnedSplit), 2);
+
+        assertThat(assigner.assignSplitOwner(returnedSplit.splitId(), 4))
+                .as("split-back should preserve owner affinity when subtask id is valid")
+                .isEqualTo(2);
+ }
+
+ @Test
+ void testRepartitionFallsBackToRoundRobinWhenSplitBackOwnerOutOfRange() {
+ GlobalSplitOwnerAssigner assigner = new GlobalSplitOwnerAssigner();
+
+ Set<String> activeSplitIds =
+ new HashSet<>(
+ Arrays.asList(
+ split("cluster-a", "topic-a", 0).splitId(),
+ split("cluster-a", "topic-a", 1).splitId(),
+ split("cluster-a", "topic-a", 2).splitId(),
+ split("cluster-b", "topic-b", 0).splitId(),
+ split("cluster-b", "topic-b", 1).splitId()));
+ assigner.onMetadataRefresh(activeSplitIds);
+
+        DynamicKafkaSourceSplit returnedSplit = split("cluster-b", "topic-b", 2);
+        assigner.onSplitsBack(Collections.singletonList(returnedSplit), 4);
+
+        assertThat(assigner.assignSplitOwner(returnedSplit.splitId(), 3))
+                .as("after downscale, invalid split-back owner should fallback to RR")
+                .isEqualTo(0);
+
+        Set<String> restoredSplitIds = new HashSet<>(activeSplitIds);
+        restoredSplitIds.add(returnedSplit.splitId());
+        assigner.onMetadataRefresh(restoredSplitIds);
+
+        assertThat(assigner.assignSplitOwner(split("cluster-c", "topic-c", 0).splitId(), 2))
+                .as("after restore/repartition, RR should seed from restored active split count")
+                .isEqualTo(0);
+ }
+
+ @Test
+ void testForwardLookingBalanceStrategy() {
+ GlobalSplitOwnerAssigner assigner = new GlobalSplitOwnerAssigner();
+ final int parallelism = 5;
+
+        // Step 1: verify baseline RR shape with 13 discovered splits over 5 readers.
+        // This yields a near-balanced 3,3,3,2,2 distribution.
+        Map<Integer, List<String>> assignedBeforeShrink = initAssignments(parallelism);
+        Map<Integer, Integer> countsBeforeShrink = counts(assignedBeforeShrink);
+        assertThat(countsBeforeShrink).containsEntry(0, 3).containsEntry(1, 3).containsEntry(2, 3);
+        assertThat(countsBeforeShrink).containsEntry(3, 2).containsEntry(4, 2);
+
+        // Step 2: emulate metadata shrink by removing 3 active splits.
+        // We intentionally do NOT "move" existing assignments, mirroring forward-looking behavior.
+        Set<String> activeAfterShrink =
+                assignedBeforeShrink.values().stream()
+                        .flatMap(List::stream)
+                        .collect(Collectors.toCollection(HashSet::new));
+        activeAfterShrink.remove(assignedBeforeShrink.get(2).get(2));
+        activeAfterShrink.remove(assignedBeforeShrink.get(3).get(1));
+        activeAfterShrink.remove(assignedBeforeShrink.get(4).get(1));
+        assigner.onMetadataRefresh(activeAfterShrink);
+
+        Map<Integer, List<String>> activeAssignmentsAfterShrink = new LinkedHashMap<>();
+        for (Map.Entry<Integer, List<String>> entry : assignedBeforeShrink.entrySet()) {
+            activeAssignmentsAfterShrink.put(
+                    entry.getKey(),
+                    entry.getValue().stream()
+                            .filter(activeAfterShrink::contains)
+                            .collect(Collectors.toList()));
+        }
+        Map<Integer, Integer> countsAfterShrink = counts(activeAssignmentsAfterShrink);
+ assertThat(countsAfterShrink)
+ .containsEntry(0, 3)
+ .containsEntry(1, 3)
+ .containsEntry(2, 2)
+ .containsEntry(3, 1)
+ .containsEntry(4, 1);
+
+        // Step 3: new assignments start from activeCount % parallelism after shrink.
+        // This proves the strategy reseeds from current active inventory rather than
+        // preserving an old "next owner" cursor from before shrink.
+        int ownerAfterShrink = assigner.assignSplitOwner("split-after-shrink-0", parallelism);
+        int nextOwnerAfterShrink = assigner.assignSplitOwner("split-after-shrink-1", parallelism);
+        assertThat(ownerAfterShrink).isEqualTo(0);
+        assertThat(nextOwnerAfterShrink).isEqualTo(1);
+
+        // If nothing were removed, next owner after 13 initial splits would have been 13 % 5 = 3.
+        assertThat(13 % parallelism).isEqualTo(3);
+ }
+
+    private static Map<Integer, List<String>> initAssignments(int parallelism) {
+ GlobalSplitOwnerAssigner assigner = new GlobalSplitOwnerAssigner();
+ Map<Integer, List<String>> assignments = new LinkedHashMap<>();
+ for (int i = 0; i < parallelism; i++) {
+ assignments.put(i, new ArrayList<>());
+ }
+ for (int i = 0; i < 13; i++) {
+ String splitId = "split-" + i;
+ int owner = assigner.assignSplitOwner(splitId, parallelism);
+ assignments.get(owner).add(splitId);
+ }
+ return assignments;
+ }
+
+    private static Map<Integer, Integer> counts(Map<Integer, List<String>> assignments) {
+ Map<Integer, Integer> counts = new LinkedHashMap<>();
+ for (Map.Entry<Integer, List<String>> entry : assignments.entrySet()) {
+ counts.put(entry.getKey(), entry.getValue().size());
+ }
+ return counts;
+ }
+
+    private static DynamicKafkaSourceSplit split(String clusterId, String topic, int partition) {
+        return new DynamicKafkaSourceSplit(
+                clusterId, new KafkaPartitionSplit(new TopicPartition(topic, partition), 0L));
+ }
+}
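The test file above pins down the assigner's observable behavior without showing the production class in this hunk. A behavioral sketch inferred purely from the assertions (the class name `GlobalSplitOwnerAssignerSketch` and all internals are assumptions; the real `GlobalSplitOwnerAssigner` takes `DynamicKafkaSourceSplit` in `onSplitsBack`, while this sketch keys on split-id strings for brevity):

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Round-robin owner assignment with split-back affinity, reseeded on
// metadata refresh from the current active split inventory.
class GlobalSplitOwnerAssignerSketch {
    private final Set<String> knownSplitIds = new HashSet<>();
    private final Map<String, Integer> splitBackOwners = new HashMap<>();

    // A metadata refresh replaces the inventory, which reseeds the
    // round-robin position at activeCount % parallelism.
    void onMetadataRefresh(Set<String> activeSplitIds) {
        knownSplitIds.clear();
        knownSplitIds.addAll(activeSplitIds);
        splitBackOwners.keySet().retainAll(activeSplitIds);
    }

    // Splits handed back by a failed reader remember their previous owner.
    void onSplitsBack(Collection<String> splitIds, int previousOwner) {
        for (String splitId : splitIds) {
            splitBackOwners.put(splitId, previousOwner);
            knownSplitIds.add(splitId);
        }
    }

    // Prefer split-back affinity while the old owner is still in range;
    // otherwise continue the global round-robin over all splits seen so far.
    int assignSplitOwner(String splitId, int parallelism) {
        Integer previousOwner = splitBackOwners.remove(splitId);
        if (previousOwner != null && previousOwner < parallelism) {
            return previousOwner;
        }
        int owner = knownSplitIds.size() % parallelism;
        knownSplitIds.add(splitId);
        return owner;
    }
}
```

This model reproduces every expectation in the test file (e.g. 4 active splits and parallelism 3 gives owner 4 % 3 = 1 for the next split, and an out-of-range split-back owner falls back to round-robin), but it is a reading of the tests, not the committed implementation.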