This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new 5b69b2b2 [FLINK-38876] Support per-cluster offset in Dynamic Kafka Source (#209)
5b69b2b2 is described below

commit 5b69b2b2af9bac5b9e6f8faa5b8aaa87b2f2ad1b
Author: bowenli86 <[email protected]>
AuthorDate: Thu Jan 15 16:22:09 2026 -0800

    [FLINK-38876] Support per-cluster offset in Dynamic Kafka Source (#209)
    
    - add support for per-cluster starting and stopping offsets in DynamicKafkaSource
    - upgrade the source state serializer to v2, keeping it backward compatible with v1 (which has no per-cluster offsets)
    - add corresponding tests
    - update docs, add example code snippet
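    
    A condensed usage sketch (mirrors the docs example added below; cluster ids,
    topics, and bootstrap servers are illustrative placeholders):
    
        KafkaStream stream =
            new KafkaStream(
                "input-stream",
                Map.of(
                    "cluster0",
                    new ClusterMetadata(
                        Set.of("topic-a"),
                        cluster0Props,
                        OffsetsInitializer.earliest(),  // per-cluster starting offsets
                        OffsetsInitializer.latest()),   // per-cluster stopping offsets
                    "cluster1",
                    new ClusterMetadata(
                        Set.of("topic-b"),
                        cluster1Props,
                        OffsetsInitializer.latest(),
                        null)));  // null falls back to the builder-level default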
---
 .../docs/connectors/datastream/dynamic-kafka.md    |  48 ++++
 .../docs/connectors/datastream/dynamic-kafka.md    |  52 ++++
 .../kafka/dynamic/metadata/ClusterMetadata.java    |  66 ++++-
 .../SingleClusterTopicMetadataService.java         |  29 +-
 .../dynamic/source/DynamicKafkaSourceBuilder.java  |   5 +-
 .../DynamicKafkaSourceEnumStateSerializer.java     | 144 ++++++++--
 .../enumerator/DynamicKafkaSourceEnumerator.java   |  58 +++-
 .../source/reader/DynamicKafkaSourceReader.java    |  19 +-
 .../dynamic/source/DynamicKafkaSourceITTest.java   | 302 +++++++++++++++++++++
 .../DynamicKafkaSourceEnumStateSerializerTest.java | 100 ++++++-
 .../SingleClusterTopicMetadataServiceTest.java     |  84 ++++++
 .../DynamicKafkaSourceEnumStateTestUtils.java      |  53 ++++
 12 files changed, 911 insertions(+), 49 deletions(-)
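
For reference, a sketch of the per-stream layout used by the new v2 enumerator-state
serialization (read off serializeV2/deserializeV2 in the diff below; field names here are
descriptive only, not identifiers from the code):

    // int  number of KafkaStreams
    // per KafkaStream:
    //   UTF  streamId
    //   int  number of cluster entries
    //   per cluster:
    //     UTF     kafkaClusterId
    //     int     topic count, then UTF topic names
    //     UTF     bootstrap servers
    //     boolean starting offsets present [+ int length + serialized OffsetsInitializer bytes]
    //     boolean stopping offsets present [+ int length + serialized OffsetsInitializer bytes]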

diff --git a/docs/content.zh/docs/connectors/datastream/dynamic-kafka.md b/docs/content.zh/docs/connectors/datastream/dynamic-kafka.md
index abe7a753..3c9ba7cd 100644
--- a/docs/content.zh/docs/connectors/datastream/dynamic-kafka.md
+++ b/docs/content.zh/docs/connectors/datastream/dynamic-kafka.md
@@ -77,6 +77,52 @@ The Kafka metadata service, configured by setKafkaMetadataService(KafkaMetadataS
 The stream ids to subscribe, see the following Kafka stream subscription section for more details.
 Deserializer to parse Kafka messages, see the [Kafka Source Documentation]({{< ref "docs/connectors/datastream/kafka" >}}#deserializer) for more details.
 
+### Offsets 初始化
+
+可以通过 builder 配置全局的起始/停止 offsets。起始 offsets 同时适用于有界与无界模式;停止 offsets 仅在有界模式下生效。Cluster metadata 可以可选地携带每个集群的起始/停止 offsets initializer;如果存在,将覆盖全局默认配置。
+
+示例:在元数据中为不同集群设置偏移量初始化规则。
+
+{{< tabs "DynamicKafkaSourceOffsets" >}}
+{{< tab "Java" >}}
+```java
+Properties cluster0Props = new Properties();
+cluster0Props.setProperty(
+    CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "cluster0:9092");
+Properties cluster1Props = new Properties();
+cluster1Props.setProperty(
+    CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "cluster1:9092");
+
+KafkaStream stream =
+    new KafkaStream(
+        "input-stream",
+        Map.of(
+            "cluster0",
+            new ClusterMetadata(
+                Set.of("topic-a"),
+                cluster0Props,
+                OffsetsInitializer.earliest(),
+                OffsetsInitializer.latest()),
+            "cluster1",
+            new ClusterMetadata(
+                Set.of("topic-b"),
+                cluster1Props,
+                OffsetsInitializer.latest(),
+                null)));
+
+DynamicKafkaSource<String> source =
+    DynamicKafkaSource.<String>builder()
+        .setStreamIds(Set.of(stream.getStreamId()))
+        .setKafkaMetadataService(new MockKafkaMetadataService(Set.of(stream)))
+        .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
+        // 如果元数据中包含每个集群的起始 offsets,将覆盖此处设置。
+        .setStartingOffsets(OffsetsInitializer.earliest())
+        .setBounded(OffsetsInitializer.latest())
+        .build();
+```
+{{< /tab >}}
+{{< /tabs >}}
+
 ### Kafka Stream Subscription
 The Dynamic Kafka Source provides 2 ways of subscribing to Kafka stream(s).
 * A set of Kafka stream ids. For example:
@@ -108,6 +154,8 @@ for any changes to the Kafka stream(s) and reconciling the reader tasks to subsc
 Kafka metadata returned by the service. For example, in the case of a Kafka migration, the source would
 swap from one cluster to the new cluster when the service makes that change in the Kafka stream metadata.
 
+Cluster metadata 可以包含每个集群的起始/停止 offsets initializer,用于覆盖全局 builder 配置。
+
 ### Additional Properties
 There are configuration options in DynamicKafkaSourceOptions that can be configured in the properties through the builder:
 <table class="table table-bordered">
diff --git a/docs/content/docs/connectors/datastream/dynamic-kafka.md b/docs/content/docs/connectors/datastream/dynamic-kafka.md
index e64b93e6..cad80cdd 100644
--- a/docs/content/docs/connectors/datastream/dynamic-kafka.md
+++ b/docs/content/docs/connectors/datastream/dynamic-kafka.md
@@ -77,6 +77,55 @@ The Kafka metadata service, configured by setKafkaMetadataService(KafkaMetadataS
 The stream ids to subscribe, see the following Kafka stream subscription section for more details.
 Deserializer to parse Kafka messages, see the [Kafka Source Documentation]({{< ref "docs/connectors/datastream/kafka" >}}#deserializer) for more details.
 
+### Offsets Initialization
+
+You can configure starting and stopping offsets globally via the builder. Starting offsets apply to
+both bounded and unbounded sources, while stopping offsets only take effect when the source runs in
+bounded mode. Cluster metadata may optionally include per-cluster starting or stopping offsets
+initializers; if present, they override the global defaults for that cluster.
+
+Example: override offsets for specific clusters via metadata.
+
+{{< tabs "DynamicKafkaSourceOffsets" >}}
+{{< tab "Java" >}}
+```java
+Properties cluster0Props = new Properties();
+cluster0Props.setProperty(
+    CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "cluster0:9092");
+Properties cluster1Props = new Properties();
+cluster1Props.setProperty(
+    CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "cluster1:9092");
+
+KafkaStream stream =
+    new KafkaStream(
+        "input-stream",
+        Map.of(
+            "cluster0",
+            new ClusterMetadata(
+                Set.of("topic-a"),
+                cluster0Props,
+                OffsetsInitializer.earliest(),
+                OffsetsInitializer.latest()),
+            "cluster1",
+            new ClusterMetadata(
+                Set.of("topic-b"),
+                cluster1Props,
+                OffsetsInitializer.latest(),
+                null)));
+
+DynamicKafkaSource<String> source =
+    DynamicKafkaSource.<String>builder()
+        .setStreamIds(Set.of(stream.getStreamId()))
+        .setKafkaMetadataService(new MockKafkaMetadataService(Set.of(stream)))
+        .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
+        // Overridden by per-cluster starting offsets in metadata when present.
+        .setStartingOffsets(OffsetsInitializer.earliest())
+        .setBounded(OffsetsInitializer.latest())
+        .build();
+```
+{{< /tab >}}
+{{< /tabs >}}
+
 ### Kafka Stream Subscription
 The Dynamic Kafka Source provides 2 ways of subscribing to Kafka stream(s).
 * A set of Kafka stream ids. For example:
@@ -108,6 +157,9 @@ for any changes to the Kafka stream(s) and reconciling the reader tasks to subsc
 Kafka metadata returned by the service. For example, in the case of a Kafka migration, the source would
 swap from one cluster to the new cluster when the service makes that change in the Kafka stream metadata.
 
+Cluster metadata can optionally carry per-cluster starting and stopping offsets initializers. These
+override the global builder configuration for the affected cluster.
+
 ### Additional Properties
 There are configuration options in DynamicKafkaSourceOptions that can be configured in the properties through the builder:
 <table class="table table-bordered">
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/ClusterMetadata.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/ClusterMetadata.java
index 964e51e5..8883b010 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/ClusterMetadata.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/ClusterMetadata.java
@@ -19,6 +19,9 @@
 package org.apache.flink.connector.kafka.dynamic.metadata;
 
 import org.apache.flink.annotation.Experimental;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+
+import javax.annotation.Nullable;
 
 import java.io.Serializable;
 import java.util.Objects;
@@ -26,13 +29,15 @@ import java.util.Properties;
 import java.util.Set;
 
 /**
- * {@link ClusterMetadata} provides readers information about a cluster on what topics to read and
- * how to connect to a cluster.
+ * {@link ClusterMetadata} provides readers information about a cluster on what topics to read, how
+ * to connect to a cluster, and optional offsets initializers.
  */
 @Experimental
 public class ClusterMetadata implements Serializable {
     private final Set<String> topics;
     private final Properties properties;
+    @Nullable private final OffsetsInitializer startingOffsetsInitializer;
+    @Nullable private final OffsetsInitializer stoppingOffsetsInitializer;
 
     /**
      * Constructs the {@link ClusterMetadata} with the required properties.
@@ -41,8 +46,26 @@ public class ClusterMetadata implements Serializable {
      * @param properties the properties to access a cluster.
      */
     public ClusterMetadata(Set<String> topics, Properties properties) {
+        this(topics, properties, null, null);
+    }
+
+    /**
+     * Constructs the {@link ClusterMetadata} with the required properties and offsets.
+     *
+     * @param topics the topics belonging to a cluster.
+     * @param properties the properties to access a cluster.
+     * @param startingOffsetsInitializer the starting offsets initializer for the cluster.
+     * @param stoppingOffsetsInitializer the stopping offsets initializer for the cluster.
+     */
+    public ClusterMetadata(
+            Set<String> topics,
+            Properties properties,
+            @Nullable OffsetsInitializer startingOffsetsInitializer,
+            @Nullable OffsetsInitializer stoppingOffsetsInitializer) {
         this.topics = topics;
         this.properties = properties;
+        this.startingOffsetsInitializer = startingOffsetsInitializer;
+        this.stoppingOffsetsInitializer = stoppingOffsetsInitializer;
     }
 
     /**
@@ -63,9 +86,38 @@ public class ClusterMetadata implements Serializable {
         return properties;
     }
 
+    /**
+     * Get the starting offsets initializer for the cluster.
+     *
+     * @return the starting offsets initializer, or null to use the source default.
+     */
+    @Nullable
+    public OffsetsInitializer getStartingOffsetsInitializer() {
+        return startingOffsetsInitializer;
+    }
+
+    /**
+     * Get the stopping offsets initializer for the cluster.
+     *
+     * @return the stopping offsets initializer, or null to use the source default.
+     */
+    @Nullable
+    public OffsetsInitializer getStoppingOffsetsInitializer() {
+        return stoppingOffsetsInitializer;
+    }
+
     @Override
     public String toString() {
-        return "ClusterMetadata{" + "topics=" + topics + ", properties=" + 
properties + '}';
+        return "ClusterMetadata{"
+                + "topics="
+                + topics
+                + ", properties="
+                + properties
+                + ", startingOffsetsInitializer="
+                + startingOffsetsInitializer
+                + ", stoppingOffsetsInitializer="
+                + stoppingOffsetsInitializer
+                + '}';
     }
 
     @Override
@@ -77,11 +129,15 @@ public class ClusterMetadata implements Serializable {
             return false;
         }
         ClusterMetadata that = (ClusterMetadata) o;
-        return Objects.equals(topics, that.topics) && Objects.equals(properties, that.properties);
+        return Objects.equals(topics, that.topics)
+                && Objects.equals(properties, that.properties)
+                && Objects.equals(startingOffsetsInitializer, that.startingOffsetsInitializer)
+                && Objects.equals(stoppingOffsetsInitializer, that.stoppingOffsetsInitializer);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(topics, properties);
+        return Objects.hash(
+                topics, properties, startingOffsetsInitializer, stoppingOffsetsInitializer);
     }
 }
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/SingleClusterTopicMetadataService.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/SingleClusterTopicMetadataService.java
index 1f2f0fd1..a81c5586 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/SingleClusterTopicMetadataService.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/SingleClusterTopicMetadataService.java
@@ -21,10 +21,13 @@ package org.apache.flink.connector.kafka.dynamic.metadata;
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.connector.kafka.source.KafkaPropertiesUtil;
 import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.AdminClient;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -43,6 +46,8 @@ public class SingleClusterTopicMetadataService implements KafkaMetadataService {
 
     private final String kafkaClusterId;
     private final Properties properties;
+    @Nullable private final OffsetsInitializer startingOffsetsInitializer;
+    @Nullable private final OffsetsInitializer stoppingOffsetsInitializer;
     private transient AdminClient adminClient;
 
     /**
@@ -52,8 +57,26 @@ public class SingleClusterTopicMetadataService implements KafkaMetadataService {
      * @param properties the properties of the Kafka cluster.
      */
     public SingleClusterTopicMetadataService(String kafkaClusterId, Properties properties) {
+        this(kafkaClusterId, properties, null, null);
+    }
+
+    /**
+     * Create a {@link SingleClusterTopicMetadataService} with per-cluster offsets initializers.
+     *
+     * @param kafkaClusterId the id of the Kafka cluster.
+     * @param properties the properties of the Kafka cluster.
+     * @param startingOffsetsInitializer the starting offsets initializer for the cluster.
+     * @param stoppingOffsetsInitializer the stopping offsets initializer for the cluster.
+     */
+    public SingleClusterTopicMetadataService(
+            String kafkaClusterId,
+            Properties properties,
+            @Nullable OffsetsInitializer startingOffsetsInitializer,
+            @Nullable OffsetsInitializer stoppingOffsetsInitializer) {
         this.kafkaClusterId = kafkaClusterId;
         this.properties = properties;
+        this.startingOffsetsInitializer = startingOffsetsInitializer;
+        this.stoppingOffsetsInitializer = stoppingOffsetsInitializer;
     }
 
     /** {@inheritDoc} */
@@ -82,7 +105,11 @@ public class SingleClusterTopicMetadataService implements KafkaMetadataService {
 
     private KafkaStream createKafkaStream(String topic) {
         ClusterMetadata clusterMetadata =
-                new ClusterMetadata(Collections.singleton(topic), properties);
+                new ClusterMetadata(
+                        Collections.singleton(topic),
+                        properties,
+                        startingOffsetsInitializer,
+                        stoppingOffsetsInitializer);
 
         return new KafkaStream(topic, Collections.singletonMap(kafkaClusterId, clusterMetadata));
     }
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceBuilder.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceBuilder.java
index eab37c4e..8e814afc 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceBuilder.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceBuilder.java
@@ -105,7 +105,7 @@ public class DynamicKafkaSourceBuilder<T> {
 
     /**
      * Set the source in bounded mode and specify what offsets to end at. This is used for all
-     * clusters.
+     * clusters unless overridden by cluster metadata.
      *
      * @param stoppingOffsetsInitializer the {@link OffsetsInitializer}.
      * @return the builder.
@@ -141,7 +141,8 @@ public class DynamicKafkaSourceBuilder<T> {
     }
 
     /**
-     * Set the starting offsets of the stream. This will be applied to all clusters.
+     * Set the starting offsets of the stream. This will be applied to all clusters unless
+     * overridden by cluster metadata.
      *
      * @param startingOffsetsInitializer the {@link OffsetsInitializer}.
      * @return the builder.
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumStateSerializer.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumStateSerializer.java
index b34e536c..b7f0b49a 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumStateSerializer.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumStateSerializer.java
@@ -23,11 +23,15 @@ import org.apache.flink.connector.kafka.dynamic.metadata.ClusterMetadata;
 import org.apache.flink.connector.kafka.dynamic.metadata.KafkaStream;
 import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
 import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.kafka.clients.CommonClientConfigs;
 
+import javax.annotation.Nullable;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -45,6 +49,7 @@ public class DynamicKafkaSourceEnumStateSerializer
         implements SimpleVersionedSerializer<DynamicKafkaSourceEnumState> {
 
     private static final int VERSION_1 = 1;
+    private static final int VERSION_2 = 2;
 
     private final KafkaSourceEnumStateSerializer kafkaSourceEnumStateSerializer;
 
@@ -54,7 +59,7 @@ public class DynamicKafkaSourceEnumStateSerializer
 
     @Override
     public int getVersion() {
-        return VERSION_1;
+        return VERSION_2;
     }
 
     @Override
@@ -63,7 +68,7 @@ public class DynamicKafkaSourceEnumStateSerializer
                 DataOutputStream out = new DataOutputStream(baos)) {
 
             Set<KafkaStream> kafkaStreams = state.getKafkaStreams();
-            serialize(kafkaStreams, out);
+            serializeV2(kafkaStreams, out);
 
             Map<String, KafkaSourceEnumState> clusterEnumeratorStates =
                     state.getClusterEnumeratorStates();
@@ -91,37 +96,39 @@ public class DynamicKafkaSourceEnumStateSerializer
     @Override
     public DynamicKafkaSourceEnumState deserialize(int version, byte[] serialized)
             throws IOException {
-        if (version == VERSION_1) {
-            try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
-                    DataInputStream in = new DataInputStream(bais)) {
-                Set<KafkaStream> kafkaStreams = deserialize(in);
-
-                Map<String, KafkaSourceEnumState> clusterEnumeratorStates = new HashMap<>();
-                int kafkaSourceEnumStateSerializerVersion = in.readInt();
-
-                int clusterEnumeratorStateMapSize = in.readInt();
-                for (int i = 0; i < clusterEnumeratorStateMapSize; i++) {
-                    String kafkaClusterId = in.readUTF();
-                    int byteArraySize = in.readInt();
-                    KafkaSourceEnumState kafkaSourceEnumState =
-                            kafkaSourceEnumStateSerializer.deserialize(
-                                    kafkaSourceEnumStateSerializerVersion,
-                                    readNBytes(in, byteArraySize));
-                    clusterEnumeratorStates.put(kafkaClusterId, kafkaSourceEnumState);
-                }
+        if (version != VERSION_1 && version != VERSION_2) {
+            throw new IOException(
+                    String.format(
+                            "The bytes are serialized with version %d, "
+                                    + "while this deserializer only supports 
version up to %d",
+                            version, getVersion()));
+        }
+
+        try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
+                DataInputStream in = new DataInputStream(bais)) {
+            Set<KafkaStream> kafkaStreams =
+                    version == VERSION_1 ? deserializeV1(in) : deserializeV2(in);
 
-                return new DynamicKafkaSourceEnumState(kafkaStreams, clusterEnumeratorStates);
+            Map<String, KafkaSourceEnumState> clusterEnumeratorStates = new HashMap<>();
+            int kafkaSourceEnumStateSerializerVersion = in.readInt();
+
+            int clusterEnumeratorStateMapSize = in.readInt();
+            for (int i = 0; i < clusterEnumeratorStateMapSize; i++) {
+                String kafkaClusterId = in.readUTF();
+                int byteArraySize = in.readInt();
+                KafkaSourceEnumState kafkaSourceEnumState =
+                        kafkaSourceEnumStateSerializer.deserialize(
+                                kafkaSourceEnumStateSerializerVersion,
+                                readNBytes(in, byteArraySize));
+                clusterEnumeratorStates.put(kafkaClusterId, kafkaSourceEnumState);
             }
-        }
 
-        throw new IOException(
-                String.format(
-                        "The bytes are serialized with version %d, "
-                                + "while this deserializer only supports 
version up to %d",
-                        version, getVersion()));
+            return new DynamicKafkaSourceEnumState(kafkaStreams, clusterEnumeratorStates);
+        }
     }
 
-    private void serialize(Set<KafkaStream> kafkaStreams, DataOutputStream out) throws IOException {
+    private void serializeV2(Set<KafkaStream> kafkaStreams, DataOutputStream out)
+            throws IOException {
         out.writeInt(kafkaStreams.size());
         for (KafkaStream kafkaStream : kafkaStreams) {
             out.writeUTF(kafkaStream.getStreamId());
@@ -145,11 +152,13 @@ public class DynamicKafkaSourceEnumStateSerializer
                                        .getProperty(
                                                CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
                                                "Bootstrap servers must be specified in properties")));
+                writeOffsetsInitializer(clusterMetadata.getStartingOffsetsInitializer(), out);
+                writeOffsetsInitializer(clusterMetadata.getStoppingOffsetsInitializer(), out);
             }
         }
     }
 
-    private Set<KafkaStream> deserialize(DataInputStream in) throws IOException {
+    private Set<KafkaStream> deserializeV1(DataInputStream in) throws IOException {
 
         Set<KafkaStream> kafkaStreams = new HashSet<>();
         int numStreams = in.readInt();
@@ -179,6 +188,83 @@ public class DynamicKafkaSourceEnumStateSerializer
         return kafkaStreams;
     }
 
+    private Set<KafkaStream> deserializeV2(DataInputStream in) throws IOException {
+        Set<KafkaStream> kafkaStreams = new HashSet<>();
+        int numStreams = in.readInt();
+        for (int i = 0; i < numStreams; i++) {
+            String streamId = in.readUTF();
+            Map<String, ClusterMetadata> clusterMetadataMap = new HashMap<>();
+            int clusterMetadataMapSize = in.readInt();
+            for (int j = 0; j < clusterMetadataMapSize; j++) {
+                String kafkaClusterId = in.readUTF();
+                int topicsSize = in.readInt();
+                Set<String> topics = new HashSet<>();
+                for (int k = 0; k < topicsSize; k++) {
+                    topics.add(in.readUTF());
+                }
+
+                String bootstrapServers = in.readUTF();
+                Properties properties = new Properties();
+                properties.setProperty(
+                        CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+
+                OffsetsInitializer startingOffsetsInitializer = readOffsetsInitializer(in);
+                OffsetsInitializer stoppingOffsetsInitializer = readOffsetsInitializer(in);
+
+                clusterMetadataMap.put(
+                        kafkaClusterId,
+                        new ClusterMetadata(
+                                topics,
+                                properties,
+                                startingOffsetsInitializer,
+                                stoppingOffsetsInitializer));
+            }
+
+            kafkaStreams.add(new KafkaStream(streamId, clusterMetadataMap));
+        }
+
+        return kafkaStreams;
+    }
+
+    private static void writeOffsetsInitializer(
+            @Nullable OffsetsInitializer offsetsInitializer, DataOutputStream out)
+            throws IOException {
+        if (offsetsInitializer == null) {
+            out.writeBoolean(false);
+            return;
+        }
+
+        out.writeBoolean(true);
+        byte[] serializedOffsets = InstantiationUtil.serializeObject(offsetsInitializer);
+        out.writeInt(serializedOffsets.length);
+        out.write(serializedOffsets);
+    }
+
+    @Nullable
+    private static OffsetsInitializer readOffsetsInitializer(DataInputStream in)
+            throws IOException {
+        boolean hasOffsetsInitializer = in.readBoolean();
+        if (!hasOffsetsInitializer) {
+            return null;
+        }
+
+        int serializedSize = in.readInt();
+        byte[] serializedOffsets = readNBytes(in, serializedSize);
+        try {
+            return InstantiationUtil.deserializeObject(
+                    serializedOffsets, getClassLoaderForOffsets());
+        } catch (ClassNotFoundException e) {
+            throw new IOException("Failed to deserialize OffsetsInitializer", 
e);
+        }
+    }
+
+    private static ClassLoader getClassLoaderForOffsets() {
+        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+        return classLoader != null
+                ? classLoader
+                : DynamicKafkaSourceEnumStateSerializer.class.getClassLoader();
+    }
+
     private static byte[] readNBytes(DataInputStream in, int size) throws IOException {
         byte[] bytes = new byte[size];
         in.readFully(bytes);
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java
index 7643e62b..c118b27f 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java
@@ -41,6 +41,7 @@ import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscr
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.KafkaException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -161,10 +162,21 @@ public class DynamicKafkaSourceEnumerator
         this.latestKafkaStreams = dynamicKafkaSourceEnumState.getKafkaStreams();
 
         Map<String, Properties> clusterProperties = new HashMap<>();
+        Map<String, OffsetsInitializer> clusterStartingOffsets = new HashMap<>();
+        Map<String, OffsetsInitializer> clusterStoppingOffsets = new HashMap<>();
         for (KafkaStream kafkaStream : latestKafkaStreams) {
             for (Entry<String, ClusterMetadata> entry :
                     kafkaStream.getClusterMetadataMap().entrySet()) {
-                clusterProperties.put(entry.getKey(), entry.getValue().getProperties());
+                ClusterMetadata clusterMetadata = entry.getValue();
+                clusterProperties.put(entry.getKey(), clusterMetadata.getProperties());
+                if (clusterMetadata.getStartingOffsetsInitializer() != null) {
+                    clusterStartingOffsets.put(
+                            entry.getKey(), clusterMetadata.getStartingOffsetsInitializer());
+                }
+                if (clusterMetadata.getStoppingOffsetsInitializer() != null) {
+                    clusterStoppingOffsets.put(
+                            entry.getKey(), clusterMetadata.getStoppingOffsetsInitializer());
+                }
             }
         }
 
@@ -181,7 +193,9 @@ public class DynamicKafkaSourceEnumerator
                     clusterEnumState.getKey(),
                     this.latestClusterTopicsMap.get(clusterEnumState.getKey()),
                     clusterEnumState.getValue(),
-                    clusterProperties.get(clusterEnumState.getKey()));
+                    clusterProperties.get(clusterEnumState.getKey()),
+                    clusterStartingOffsets.get(clusterEnumState.getKey()),
+                    clusterStoppingOffsets.get(clusterEnumState.getKey()));
         }
     }
 
@@ -238,6 +252,8 @@ public class DynamicKafkaSourceEnumerator
 
         Map<String, Set<String>> newClustersTopicsMap = new HashMap<>();
         Map<String, Properties> clusterProperties = new HashMap<>();
+        Map<String, OffsetsInitializer> clusterStartingOffsets = new HashMap<>();
+        Map<String, OffsetsInitializer> clusterStoppingOffsets = new HashMap<>();
         for (KafkaStream kafkaStream : handledFetchKafkaStreams) {
             for (Entry<String, ClusterMetadata> entry :
                     kafkaStream.getClusterMetadataMap().entrySet()) {
@@ -248,6 +264,14 @@ public class DynamicKafkaSourceEnumerator
                         .computeIfAbsent(kafkaClusterId, (unused) -> new HashSet<>())
                         .addAll(clusterMetadata.getTopics());
                 clusterProperties.put(kafkaClusterId, clusterMetadata.getProperties());
+                if (clusterMetadata.getStartingOffsetsInitializer() != null) {
+                    clusterStartingOffsets.put(
+                            kafkaClusterId, clusterMetadata.getStartingOffsetsInitializer());
+                }
+                if (clusterMetadata.getStoppingOffsetsInitializer() != null) {
+                    clusterStoppingOffsets.put(
+                            kafkaClusterId, clusterMetadata.getStoppingOffsetsInitializer());
+                }
             }
         }
 
@@ -308,7 +332,9 @@ public class DynamicKafkaSourceEnumerator
                     activeClusterTopics.getKey(),
                     activeClusterTopics.getValue(),
                     newKafkaSourceEnumState,
-                    clusterProperties.get(activeClusterTopics.getKey()));
+                    clusterProperties.get(activeClusterTopics.getKey()),
+                    clusterStartingOffsets.get(activeClusterTopics.getKey()),
+                    clusterStoppingOffsets.get(activeClusterTopics.getKey()));
         }
 
         startAllEnumerators();
@@ -356,7 +382,18 @@ public class DynamicKafkaSourceEnumerator
             String kafkaClusterId,
             Set<String> topics,
             KafkaSourceEnumState kafkaSourceEnumState,
-            Properties fetchedProperties) {
+            Properties fetchedProperties,
+            @Nullable OffsetsInitializer clusterStartingOffsetsInitializer,
+            @Nullable OffsetsInitializer clusterStoppingOffsetsInitializer) {
+        OffsetsInitializer effectiveStartingOffsetsInitializer =
+                clusterStartingOffsetsInitializer != null
+                        ? clusterStartingOffsetsInitializer
+                        : startingOffsetsInitializer;
+        OffsetsInitializer effectiveStoppingOffsetsInitializer =
+                clusterStoppingOffsetsInitializer != null
+                        ? clusterStoppingOffsetsInitializer
+                        : stoppingOffsetInitializer;
+
         final Runnable signalNoMoreSplitsCallback;
         if (Boundedness.BOUNDED.equals(boundedness)) {
             signalNoMoreSplitsCallback = this::handleNoMoreSplits;
@@ -375,12 +412,18 @@ public class DynamicKafkaSourceEnumerator
         KafkaPropertiesUtil.copyProperties(fetchedProperties, consumerProps);
         KafkaPropertiesUtil.copyProperties(properties, consumerProps);
         KafkaPropertiesUtil.setClientIdPrefix(consumerProps, kafkaClusterId);
+        consumerProps.setProperty(
+                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+                effectiveStartingOffsetsInitializer
+                        .getAutoOffsetResetStrategy()
+                        .name()
+                        .toLowerCase());
 
         KafkaSourceEnumerator enumerator =
                 new KafkaSourceEnumerator(
                         KafkaSubscriber.getTopicListSubscriber(new ArrayList<>(topics)),
-                        startingOffsetsInitializer,
-                        stoppingOffsetInitializer,
+                        effectiveStartingOffsetsInitializer,
+                        effectiveStoppingOffsetsInitializer,
                         consumerProps,
                         context,
                         boundedness,
@@ -484,7 +527,8 @@ public class DynamicKafkaSourceEnumerator
     /**
      * Besides for checkpointing, this method is used in the restart sequence to retain the relevant
      * assigned splits so that there is no reader duplicate split assignment. See {@link
-     * #createEnumeratorWithAssignedTopicPartitions(String, Set, KafkaSourceEnumState, Properties)}}
+     * #createEnumeratorWithAssignedTopicPartitions(String, Set, KafkaSourceEnumState, Properties,
+     * OffsetsInitializer, OffsetsInitializer)}}
      */
     @Override
     public DynamicKafkaSourceEnumState snapshotState(long checkpointId) throws Exception {
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java
index 38952519..3e6f0534 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java
@@ -36,6 +36,7 @@ import org.apache.flink.connector.kafka.dynamic.source.metrics.KafkaClusterMetri
 import org.apache.flink.connector.kafka.dynamic.source.metrics.KafkaClusterMetricGroupManager;
 import org.apache.flink.connector.kafka.dynamic.source.split.DynamicKafkaSourceSplit;
 import org.apache.flink.connector.kafka.source.KafkaPropertiesUtil;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics;
 import org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter;
 import org.apache.flink.connector.kafka.source.reader.KafkaSourceReader;
@@ -49,6 +50,7 @@ import org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.UserCodeClassLoader;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -246,9 +248,20 @@ public class DynamicKafkaSourceReader<T> implements SourceReader<T, DynamicKafka
                                 clusterMetadataMapEntry.getKey(), (unused) -> new HashSet<>())
                         .addAll(clusterMetadataMapEntry.getValue().getTopics());
 
-                newClustersProperties.put(
-                        clusterMetadataMapEntry.getKey(),
-                        clusterMetadataMapEntry.getValue().getProperties());
+                Properties clusterProperties = new Properties();
+                KafkaPropertiesUtil.copyProperties(
+                        clusterMetadataMapEntry.getValue().getProperties(), clusterProperties);
+                OffsetsInitializer startingOffsetsInitializer =
+                        clusterMetadataMapEntry.getValue().getStartingOffsetsInitializer();
+                if (startingOffsetsInitializer != null) {
+                    clusterProperties.setProperty(
+                            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+                            startingOffsetsInitializer
+                                    .getAutoOffsetResetStrategy()
+                                    .name()
+                                    .toLowerCase());
+                }
+                newClustersProperties.put(clusterMetadataMapEntry.getKey(), clusterProperties);
             }
         }
 
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceITTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceITTest.java
index 06c2e591..13cc89a4 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceITTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceITTest.java
@@ -19,13 +19,24 @@
 package org.apache.flink.connector.kafka.dynamic.source;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestartStrategyOptions;
 import org.apache.flink.connector.kafka.dynamic.metadata.ClusterMetadata;
 import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService;
 import org.apache.flink.connector.kafka.dynamic.metadata.KafkaStream;
 import org.apache.flink.connector.kafka.dynamic.metadata.SingleClusterTopicMetadataService;
+import org.apache.flink.connector.kafka.dynamic.source.enumerator.DynamicKafkaSourceEnumState;
+import org.apache.flink.connector.kafka.dynamic.source.enumerator.DynamicKafkaSourceEnumStateSerializer;
+import org.apache.flink.connector.kafka.dynamic.source.enumerator.DynamicKafkaSourceEnumerator;
+import org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber.KafkaStreamSetSubscriber;
+import org.apache.flink.connector.kafka.dynamic.source.split.DynamicKafkaSourceSplit;
+import org.apache.flink.connector.kafka.dynamic.source.testutils.DynamicKafkaSourceEnumStateTestUtils;
 import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer;
 import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
 import org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContextFactory;
@@ -53,8 +64,10 @@ import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.TestLogger;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
@@ -250,6 +263,279 @@ public class DynamicKafkaSourceITTest extends TestLogger {
                                     .collect(Collectors.toList()));
         }
 
+        @Test
+        void testPerClusterOffsetsInitializersInUnboundedMode() throws Throwable {
+            String topic = "test-per-cluster-unbounded-offsets";
+            DynamicKafkaSourceTestHelper.createTopic(0, topic, NUM_PARTITIONS);
+            DynamicKafkaSourceTestHelper.createTopic(1, topic, NUM_PARTITIONS);
+
+            int cluster0Start = 0;
+            int cluster0End =
+                    DynamicKafkaSourceTestHelper.produceToKafka(
+                            0, topic, NUM_PARTITIONS, NUM_RECORDS_PER_SPLIT, cluster0Start);
+            int cluster1Start = cluster0End + 1000;
+            int cluster1InitialEnd =
+                    DynamicKafkaSourceTestHelper.produceToKafka(
+                            1, topic, NUM_PARTITIONS, NUM_RECORDS_PER_SPLIT, cluster1Start);
+            int cluster1ExtraStart = cluster1InitialEnd + 1000;
+            AtomicInteger cluster1ExtraEnd = new AtomicInteger(-1);
+
+            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+            env.setParallelism(2);
+
+            Properties properties = new Properties();
+            properties.setProperty(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(), "0");
+            properties.setProperty(
+                    DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_INTERVAL_MS.key(), "0");
+
+            KafkaStream kafkaStream =
+                    new KafkaStream(
+                            "test-per-cluster-unbounded-stream",
+                            ImmutableMap.of(
+                                    kafkaClusterTestEnvMetadata0.getKafkaClusterId(),
+                                    new ClusterMetadata(
+                                            Collections.singleton(topic),
+                                            kafkaClusterTestEnvMetadata0.getStandardProperties(),
+                                            OffsetsInitializer.earliest(),
+                                            null),
+                                    kafkaClusterTestEnvMetadata1.getKafkaClusterId(),
+                                    new ClusterMetadata(
+                                            Collections.singleton(topic),
+                                            kafkaClusterTestEnvMetadata1.getStandardProperties(),
+                                            OffsetsInitializer.latest(),
+                                            null)));
+
+            MockKafkaMetadataService mockKafkaMetadataService =
+                    new MockKafkaMetadataService(Collections.singleton(kafkaStream));
+
+            DynamicKafkaSource<Integer> dynamicKafkaSource =
+                    DynamicKafkaSource.<Integer>builder()
+                            .setStreamIds(Collections.singleton(kafkaStream.getStreamId()))
+                            .setKafkaMetadataService(mockKafkaMetadataService)
+                            .setDeserializer(
+                                    KafkaRecordDeserializationSchema.valueOnly(
+                                            IntegerDeserializer.class))
+                            .setStartingOffsets(OffsetsInitializer.earliest())
+                            .setProperties(properties)
+                            .build();
+
+            DataStreamSource<Integer> stream =
+                    env.fromSource(
+                            dynamicKafkaSource,
+                            WatermarkStrategy.noWatermarks(),
+                            "dynamic-kafka-src");
+
+            List<Integer> results = new ArrayList<>();
+            try (CloseableIterator<Integer> iterator = stream.executeAndCollect()) {
+                CommonTestUtils.waitUtil(
+                        () -> {
+                            try {
+                                results.add(iterator.next());
+                                if (cluster1ExtraEnd.get() < 0) {
+                                    cluster1ExtraEnd.set(
+                                            DynamicKafkaSourceTestHelper.produceToKafka(
+                                                    1,
+                                                    topic,
+                                                    NUM_PARTITIONS,
+                                                    NUM_RECORDS_PER_SPLIT,
+                                                    cluster1ExtraStart));
+                                }
+                            } catch (NoSuchElementException e) {
+                                // swallow and wait
+                            } catch (Throwable e) {
+                                throw new RuntimeException(e);
+                            }
+
+                            if (cluster1ExtraEnd.get() < 0) {
+                                return false;
+                            }
+
+                            int expectedCount =
+                                    (cluster0End - cluster0Start)
+                                            + (cluster1ExtraEnd.get() - cluster1ExtraStart);
+                            return results.size() == expectedCount;
+                        },
+                        Duration.ofSeconds(15),
+                        "Could not obtain the required records within the 
timeout");
+            }
+
+            List<Integer> expectedResults =
+                    Stream.concat(
+                                    IntStream.range(cluster0Start, cluster0End).boxed(),
+                                    IntStream.range(cluster1ExtraStart, cluster1ExtraEnd.get())
+                                            .boxed())
+                            .collect(Collectors.toList());
+            assertThat(results).containsExactlyInAnyOrderElementsOf(expectedResults);
+        }
+
+        @Test
+        void testPerClusterOffsetsInitializersInBoundedMode() throws Throwable {
+            String topic = "test-per-cluster-offsets-initializers";
+            DynamicKafkaSourceTestHelper.createTopic(0, topic, NUM_PARTITIONS);
+            DynamicKafkaSourceTestHelper.createTopic(1, topic, NUM_PARTITIONS);
+
+            int cluster0Start = 0;
+            int cluster0End =
+                    DynamicKafkaSourceTestHelper.produceToKafka(
+                            0, topic, NUM_PARTITIONS, NUM_RECORDS_PER_SPLIT, cluster0Start);
+            int cluster1Start = cluster0End + 1000;
+            DynamicKafkaSourceTestHelper.produceToKafka(
+                    1, topic, NUM_PARTITIONS, NUM_RECORDS_PER_SPLIT, cluster1Start);
+
+            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+            env.setParallelism(2);
+
+            Properties properties = new Properties();
+            properties.setProperty(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(), "0");
+            properties.setProperty(
+                    DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_INTERVAL_MS.key(), "0");
+
+            Map<TopicPartition, Long> cluster1StoppingOffsets =
+                    IntStream.range(0, NUM_PARTITIONS)
+                            .boxed()
+                            .collect(
+                                    Collectors.toMap(
+                                            partition -> new TopicPartition(topic, partition),
+                                            partition -> 0L));
+
+            KafkaStream kafkaStream =
+                    new KafkaStream(
+                            "test-per-cluster-offsets-stream",
+                            ImmutableMap.of(
+                                    kafkaClusterTestEnvMetadata0.getKafkaClusterId(),
+                                    new ClusterMetadata(
+                                            Collections.singleton(topic),
+                                            kafkaClusterTestEnvMetadata0.getStandardProperties(),
+                                            OffsetsInitializer.earliest(),
+                                            OffsetsInitializer.latest()),
+                                    kafkaClusterTestEnvMetadata1.getKafkaClusterId(),
+                                    new ClusterMetadata(
+                                            Collections.singleton(topic),
+                                            kafkaClusterTestEnvMetadata1.getStandardProperties(),
+                                            OffsetsInitializer.earliest(),
+                                            OffsetsInitializer.offsets(cluster1StoppingOffsets))));
+
+            MockKafkaMetadataService mockKafkaMetadataService =
+                    new MockKafkaMetadataService(Collections.singleton(kafkaStream));
+
+            DynamicKafkaSource<Integer> dynamicKafkaSource =
+                    DynamicKafkaSource.<Integer>builder()
+                            .setStreamIds(Collections.singleton(kafkaStream.getStreamId()))
+                            .setKafkaMetadataService(mockKafkaMetadataService)
+                            .setDeserializer(
+                                    KafkaRecordDeserializationSchema.valueOnly(
+                                            IntegerDeserializer.class))
+                            .setStartingOffsets(OffsetsInitializer.earliest())
+                            .setBounded(OffsetsInitializer.latest())
+                            .setProperties(properties)
+                            .build();
+
+            DataStreamSource<Integer> stream =
+                    env.fromSource(
+                            dynamicKafkaSource,
+                            WatermarkStrategy.noWatermarks(),
+                            "dynamic-kafka-src");
+
+            List<Integer> results = new ArrayList<>();
+            try (CloseableIterator<Integer> iterator = stream.executeAndCollect()) {
+                while (iterator.hasNext()) {
+                    results.add(iterator.next());
+                }
+            }
+
+            assertThat(results)
+                    .containsExactlyInAnyOrderElementsOf(
+                            IntStream.range(cluster0Start, cluster0End)
+                                    .boxed()
+                                    .collect(Collectors.toList()));
+        }
+
+        @Test
+        void testRestoreFromV1EnumeratorState() throws Throwable {
+            String topic = "test-v1-enum-state-restore";
+            DynamicKafkaSourceTestHelper.createTopic(0, topic, NUM_PARTITIONS);
+            DynamicKafkaSourceTestHelper.produceToKafka(
+                    0, topic, NUM_PARTITIONS, NUM_RECORDS_PER_SPLIT, 0);
+
+            String streamId = "test-v1-enum-stream";
+            String clusterId = kafkaClusterTestEnvMetadata0.getKafkaClusterId();
+            Properties clusterProperties = kafkaClusterTestEnvMetadata0.getStandardProperties();
+            String bootstrapServers = kafkaClusterTestEnvMetadata0.getBrokerConnectionStrings();
+            clusterProperties.setProperty(
+                    CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+
+            byte[] serializedState =
+                    DynamicKafkaSourceEnumStateTestUtils.serializeV1State(
+                            streamId, clusterId, Collections.singleton(topic), bootstrapServers);
+            DynamicKafkaSourceEnumState restoredState =
+                    new DynamicKafkaSourceEnumStateSerializer().deserialize(1, serializedState);
+
+            Properties properties = new Properties();
+            properties.setProperty(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(), "0");
+            properties.setProperty(
+                    DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_INTERVAL_MS.key(), "0");
+
+            KafkaStream kafkaStream =
+                    new KafkaStream(
+                            streamId,
+                            Collections.singletonMap(
+                                    clusterId,
+                                    new ClusterMetadata(
+                                            Collections.singleton(topic), clusterProperties)));
+            MockKafkaMetadataService mockKafkaMetadataService =
+                    new MockKafkaMetadataService(Collections.singleton(kafkaStream));
+
+            try (MockSplitEnumeratorContext<DynamicKafkaSourceSplit> context =
+                            new MockSplitEnumeratorContext<>(2);
+                    DynamicKafkaSourceEnumerator enumerator =
+                            new DynamicKafkaSourceEnumerator(
+                                    new KafkaStreamSetSubscriber(Collections.singleton(streamId)),
+                                    mockKafkaMetadataService,
+                                    context,
+                                    OffsetsInitializer.earliest(),
+                                    new NoStoppingOffsetsInitializer(),
+                                    properties,
+                                    Boundedness.CONTINUOUS_UNBOUNDED,
+                                    restoredState)) {
+                enumerator.start();
+                registerReader(context, enumerator, 0);
+                registerReader(context, enumerator, 1);
+                runAllOneTimeCallables(context);
+
+                List<DynamicKafkaSourceSplit> assignedSplits =
+                        context.getSplitsAssignmentSequence().stream()
+                                .map(SplitsAssignment::assignment)
+                                .flatMap(assignments -> assignments.values().stream())
+                                .flatMap(Collection::stream)
+                                .collect(Collectors.toList());
+
+                assertThat(assignedSplits).isNotEmpty();
+                assertThat(assignedSplits)
+                        .allSatisfy(
+                                split ->
+                                        assertThat(split.getKafkaClusterId()).isEqualTo(clusterId));
+                assertThat(assignedSplits)
+                        .allSatisfy(
+                                split ->
+                                        assertThat(
+                                                        split.getKafkaPartitionSplit()
+                                                                .getTopicPartition()
+                                                                .topic())
+
+                DynamicKafkaSourceEnumState snapshot = enumerator.snapshotState(1L);
+                ClusterMetadata snapshotMetadata =
+                        snapshot.getKafkaStreams().stream()
+                                .filter(stream -> stream.getStreamId().equals(streamId))
+                                .findFirst()
+                                .orElseThrow()
+                                .getClusterMetadataMap()
+                                .get(clusterId);
+                assertThat(snapshotMetadata.getStartingOffsetsInitializer()).isNull();
+                assertThat(snapshotMetadata.getStoppingOffsetsInitializer()).isNull();
+            }
+        }
+
         @Test
         void testMigrationUsingFileMetadataService() throws Throwable {
             // setup topics on two clusters
@@ -795,6 +1081,22 @@ public class DynamicKafkaSourceITTest extends TestLogger {
                     .collect(Collectors.toSet());
         }
 
+        private void registerReader(
+                MockSplitEnumeratorContext<DynamicKafkaSourceSplit> context,
+                DynamicKafkaSourceEnumerator enumerator,
+                int readerId) {
+            context.registerReader(new ReaderInfo(readerId, "location " + readerId));
+            enumerator.addReader(readerId);
+            enumerator.handleSourceEvent(readerId, new GetMetadataUpdateEvent());
+        }
+
+        private void runAllOneTimeCallables(
+                MockSplitEnumeratorContext<DynamicKafkaSourceSplit> context) throws Throwable {
+            while (!context.getOneTimeCallables().isEmpty()) {
+                context.runNextOneTimeCallable();
+            }
+        }
+
         private Set<KafkaStream> getKafkaStreams(
                 String kafkaClusterId, Properties properties, Collection<String> topics) {
             return topics.stream()
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumStateSerializerTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumStateSerializerTest.java
index 251309bc..1473bbdc 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumStateSerializerTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumStateSerializerTest.java
@@ -20,17 +20,21 @@ package org.apache.flink.connector.kafka.dynamic.source.enumerator;
 
 import org.apache.flink.connector.kafka.dynamic.metadata.ClusterMetadata;
 import org.apache.flink.connector.kafka.dynamic.metadata.KafkaStream;
+import org.apache.flink.connector.kafka.dynamic.source.testutils.DynamicKafkaSourceEnumStateTestUtils;
 import org.apache.flink.connector.kafka.source.enumerator.AssignmentStatus;
 import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
 import org.apache.flink.connector.kafka.source.enumerator.SplitAndAssignmentStatus;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.TopicPartition;
 import org.junit.jupiter.api.Test;
 
+import java.util.Collections;
 import java.util.Properties;
 import java.util.Set;
 
@@ -56,6 +60,10 @@ public class DynamicKafkaSourceEnumStateSerializerTest {
         propertiesForCluster1.setProperty(
                 CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "cluster1:9092");
 
+        OffsetsInitializer cluster0StartingOffsetsInitializer = OffsetsInitializer.earliest();
+        OffsetsInitializer cluster0StoppingOffsetsInitializer =
+                OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST);
+
         Set<KafkaStream> kafkaStreams =
                 ImmutableSet.of(
                         new KafkaStream(
@@ -64,7 +72,9 @@ public class DynamicKafkaSourceEnumStateSerializerTest {
                                         "cluster0",
                                         new ClusterMetadata(
                                                 ImmutableSet.of("topic0", 
"topic1"),
-                                                propertiesForCluster0),
+                                                propertiesForCluster0,
+                                                cluster0StartingOffsetsInitializer,
+                                                cluster0StoppingOffsetsInitializer),
                                         "cluster1",
                                         new ClusterMetadata(
                                                 ImmutableSet.of("topic2", 
"topic3"),
@@ -98,7 +108,7 @@ public class DynamicKafkaSourceEnumStateSerializerTest {
 
         DynamicKafkaSourceEnumState dynamicKafkaSourceEnumStateAfterSerde =
                 dynamicKafkaSourceEnumStateSerializer.deserialize(
-                        1,
+                        dynamicKafkaSourceEnumStateSerializer.getVersion(),
                         dynamicKafkaSourceEnumStateSerializer.serialize(
                                 dynamicKafkaSourceEnumState));
 
@@ -107,6 +117,92 @@ public class DynamicKafkaSourceEnumStateSerializerTest {
                 .isEqualTo(dynamicKafkaSourceEnumStateAfterSerde);
     }
 
+    @Test
+    public void testSerdeWithPartialOffsetsInitializers() throws Exception {
+        DynamicKafkaSourceEnumStateSerializer dynamicKafkaSourceEnumStateSerializer =
+                new DynamicKafkaSourceEnumStateSerializer();
+
+        Properties propertiesForCluster0 = new Properties();
+        propertiesForCluster0.setProperty(
+                CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "cluster0:9092");
+        Properties propertiesForCluster1 = new Properties();
+        propertiesForCluster1.setProperty(
+                CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "cluster1:9092");
+
+        OffsetsInitializer startingOffsetsInitializer = OffsetsInitializer.earliest();
+        OffsetsInitializer stoppingOffsetsInitializer = OffsetsInitializer.latest();
+
+        Set<KafkaStream> kafkaStreams =
+                ImmutableSet.of(
+                        new KafkaStream(
+                                "stream0",
+                                ImmutableMap.of(
+                                        "cluster0",
+                                        new ClusterMetadata(
+                                                ImmutableSet.of("topic0"),
+                                                propertiesForCluster0,
+                                                startingOffsetsInitializer,
+                                                null),
+                                        "cluster1",
+                                        new ClusterMetadata(
+                                                ImmutableSet.of("topic1"),
+                                                propertiesForCluster1,
+                                                null,
+                                                stoppingOffsetsInitializer))));
+
+        DynamicKafkaSourceEnumState dynamicKafkaSourceEnumState =
+                new DynamicKafkaSourceEnumState(kafkaStreams, Collections.emptyMap());
+
+        DynamicKafkaSourceEnumState dynamicKafkaSourceEnumStateAfterSerde =
+                dynamicKafkaSourceEnumStateSerializer.deserialize(
+                        dynamicKafkaSourceEnumStateSerializer.getVersion(),
+                        dynamicKafkaSourceEnumStateSerializer.serialize(
+                                dynamicKafkaSourceEnumState));
+
+        KafkaStream kafkaStream =
+                dynamicKafkaSourceEnumStateAfterSerde.getKafkaStreams().iterator().next();
+        ClusterMetadata cluster0Metadata = kafkaStream.getClusterMetadataMap().get("cluster0");
+        assertThat(cluster0Metadata.getStartingOffsetsInitializer()).isNotNull();
+        assertThat(cluster0Metadata.getStartingOffsetsInitializer().getAutoOffsetResetStrategy())
+                .isEqualTo(OffsetResetStrategy.EARLIEST);
+        assertThat(cluster0Metadata.getStoppingOffsetsInitializer()).isNull();
+
+        ClusterMetadata cluster1Metadata = kafkaStream.getClusterMetadataMap().get("cluster1");
+        assertThat(cluster1Metadata.getStartingOffsetsInitializer()).isNull();
+        assertThat(cluster1Metadata.getStoppingOffsetsInitializer()).isNotNull();
+        assertThat(cluster1Metadata.getStoppingOffsetsInitializer().getAutoOffsetResetStrategy())
+                .isEqualTo(OffsetResetStrategy.LATEST);
+    }
+
+    @Test
+    public void testDeserializeV1State() throws Exception {
+        DynamicKafkaSourceEnumStateSerializer dynamicKafkaSourceEnumStateSerializer =
+                new DynamicKafkaSourceEnumStateSerializer();
+
+        byte[] serializedState =
+                DynamicKafkaSourceEnumStateTestUtils.serializeV1State(
+                        "stream0",
+                        "cluster0",
+                        ImmutableSet.of("topic0", "topic1"),
+                        "cluster0:9092");
+
+        DynamicKafkaSourceEnumState dynamicKafkaSourceEnumState =
+                dynamicKafkaSourceEnumStateSerializer.deserialize(1, serializedState);
+
+        assertThat(dynamicKafkaSourceEnumState.getClusterEnumeratorStates()).isEmpty();
+        KafkaStream kafkaStream = dynamicKafkaSourceEnumState.getKafkaStreams().iterator().next();
+        assertThat(kafkaStream.getStreamId()).isEqualTo("stream0");
+        ClusterMetadata clusterMetadata = kafkaStream.getClusterMetadataMap().get("cluster0");
+        assertThat(clusterMetadata.getTopics()).containsExactlyInAnyOrder("topic0", "topic1");
+        assertThat(
+                        clusterMetadata
+                                .getProperties()
+                                .getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG))
+                .isEqualTo("cluster0:9092");
+        assertThat(clusterMetadata.getStartingOffsetsInitializer()).isNull();
+        assertThat(clusterMetadata.getStoppingOffsetsInitializer()).isNull();
+    }
+
     private static SplitAndAssignmentStatus getSplitAssignment(
             String topic, int partition, AssignmentStatus assignStatus) {
         return new SplitAndAssignmentStatus(
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/metadata/SingleClusterTopicMetadataServiceTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/metadata/SingleClusterTopicMetadataServiceTest.java
index 4e1fcf0c..81404c55 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/metadata/SingleClusterTopicMetadataServiceTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/metadata/SingleClusterTopicMetadataServiceTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.connector.kafka.dynamic.metadata.ClusterMetadata;
 import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService;
 import org.apache.flink.connector.kafka.dynamic.metadata.KafkaStream;
 import org.apache.flink.connector.kafka.dynamic.metadata.SingleClusterTopicMetadataService;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.flink.streaming.connectors.kafka.DynamicKafkaSourceTestHelper;
 import org.apache.flink.streaming.connectors.kafka.KafkaTestBase;
 
@@ -114,4 +115,87 @@ class SingleClusterTopicMetadataServiceTest {
                 .as("the stream topic cannot be found in kafka and we rethrow")
                 
.hasRootCauseInstanceOf(UnknownTopicOrPartitionException.class);
     }
+
+    @Test
+    void describeStreamsIncludesOffsetsInitializers() throws Exception {
+        OffsetsInitializer startingOffsetsInitializer = OffsetsInitializer.earliest();
+        OffsetsInitializer stoppingOffsetsInitializer = OffsetsInitializer.latest();
+
+        KafkaMetadataService metadataService =
+                new SingleClusterTopicMetadataService(
+                        kafkaClusterTestEnvMetadata0.getKafkaClusterId(),
+                        kafkaClusterTestEnvMetadata0.getStandardProperties(),
+                        startingOffsetsInitializer,
+                        stoppingOffsetsInitializer);
+
+        try {
+            Map<String, KafkaStream> streamMap =
+                    metadataService.describeStreams(Collections.singleton(TOPIC0));
+            ClusterMetadata clusterMetadata =
+                    streamMap
+                            .get(TOPIC0)
+                            .getClusterMetadataMap()
+                            .get(kafkaClusterTestEnvMetadata0.getKafkaClusterId());
+
+            assertThat(clusterMetadata.getStartingOffsetsInitializer())
+                    .isSameAs(startingOffsetsInitializer);
+            assertThat(clusterMetadata.getStoppingOffsetsInitializer())
+                    .isSameAs(stoppingOffsetsInitializer);
+        } finally {
+            metadataService.close();
+        }
+    }
+
+    @Test
+    void describeStreamsAllowsNullOffsetsInitializers() throws Exception {
+        assertOffsetsInitializers(null, null);
+    }
+
+    @Test
+    void describeStreamsAllowsStartingOffsetsOnly() throws Exception {
+        assertOffsetsInitializers(OffsetsInitializer.earliest(), null);
+    }
+
+    @Test
+    void describeStreamsAllowsStoppingOffsetsOnly() throws Exception {
+        assertOffsetsInitializers(null, OffsetsInitializer.latest());
+    }
+
+    private void assertOffsetsInitializers(
+            OffsetsInitializer startingOffsetsInitializer,
+            OffsetsInitializer stoppingOffsetsInitializer)
+            throws Exception {
+        KafkaMetadataService metadataService =
+                new SingleClusterTopicMetadataService(
+                        kafkaClusterTestEnvMetadata0.getKafkaClusterId(),
+                        kafkaClusterTestEnvMetadata0.getStandardProperties(),
+                        startingOffsetsInitializer,
+                        stoppingOffsetsInitializer);
+
+        try {
+            Map<String, KafkaStream> streamMap =
+                    metadataService.describeStreams(Collections.singleton(TOPIC0));
+            ClusterMetadata clusterMetadata =
+                    streamMap
+                            .get(TOPIC0)
+                            .getClusterMetadataMap()
+                            .get(kafkaClusterTestEnvMetadata0.getKafkaClusterId());
+
+            if (startingOffsetsInitializer == null) {
+                assertThat(clusterMetadata.getStartingOffsetsInitializer()).isNull();
+            } else {
+                assertThat(clusterMetadata.getStartingOffsetsInitializer())
+                        .isSameAs(startingOffsetsInitializer);
+            }
+
+            if (stoppingOffsetsInitializer == null) {
+                assertThat(clusterMetadata.getStoppingOffsetsInitializer()).isNull();
+            } else {
+                assertThat(clusterMetadata.getStoppingOffsetsInitializer())
+                        .isSameAs(stoppingOffsetsInitializer);
+            }
+        } finally {
+            metadataService.close();
+        }
+    }
 }
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/testutils/DynamicKafkaSourceEnumStateTestUtils.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/testutils/DynamicKafkaSourceEnumStateTestUtils.java
new file mode 100644
index 00000000..f6038784
--- /dev/null
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/testutils/DynamicKafkaSourceEnumStateTestUtils.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.dynamic.source.testutils;
+
+import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Set;
+
+/** Test utilities for DynamicKafkaSource enum state serialization. */
+public final class DynamicKafkaSourceEnumStateTestUtils {
+    private DynamicKafkaSourceEnumStateTestUtils() {}
+
+    public static byte[] serializeV1State(
+            String streamId, String clusterId, Set<String> topics, String bootstrapServers)
+            throws IOException {
+        KafkaSourceEnumStateSerializer kafkaSourceEnumStateSerializer =
+                new KafkaSourceEnumStateSerializer();
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                DataOutputStream out = new DataOutputStream(baos)) {
+            out.writeInt(1); // one KafkaStream in this state
+            out.writeUTF(streamId);
+            out.writeInt(1); // one cluster for that stream
+            out.writeUTF(clusterId);
+            out.writeInt(topics.size());
+            for (String topic : topics) {
+                out.writeUTF(topic);
+            }
+            out.writeUTF(bootstrapServers); // v1 stores only the bootstrap servers as cluster properties
+            out.writeInt(kafkaSourceEnumStateSerializer.getVersion()); // nested KafkaSourceEnumState serializer version
+            out.writeInt(0); // no per-cluster enumerator states
+            return baos.toByteArray();
+        }
+    }
+}
