This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3fe1de954bf3ee05247cdff0c43d71dc9535803d
Author: Yufei Zhang <[email protected]>
AuthorDate: Tue Dec 7 20:41:06 2021 +0800

    [FLINK-25044][pulsar][test]: improve unit test for pulsar source
---
 .../pulsar/source/PulsarSourceBuilder.java         |   2 +-
 .../reader/source/PulsarOrderedSourceReader.java   |   3 +-
 .../reader/source/PulsarUnorderedSourceReader.java |   3 +-
 .../pulsar/source/PulsarSourceBuilderTest.java     | 102 ++++-
 .../enumerator/PulsarSourceEnumeratorTest.java     | 439 +++++++++++++++++----
 .../source/PulsarOrderedSourceReaderTest.java      | 191 +++++++++
 .../reader/source/PulsarSourceReaderTestBase.java  | 245 ++++++++++++
 .../PulsarUnorderedSourceReaderTest.java}          |  20 +-
 .../PulsarOrderedPartitionSplitReaderTest.java     |  74 +++-
 .../split/PulsarPartitionSplitReaderTestBase.java  | 280 +++++++++++--
 .../PulsarUnorderedPartitionSplitReaderTest.java   |  16 +-
 .../pulsar/testutils/PulsarTestCommonUtils.java    |  72 ++++
 .../extension/SubType.java}                        |  27 +-
 .../extension/TestOrderlinessExtension.java        |  65 +++
 .../testutils/runtime/PulsarRuntimeOperator.java   |  10 +-
 15 files changed, 1373 insertions(+), 176 deletions(-)

diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
index e539a0c..dd7f41e 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
@@ -426,7 +426,7 @@ public final class PulsarSourceBuilder<OUT> {
             if (rangeGenerator == null) {
                 LOG.warn(
                         "No range generator provided for key_shared 
subscription,"
-                                + " we would use the DivideRangeGenerator as 
the default range generator.");
+                                + " we would use the UniformRangeGenerator as 
the default range generator.");
                 this.rangeGenerator = new UniformRangeGenerator();
             }
         } else {
diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReader.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReader.java
index 9b5c743..db62eb3 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReader.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReader.java
@@ -19,6 +19,7 @@
 package org.apache.flink.connector.pulsar.source.reader.source;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.connector.source.ReaderOutput;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.configuration.Configuration;
@@ -61,7 +62,7 @@ import java.util.function.Supplier;
 public class PulsarOrderedSourceReader<OUT> extends 
PulsarSourceReaderBase<OUT> {
     private static final Logger LOG = 
LoggerFactory.getLogger(PulsarOrderedSourceReader.class);
 
-    private final SortedMap<Long, Map<TopicPartition, MessageId>> 
cursorsToCommit;
+    @VisibleForTesting final SortedMap<Long, Map<TopicPartition, MessageId>> 
cursorsToCommit;
     private final ConcurrentMap<TopicPartition, MessageId> 
cursorsOfFinishedSplits;
     private final AtomicReference<Throwable> cursorCommitThrowable = new 
AtomicReference<>();
     private ScheduledExecutorService cursorScheduler;
diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java
index 8d18b1d..ce57a00 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java
@@ -19,6 +19,7 @@
 package org.apache.flink.connector.pulsar.source.reader.source;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
@@ -56,7 +57,7 @@ public class PulsarUnorderedSourceReader<OUT> extends 
PulsarSourceReaderBase<OUT
     private static final Logger LOG = 
LoggerFactory.getLogger(PulsarUnorderedSourceReader.class);
 
     @Nullable private final TransactionCoordinatorClient coordinatorClient;
-    private final SortedMap<Long, List<TxnID>> transactionsToCommit;
+    @VisibleForTesting final SortedMap<Long, List<TxnID>> transactionsToCommit;
     private final List<TxnID> transactionsOfFinishedSplits;
 
     public PulsarUnorderedSourceReader(
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilderTest.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilderTest.java
index 139b546..b96173d 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilderTest.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilderTest.java
@@ -18,12 +18,16 @@
 
 package org.apache.flink.connector.pulsar.source;
 
+import org.apache.flink.configuration.Configuration;
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.range.UniformRangeGenerator;
 
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.junit.jupiter.api.Test;
 
-import static org.junit.jupiter.api.Assertions.assertThrows;
+import static 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Unit tests for {@link PulsarSourceBuilder}. */
 @SuppressWarnings("java:S5778")
@@ -32,22 +36,22 @@ class PulsarSourceBuilderTest {
     @Test
     void someSetterMethodCouldOnlyBeCalledOnce() {
         PulsarSourceBuilder<String> builder = new PulsarSourceBuilder<>();
-        assertThrows(
-                IllegalArgumentException.class,
-                () -> 
builder.setAdminUrl("admin-url").setAdminUrl("admin-url2"));
-        assertThrows(
-                IllegalArgumentException.class,
-                () -> 
builder.setServiceUrl("service-url").setServiceUrl("service-url2"));
-        assertThrows(
-                IllegalArgumentException.class,
-                () ->
-                        builder.setSubscriptionName("set_subscription_name")
-                                
.setSubscriptionName("set_subscription_name2"));
-        assertThrows(
-                IllegalArgumentException.class,
-                () ->
-                        builder.setSubscriptionType(SubscriptionType.Exclusive)
-                                .setSubscriptionType(SubscriptionType.Shared));
+        assertThatThrownBy(() -> 
builder.setAdminUrl("admin-url").setAdminUrl("admin-url2"))
+                .isInstanceOf(IllegalArgumentException.class);
+
+        assertThatThrownBy(() -> 
builder.setServiceUrl("service-url").setServiceUrl("service-url2"))
+                .isInstanceOf(IllegalArgumentException.class);
+
+        assertThatThrownBy(
+                        () ->
+                                
builder.setSubscriptionName("set_subscription_name")
+                                        
.setSubscriptionName("set_subscription_name2"))
+                .isInstanceOf(IllegalArgumentException.class);
+        assertThatThrownBy(
+                        () ->
+                                
builder.setSubscriptionType(SubscriptionType.Exclusive)
+                                        
.setSubscriptionType(SubscriptionType.Shared))
+                .isInstanceOf(IllegalArgumentException.class);
     }
 
     @Test
@@ -55,7 +59,8 @@ class PulsarSourceBuilderTest {
         PulsarSourceBuilder<String> builder = new PulsarSourceBuilder<>();
         builder.setTopics("a", "b", "c");
 
-        assertThrows(IllegalStateException.class, () -> 
builder.setTopicPattern("a-a-a"));
+        assertThatThrownBy(() -> builder.setTopicPattern("a-a-a"))
+                .isInstanceOf(IllegalStateException.class);
     }
 
     @Test
@@ -63,8 +68,63 @@ class PulsarSourceBuilderTest {
         PulsarSourceBuilder<String> builder = new PulsarSourceBuilder<>();
         builder.setSubscriptionType(SubscriptionType.Shared);
 
-        assertThrows(
-                IllegalArgumentException.class,
-                () -> builder.setRangeGenerator(new UniformRangeGenerator()));
+        assertThatThrownBy(() -> builder.setRangeGenerator(new 
UniformRangeGenerator()))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    void missingRequiredField() {
+        PulsarSourceBuilder<String> builder = new PulsarSourceBuilder<>();
+        
assertThatThrownBy(builder::build).isInstanceOf(IllegalArgumentException.class);
+        builder.setAdminUrl("admin-url");
+        
assertThatThrownBy(builder::build).isInstanceOf(IllegalArgumentException.class);
+        builder.setServiceUrl("service-url");
+        
assertThatThrownBy(builder::build).isInstanceOf(IllegalArgumentException.class);
+        builder.setSubscriptionName("subscription-name");
+        
assertThatThrownBy(builder::build).isInstanceOf(NullPointerException.class);
+        builder.setTopics("topic");
+        
assertThatThrownBy(builder::build).isInstanceOf(NullPointerException.class);
+        builder.setDeserializationSchema(pulsarSchema(Schema.STRING));
+        assertThatCode(builder::build).doesNotThrowAnyException();
+    }
+
+    @Test
+    void defaultBuilder() {
+        PulsarSourceBuilder<String> builder = new PulsarSourceBuilder<>();
+        
assertThatThrownBy(builder::build).isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    void subscriptionTypeShouldNotBeOverriddenBySetMethod() {
+        PulsarSourceBuilder<String> builder = new PulsarSourceBuilder<>();
+        fillRequiredFields(builder);
+
+        Configuration config = new Configuration();
+        config.set(PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE, 
SubscriptionType.Shared);
+        builder.setConfig(config);
+
+        assertThatThrownBy(() -> 
builder.setSubscriptionType(SubscriptionType.Failover))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    void subscriptionTypeShouldNotBeOverriddenByConfiguration() {
+        PulsarSourceBuilder<String> builder = new PulsarSourceBuilder<>();
+        fillRequiredFields(builder);
+
+        builder.setSubscriptionType(SubscriptionType.Failover);
+
+        Configuration config = new Configuration();
+        config.set(PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE, 
SubscriptionType.Shared);
+        assertThatThrownBy(() -> builder.setConfig(config))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    private void fillRequiredFields(PulsarSourceBuilder<String> builder) {
+        builder.setAdminUrl("admin-url");
+        builder.setServiceUrl("service-url");
+        builder.setSubscriptionName("subscription-name");
+        builder.setTopics("topic");
+        builder.setDeserializationSchema(pulsarSchema(Schema.STRING));
     }
 }
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java
index a4cc0a4..c52c514 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java
@@ -18,117 +18,326 @@
 
 package org.apache.flink.connector.pulsar.source.enumerator;
 
+import org.apache.flink.api.connector.source.ReaderInfo;
 import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
 import 
org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
+import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.range.FullRangeGenerator;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestSuiteBase;
+import 
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Maps;
 import org.apache.flink.shaded.guava30.com.google.common.collect.Sets;
 
 import org.apache.pulsar.client.api.RegexSubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
-import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE;
 import static 
org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor.latest;
 import static 
org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber.getTopicPatternSubscriber;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static 
org.apache.flink.shaded.guava30.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Unit tests for {@link PulsarSourceEnumerator}. */
 class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase {
 
     private static final int NUM_SUBTASKS = 3;
-    private static final String DYNAMIC_TOPIC_NAME = "dynamic_topic";
-    private static final String TOPIC1 = "topic";
-    private static final String TOPIC2 = "pattern-topic";
-    private static final Set<String> PRE_EXISTING_TOPICS = 
Sets.newHashSet(TOPIC1, TOPIC2);
+    private static final int READER0 = 0;
+    private static final int READER1 = 1;
+    private static final int PARTITION_DISCOVERY_CALLABLE_INDEX = 0;
     private static final boolean ENABLE_PERIODIC_PARTITION_DISCOVERY = true;
     private static final boolean DISABLE_PERIODIC_PARTITION_DISCOVERY = false;
-    private static final boolean INCLUDE_DYNAMIC_TOPIC = true;
-    private static final boolean EXCLUDE_DYNAMIC_TOPIC = false;
 
-    // @TestInstance(TestInstance.Lifecycle.PER_CLASS) is annotated in 
PulsarTestSuitBase, so this
-    // method could be non-static.
-    @BeforeAll
-    void beforeAll() {
-        operator().setupTopic(TOPIC1);
-        operator().setupTopic(TOPIC2);
+    @ParameterizedTest
+    @EnumSource(
+            value = SubscriptionType.class,
+            names = {"Failover", "Shared"})
+    void startWithDiscoverPartitionsOnce(SubscriptionType subscriptionType) 
throws Exception {
+        Set<String> prexistingTopics = setupPreexistingTopics();
+        try (MockSplitEnumeratorContext<PulsarPartitionSplit> context =
+                        new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
+                PulsarSourceEnumerator enumerator =
+                        createEnumerator(
+                                subscriptionType,
+                                prexistingTopics,
+                                context,
+                                DISABLE_PERIODIC_PARTITION_DISCOVERY)) {
+
+            // Start the enumerator and it should schedule a one time task to 
discover and assign
+            // partitions.
+            enumerator.start();
+            assertThat(context.getPeriodicCallables()).isEmpty();
+            assertThat(context.getOneTimeCallables())
+                    .as("A one time partition discovery callable should have 
been scheduled")
+                    .hasSize(1);
+        }
     }
 
-    @AfterAll
-    void afterAll() {
-        operator().deleteTopic(TOPIC1, true);
-        operator().deleteTopic(TOPIC2, true);
+    @ParameterizedTest
+    @EnumSource(
+            value = SubscriptionType.class,
+            names = {"Failover", "Shared"})
+    void startWithPeriodicPartitionDiscovery(SubscriptionType 
subscriptionType) throws Exception {
+        Set<String> prexistingTopics = setupPreexistingTopics();
+        try (MockSplitEnumeratorContext<PulsarPartitionSplit> context =
+                        new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
+                PulsarSourceEnumerator enumerator =
+                        createEnumerator(
+                                subscriptionType,
+                                prexistingTopics,
+                                context,
+                                ENABLE_PERIODIC_PARTITION_DISCOVERY)) {
+
+            enumerator.start();
+            assertThat(context.getOneTimeCallables()).isEmpty();
+            assertThat((context.getPeriodicCallables()))
+                    .as("A periodic partition discovery callable should have 
been scheduled")
+                    .hasSize(1);
+        }
     }
 
-    @Test
-    void startWithDiscoverPartitionsOnce() throws Exception {
+    @ParameterizedTest
+    @EnumSource(
+            value = SubscriptionType.class,
+            names = {"Failover", "Shared"})
+    void discoverPartitionsTriggersAssignments(SubscriptionType 
subscriptionType) throws Throwable {
+        Set<String> prexistingTopics = setupPreexistingTopics();
         try (MockSplitEnumeratorContext<PulsarPartitionSplit> context =
                         new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
                 PulsarSourceEnumerator enumerator =
-                        createEnumerator(context, 
DISABLE_PERIODIC_PARTITION_DISCOVERY)) {
+                        createEnumerator(
+                                subscriptionType,
+                                prexistingTopics,
+                                context,
+                                DISABLE_PERIODIC_PARTITION_DISCOVERY)) {
 
-            // Start the enumerator and it should schedule a one time task to 
discover and assign
-            // partitions.
             enumerator.start();
-            assertTrue(context.getPeriodicCallables().isEmpty());
-            assertEquals(
-                    1,
-                    context.getOneTimeCallables().size(),
-                    "A one time partition discovery callable should have been 
scheduled");
+
+            // register reader 0, 1
+            registerReader(context, enumerator, READER0);
+            registerReader(context, enumerator, READER1);
+            assertThat(context.getSplitsAssignmentSequence()).isEmpty();
+
+            // Run the partition discover callable and check the partition 
assignment.
+            runOneTimePartitionDiscovery(context);
+            verifyLastReadersAssignments(subscriptionType, context, 
prexistingTopics, 1);
         }
     }
 
-    @Test
-    void startWithPeriodicPartitionDiscovery() throws Exception {
+    @ParameterizedTest
+    @EnumSource(
+            value = SubscriptionType.class,
+            names = {"Failover", "Shared"})
+    void discoverPartitionsPeriodically(SubscriptionType subscriptionType) 
throws Throwable {
+        String dynamicTopic = randomAlphabetic(10);
+        Set<String> prexistingTopics = setupPreexistingTopics();
+        Set<String> topicsToSubscribe = new HashSet<>(prexistingTopics);
+        topicsToSubscribe.add(dynamicTopic);
         try (MockSplitEnumeratorContext<PulsarPartitionSplit> context =
                         new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
                 PulsarSourceEnumerator enumerator =
-                        createEnumerator(context, 
ENABLE_PERIODIC_PARTITION_DISCOVERY)) {
+                        createEnumerator(
+                                subscriptionType,
+                                topicsToSubscribe,
+                                context,
+                                ENABLE_PERIODIC_PARTITION_DISCOVERY)) {
 
-            // Start the enumerator and it should schedule a one time task to 
discover and assign
-            // partitions.
+            testRegisterReadersForPreexistingTopics(
+                    subscriptionType, prexistingTopics, context, enumerator);
+
+            // invoke partition discovery callable again and there should be 
no new assignments.
+            runPeriodicPartitionDiscovery(context);
+
+            int expectedSplitsAssignmentSequenceSize =
+                    subscriptionType == SubscriptionType.Failover ? 1 : 2;
+
+            assertThat(context.getSplitsAssignmentSequence())
+                    .as("No new assignments should be made because there is no 
partition change")
+                    .hasSize(expectedSplitsAssignmentSequenceSize);
+
+            // create the dynamic topic.
+            operator().createTopic(dynamicTopic, 
PulsarRuntimeOperator.DEFAULT_PARTITIONS);
+
+            // invoke partition discovery callable again.
+            while (true) {
+                runPeriodicPartitionDiscovery(context);
+                if (context.getSplitsAssignmentSequence().size() < 2) {
+                    sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
+                } else {
+                    break;
+                }
+            }
+            verifyLastReadersAssignments(
+                    subscriptionType,
+                    context,
+                    Collections.singleton(dynamicTopic),
+                    expectedSplitsAssignmentSequenceSize + 1);
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = SubscriptionType.class,
+            names = {"Failover", "Shared"})
+    void addSplitsBack(SubscriptionType subscriptionType) throws Throwable {
+        Set<String> prexistingTopics = setupPreexistingTopics();
+        try (MockSplitEnumeratorContext<PulsarPartitionSplit> context =
+                        new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
+                PulsarSourceEnumerator enumerator =
+                        createEnumerator(
+                                subscriptionType,
+                                prexistingTopics,
+                                context,
+                                ENABLE_PERIODIC_PARTITION_DISCOVERY)) {
+
+            testRegisterReadersForPreexistingTopics(
+                    subscriptionType, prexistingTopics, context, enumerator);
+
+            // Simulate a reader failure.
+            context.unregisterReader(READER0);
+            enumerator.addSplitsBack(
+                    
context.getSplitsAssignmentSequence().get(0).assignment().get(READER0),
+                    READER0);
+            int expectedSplitsAssignmentSequenceSize =
+                    subscriptionType == SubscriptionType.Failover ? 1 : 2;
+            assertThat(context.getSplitsAssignmentSequence())
+                    .as("The added back splits should have not been assigned")
+                    .hasSize(expectedSplitsAssignmentSequenceSize);
+
+            // Simulate a reader recovery.
+            registerReader(context, enumerator, READER0);
+            verifyLastReadersAssignments(
+                    subscriptionType,
+                    context,
+                    prexistingTopics,
+                    expectedSplitsAssignmentSequenceSize + 1);
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = SubscriptionType.class,
+            names = {"Failover"})
+    void workWithPreexistingAssignments(SubscriptionType subscriptionType) 
throws Throwable {
+        Set<String> prexistingTopics = setupPreexistingTopics();
+        PulsarSourceEnumState preexistingAssignments;
+        try (MockSplitEnumeratorContext<PulsarPartitionSplit> context1 =
+                        new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
+                PulsarSourceEnumerator enumerator =
+                        createEnumerator(
+                                subscriptionType,
+                                prexistingTopics,
+                                context1,
+                                ENABLE_PERIODIC_PARTITION_DISCOVERY)) {
+            testRegisterReadersForPreexistingTopics(
+                    subscriptionType, prexistingTopics, context1, enumerator);
+            preexistingAssignments =
+                    
asEnumState(context1.getSplitsAssignmentSequence().get(0).assignment());
+        }
+
+        try (MockSplitEnumeratorContext<PulsarPartitionSplit> context2 =
+                        new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
+                PulsarSourceEnumerator enumerator =
+                        createEnumerator(
+                                subscriptionType,
+                                prexistingTopics,
+                                context2,
+                                ENABLE_PERIODIC_PARTITION_DISCOVERY,
+                                preexistingAssignments)) {
             enumerator.start();
-            assertTrue(context.getOneTimeCallables().isEmpty());
-            assertEquals(
-                    1,
-                    context.getPeriodicCallables().size(),
-                    "A periodic partition discovery callable should have been 
scheduled");
+            runPeriodicPartitionDiscovery(context2);
+
+            registerReader(context2, enumerator, READER0);
+            verifyLastReadersAssignments(subscriptionType, context2, 
prexistingTopics, 1);
         }
     }
 
-    private PulsarSourceEnumerator createEnumerator(
+    @ParameterizedTest
+    @EnumSource(
+            value = SubscriptionType.class,
+            names = {"Failover", "Shared"})
+    void snapshotState(SubscriptionType subscriptionType) throws Throwable {
+        Set<String> prexistingTopics = setupPreexistingTopics();
+        try (MockSplitEnumeratorContext<PulsarPartitionSplit> context =
+                        new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
+                PulsarSourceEnumerator enumerator =
+                        createEnumerator(subscriptionType, prexistingTopics, 
context, false)) {
+            enumerator.start();
+
+            // No reader is registered, so the state should be empty
+            final PulsarSourceEnumState state1 = enumerator.snapshotState(1L);
+            assertThat(state1.getAppendedPartitions()).isEmpty();
+
+            registerReader(context, enumerator, READER0);
+            registerReader(context, enumerator, READER1);
+            runOneTimePartitionDiscovery(context);
+
+            // The state should contain splits assigned to READER0 and READER1
+            final PulsarSourceEnumState state2 = enumerator.snapshotState(1L);
+            verifySplitAssignmentWithPartitions(
+                    getExpectedTopicPartitions(prexistingTopics), 
state2.getAppendedPartitions());
+        }
+    }
+
+    private Set<String> setupPreexistingTopics() {
+        String topic1 = randomAlphabetic(10);
+        String topic2 = randomAlphabetic(10);
+        operator().setupTopic(topic1);
+        operator().setupTopic(topic2);
+        Set<String> preexistingTopics = new HashSet<>();
+        preexistingTopics.add(topic1);
+        preexistingTopics.add(topic2);
+        return preexistingTopics;
+    }
+
+    private void testRegisterReadersForPreexistingTopics(
+            SubscriptionType subscriptionType,
+            Set<String> topics,
             MockSplitEnumeratorContext<PulsarPartitionSplit> context,
-            boolean enablePeriodicPartitionDiscovery) {
-        return createEnumerator(context, enablePeriodicPartitionDiscovery, 
EXCLUDE_DYNAMIC_TOPIC);
+            PulsarSourceEnumerator enumerator)
+            throws Throwable {
+        enumerator.start();
+
+        // register reader 0 before the partition discovery.
+        registerReader(context, enumerator, READER0);
+        assertThat(context.getSplitsAssignmentSequence()).isEmpty();
+
+        // Run the partition discover callable and check the partition 
assignment.
+        runPeriodicPartitionDiscovery(context);
+        verifyLastReadersAssignments(subscriptionType, context, topics, 1);
+
+        registerReader(context, enumerator, READER1);
+
+        int expectedSplitsAssignmentSequenceSize =
+                subscriptionType == SubscriptionType.Failover ? 1 : 2;
+        verifyLastReadersAssignments(
+                subscriptionType, context, topics, 
expectedSplitsAssignmentSequenceSize);
     }
 
     private PulsarSourceEnumerator createEnumerator(
+            SubscriptionType subscriptionType,
+            Set<String> topics,
             MockSplitEnumeratorContext<PulsarPartitionSplit> enumContext,
-            boolean enablePeriodicPartitionDiscovery,
-            boolean includeDynamicTopic) {
-        List<String> topics = new ArrayList<>(PRE_EXISTING_TOPICS);
-        if (includeDynamicTopic) {
-            topics.add(DYNAMIC_TOPIC_NAME);
-        }
-        Configuration configuration = operator().config();
-        configuration.set(PULSAR_SUBSCRIPTION_TYPE, SubscriptionType.Failover);
-
+            boolean enablePeriodicPartitionDiscovery) {
         PulsarSourceEnumState sourceEnumState =
                 new PulsarSourceEnumState(
                         Sets.newHashSet(),
@@ -136,40 +345,38 @@ class PulsarSourceEnumeratorTest extends 
PulsarTestSuiteBase {
                         Maps.newHashMap(),
                         Maps.newHashMap(),
                         false);
-
         return createEnumerator(
+                subscriptionType,
+                topics,
                 enumContext,
                 enablePeriodicPartitionDiscovery,
-                topics,
-                sourceEnumState,
-                configuration);
+                sourceEnumState);
     }
 
-    /**
-     * Create the enumerator. For the purpose of the tests in this class we 
don't care about the
-     * subscriber and offsets initializer, so just use arbitrary settings.
-     */
     private PulsarSourceEnumerator createEnumerator(
+            SubscriptionType subscriptionType,
+            Set<String> topicsToSubscribe,
             MockSplitEnumeratorContext<PulsarPartitionSplit> enumContext,
             boolean enablePeriodicPartitionDiscovery,
-            Collection<String> topicsToSubscribe,
-            PulsarSourceEnumState sourceEnumState,
-            Configuration configuration) {
+            PulsarSourceEnumState sourceEnumState) {
         // Use a TopicPatternSubscriber so that no exception if a subscribed 
topic hasn't been
         // created yet.
         String topicRegex = String.join("|", topicsToSubscribe);
         Pattern topicPattern = Pattern.compile(topicRegex);
         PulsarSubscriber subscriber =
                 getTopicPatternSubscriber(topicPattern, 
RegexSubscriptionMode.AllTopics);
+
+        Configuration configuration = operator().config();
+        configuration.set(PULSAR_SUBSCRIPTION_TYPE, subscriptionType);
         if (enablePeriodicPartitionDiscovery) {
             configuration.set(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, 60L);
         } else {
             configuration.set(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, -1L);
         }
         SourceConfiguration sourceConfiguration = new 
SourceConfiguration(configuration);
+
         SplitsAssignmentState assignmentState =
                 new SplitsAssignmentState(latest(), sourceConfiguration, 
sourceEnumState);
-
         return new PulsarSourceEnumerator(
                 subscriber,
                 StartCursor.earliest(),
@@ -179,4 +386,102 @@ class PulsarSourceEnumeratorTest extends 
PulsarTestSuiteBase {
                 enumContext,
                 assignmentState);
     }
+
+    private void registerReader(
+            MockSplitEnumeratorContext<PulsarPartitionSplit> context,
+            PulsarSourceEnumerator enumerator,
+            int reader) {
+        context.registerReader(new ReaderInfo(reader, "testing location "));
+        enumerator.addReader(reader);
+    }
+
+    private void verifyLastReadersAssignments(
+            SubscriptionType subscriptionType,
+            MockSplitEnumeratorContext<PulsarPartitionSplit> context,
+            Set<String> topics,
+            int expectedAssignmentSeqSize) {
+        
assertThat(context.getSplitsAssignmentSequence()).hasSize(expectedAssignmentSeqSize);
+        verifyAssignments(
+                subscriptionType,
+                getExpectedTopicPartitions(topics),
+                context.getSplitsAssignmentSequence()
+                        .get(expectedAssignmentSeqSize - 1)
+                        .assignment());
+    }
+
+    private void verifyAssignments(
+            SubscriptionType subscriptionType,
+            Set<TopicPartition> expectedTopicPartitions,
+            Map<Integer, List<PulsarPartitionSplit>> actualAssignments) {
+        if (subscriptionType == SubscriptionType.Failover) {
+            int actualSize = 
actualAssignments.values().stream().mapToInt(List::size).sum();
+            assertThat(actualSize).isEqualTo(expectedTopicPartitions.size());
+        } else if (subscriptionType == SubscriptionType.Shared) {
+            actualAssignments
+                    .values()
+                    .forEach(
+                            (splits) -> 
assertThat(splits).hasSize(expectedTopicPartitions.size()));
+        }
+    }
+
+    private Set<TopicPartition> getExpectedTopicPartitions(Set<String> topics) 
{
+        Set<TopicPartition> allPartitions = new HashSet<>();
+        for (String topicName : topics) {
+            for (int i = 0; i < PulsarRuntimeOperator.DEFAULT_PARTITIONS; i++) 
{
+                allPartitions.add(new TopicPartition(topicName, i, 
TopicRange.createFullRange()));
+            }
+        }
+        return allPartitions;
+    }
+
+    private void verifySplitAssignmentWithPartitions(
+            Set<TopicPartition> expectedAssignment, Set<TopicPartition> 
actualTopicPartitions) {
+        assertThat(actualTopicPartitions).isEqualTo(expectedAssignment);
+    }
+
+    // this method only works for non Shared Mode
+    private PulsarSourceEnumState asEnumState(
+            Map<Integer, List<PulsarPartitionSplit>> assignments) {
+        Set<TopicPartition> appendedPartitions = new HashSet<>();
+        Set<PulsarPartitionSplit> pendingPartitionSplits = new HashSet<>();
+        Map<Integer, Set<PulsarPartitionSplit>> sharedPendingPartitionSplits = 
new HashMap<>();
+        Map<Integer, Set<String>> readerAssignedSplits = new HashMap<>();
+        boolean initialized = false;
+
+        assignments
+                .values()
+                .forEach(
+                        splits -> {
+                            appendedPartitions.addAll(
+                                    splits.stream()
+                                            
.map(PulsarPartitionSplit::getPartition)
+                                            .collect(Collectors.toList()));
+                            pendingPartitionSplits.addAll(splits);
+                        });
+
+        return new PulsarSourceEnumState(
+                appendedPartitions,
+                pendingPartitionSplits,
+                sharedPendingPartitionSplits,
+                readerAssignedSplits,
+                initialized);
+    }
+
+    private void runOneTimePartitionDiscovery(
+            MockSplitEnumeratorContext<PulsarPartitionSplit> context) throws 
Throwable {
+        // Fetch potential topic descriptions
+        context.runNextOneTimeCallable();
+        if (!context.getOneTimeCallables().isEmpty()) {
+            context.runNextOneTimeCallable();
+        }
+    }
+
+    private void runPeriodicPartitionDiscovery(
+            MockSplitEnumeratorContext<PulsarPartitionSplit> context) throws 
Throwable {
+        // Fetch potential topic descriptions
+        context.runPeriodicCallable(PARTITION_DISCOVERY_CALLABLE_INDEX);
+        if (!context.getOneTimeCallables().isEmpty()) {
+            context.runNextOneTimeCallable();
+        }
+    }
 }
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReaderTest.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReaderTest.java
new file mode 100644
index 0000000..477dfdd
--- /dev/null
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReaderTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.reader.source;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+import org.apache.flink.connector.pulsar.testutils.extension.SubType;
+import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.testutils.CommonTestUtils;
+
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.policies.data.TopicStats;
+import org.junit.jupiter.api.TestTemplate;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
+
+import static 
org.apache.flink.connector.pulsar.testutils.PulsarTestCommonUtils.createPartitionSplit;
+import static 
org.apache.flink.connector.pulsar.testutils.PulsarTestCommonUtils.createPartitionSplits;
+import static 
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator.DEFAULT_PARTITIONS;
+import static 
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator.NUM_RECORDS_PER_PARTITION;
+import static 
org.apache.flink.shaded.guava30.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+
+class PulsarOrderedSourceReaderTest extends PulsarSourceReaderTestBase {
+
+    private static final int MAX_EMPTY_POLLING_TIMES = 10;
+
+    @SubType SubscriptionType subscriptionType = SubscriptionType.Failover;
+
+    @TestTemplate
+    void consumeMessagesAndCommitOffsets(
+            PulsarSourceReaderBase<Integer> baseReader, Boundedness 
boundedness, String topicName)
+            throws Exception {
+        // set up the partition
+        PulsarOrderedSourceReader<Integer> reader = 
(PulsarOrderedSourceReader<Integer>) baseReader;
+        setupSourceReader(reader, topicName, 0, 
Boundedness.CONTINUOUS_UNBOUNDED);
+
+        // waiting for results
+        TestingReaderOutput<Integer> output = new TestingReaderOutput<>();
+        pollUntil(
+                reader,
+                output,
+                () -> output.getEmittedRecords().size() == 
NUM_RECORDS_PER_PARTITION,
+                "The output didn't poll enough records before timeout.");
+        reader.snapshotState(100L);
+        reader.notifyCheckpointComplete(100L);
+        pollUntil(
+                reader,
+                output,
+                reader.cursorsToCommit::isEmpty,
+                "The offset commit did not finish before timeout.");
+
+        // verify consumption
+        reader.close();
+        verifyAllMessageAcknowledged(
+                NUM_RECORDS_PER_PARTITION, 
TopicNameUtils.topicNameWithPartition(topicName, 0));
+    }
+
+    @TestTemplate
+    void offsetCommitOnCheckpointComplete(
+            PulsarSourceReaderBase<Integer> baseReader, Boundedness 
boundedness, String topicName)
+            throws Exception {
+        PulsarOrderedSourceReader<Integer> reader = 
(PulsarOrderedSourceReader<Integer>) baseReader;
+        // consume more than 1 partition
+        reader.addSplits(
+                createPartitionSplits(
+                        topicName, DEFAULT_PARTITIONS, 
Boundedness.CONTINUOUS_UNBOUNDED));
+        reader.notifyNoMoreSplits();
+        TestingReaderOutput<Integer> output = new TestingReaderOutput<>();
+        long checkpointId = 0;
+        int emptyResultTime = 0;
+        InputStatus status;
+        do {
+            checkpointId++;
+            status = reader.pollNext(output);
+            // Create a checkpoint for each message consumption, but not 
complete them.
+            reader.snapshotState(checkpointId);
+            // the first couple of pollNext() might return NOTHING_AVAILABLE 
before data appears
+            if (InputStatus.NOTHING_AVAILABLE == status) {
+                emptyResultTime++;
+                sleepUninterruptibly(1, TimeUnit.SECONDS);
+            }
+
+        } while (emptyResultTime < MAX_EMPTY_POLLING_TIMES
+                && status != InputStatus.END_OF_INPUT
+                && output.getEmittedRecords().size()
+                        < NUM_RECORDS_PER_PARTITION * DEFAULT_PARTITIONS);
+
+        // The completion of the last checkpoint should subsume all previous 
checkpoints.
+        assertThat(reader.cursorsToCommit).hasSize((int) checkpointId);
+        long lastCheckpointId = checkpointId;
+        // notify checkpoint complete and expect all cursors committed
+        assertThatCode(() -> reader.notifyCheckpointComplete(lastCheckpointId))
+                .doesNotThrowAnyException();
+        assertThat(reader.cursorsToCommit).isEmpty();
+
+        // Verify the committed offsets.
+        reader.close();
+        for (int i = 0; i < DEFAULT_PARTITIONS; i++) {
+            verifyAllMessageAcknowledged(
+                    NUM_RECORDS_PER_PARTITION, 
TopicNameUtils.topicNameWithPartition(topicName, i));
+        }
+    }
+
+    private void setupSourceReader(
+            PulsarSourceReaderBase<Integer> reader,
+            String topicName,
+            int partitionId,
+            Boundedness boundedness) {
+        PulsarPartitionSplit split = createPartitionSplit(topicName, 
partitionId, boundedness);
+        reader.addSplits(Collections.singletonList(split));
+        reader.notifyNoMoreSplits();
+    }
+
+    private void pollUntilReadExpectedNumberOfRecordsAndValidate(
+            PulsarSourceReaderBase<Integer> reader,
+            TestingReaderOutput<Integer> output,
+            int expectedRecords,
+            String topicNameWithPartition)
+            throws Exception {
+        pollUntil(
+                reader,
+                output,
+                () -> output.getEmittedRecords().size() == expectedRecords,
+                "The output didn't poll enough records before timeout.");
+        reader.close();
+        verifyAllMessageAcknowledged(expectedRecords, topicNameWithPartition);
+        assertThat(output.getEmittedRecords()).hasSize(expectedRecords);
+    }
+
+    private void pollUntil(
+            PulsarSourceReaderBase<Integer> reader,
+            ReaderOutput<Integer> output,
+            Supplier<Boolean> condition,
+            String errorMessage)
+            throws InterruptedException, TimeoutException {
+        CommonTestUtils.waitUtil(
+                () -> {
+                    try {
+                        reader.pollNext(output);
+                    } catch (Exception exception) {
+                        throw new RuntimeException(
+                                "Caught unexpected exception when polling from 
the reader",
+                                exception);
+                    }
+                    return condition.get();
+                },
+                Duration.ofSeconds(Integer.MAX_VALUE),
+                errorMessage);
+    }
+
+    private void verifyAllMessageAcknowledged(int expectedMessages, String 
partitionName)
+            throws PulsarAdminException {
+        TopicStats topicStats = 
operator().admin().topics().getStats(partitionName, true, true);
+        // verify if the messages has been consumed
+        Map<String, ? extends SubscriptionStats> subscriptionStats = 
topicStats.getSubscriptions();
+        assertThat(subscriptionStats).hasSizeGreaterThan(0);
+        subscriptionStats.forEach(
+                (subscription, stats) -> {
+                    assertThat(stats.getUnackedMessages()).isZero();
+                    
assertThat(stats.getMsgOutCounter()).isEqualTo(expectedMessages);
+                });
+    }
+}
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderTestBase.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderTestBase.java
new file mode 100644
index 0000000..339027c
--- /dev/null
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderTestBase.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.reader.source;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
+import 
org.apache.flink.connector.pulsar.source.reader.PulsarSourceReaderFactory;
+import 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
+import 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchemaInitializationContext;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestSuiteBase;
+import 
org.apache.flink.connector.pulsar.testutils.extension.TestOrderlinessExtension;
+import 
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
+import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
+import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
+import 
org.apache.flink.connectors.test.common.junit.extensions.TestLoggerExtension;
+import org.apache.flink.core.io.InputStatus;
+
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.Extension;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.ParameterContext;
+import org.junit.jupiter.api.extension.ParameterResolutionException;
+import org.junit.jupiter.api.extension.ParameterResolver;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContextProvider;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.stream.Stream;
+
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_TIME;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE;
+import static 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
+import static 
org.apache.flink.connector.pulsar.testutils.PulsarTestCommonUtils.createPartitionSplit;
+import static 
org.apache.flink.connector.pulsar.testutils.PulsarTestCommonUtils.isAssignableFromParameterContext;
+import static 
org.apache.flink.connector.pulsar.testutils.extension.TestOrderlinessExtension.PULSAR_SOURCE_READER_SUBSCRIPTION_TYPE_STORE_KEY;
+import static 
org.apache.flink.connector.pulsar.testutils.extension.TestOrderlinessExtension.PULSAR_TEST_RESOURCE_NAMESPACE;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+
+@ExtendWith({
+    TestOrderlinessExtension.class,
+    TestLoggerExtension.class,
+})
+abstract class PulsarSourceReaderTestBase extends PulsarTestSuiteBase {
+
+    @RegisterExtension
+    PulsarSourceReaderInvocationContextProvider provider =
+            new PulsarSourceReaderInvocationContextProvider();
+
+    @BeforeEach
+    void beforeEach(String topicName) {
+        Random random = new Random(System.currentTimeMillis());
+        operator().setupTopic(topicName, Schema.INT32, () -> 
random.nextInt(20));
+    }
+
+    @AfterEach
+    void afterEach(String topicName) {
+        operator().deleteTopic(topicName, true);
+    }
+
+    @TestTemplate
+    void assignZeroSplitsCreatesZeroSubscription(
+            PulsarSourceReaderBase<Integer> reader, Boundedness boundedness, 
String topicName)
+            throws Exception {
+        reader.snapshotState(100L);
+        reader.notifyCheckpointComplete(100L);
+        // Verify the committed offsets.
+        reader.close();
+        for (int i = 0; i < PulsarRuntimeOperator.DEFAULT_PARTITIONS; i++) {
+            
verifyNoSubscriptionCreated(TopicNameUtils.topicNameWithPartition(topicName, 
i));
+        }
+    }
+
+    @TestTemplate
+    void assigningEmptySplits(
+            PulsarSourceReaderBase<Integer> reader, Boundedness boundedness, 
String topicName)
+            throws Exception {
+        final PulsarPartitionSplit emptySplit =
+                createPartitionSplit(
+                        topicName, 0, Boundedness.CONTINUOUS_UNBOUNDED, 
MessageId.latest);
+
+        reader.addSplits(Collections.singletonList(emptySplit));
+
+        TestingReaderOutput<Integer> output = new TestingReaderOutput<>();
+        InputStatus status = reader.pollNext(output);
+        assertThat(status).isEqualTo(InputStatus.NOTHING_AVAILABLE);
+        reader.close();
+    }
+
+    private void verifyNoSubscriptionCreated(String partitionName) throws 
PulsarAdminException {
+        Map<String, ? extends SubscriptionStats> subscriptionStats =
+                operator().admin().topics().getStats(partitionName, true, 
true).getSubscriptions();
+        assertThat(subscriptionStats).isEmpty();
+    }
+
+    private PulsarSourceReaderBase<Integer> sourceReader(
+            boolean autoAcknowledgementEnabled, SubscriptionType 
subscriptionType) {
+        Configuration configuration = operator().config();
+        configuration.set(PULSAR_MAX_FETCH_RECORDS, 1);
+        configuration.set(PULSAR_MAX_FETCH_TIME, 1000L);
+        configuration.set(PULSAR_SUBSCRIPTION_NAME, randomAlphabetic(10));
+        configuration.set(PULSAR_SUBSCRIPTION_TYPE, subscriptionType);
+        if (autoAcknowledgementEnabled
+                || configuration.get(PULSAR_SUBSCRIPTION_TYPE) == 
SubscriptionType.Shared) {
+            configuration.set(PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true);
+        }
+        PulsarDeserializationSchema<Integer> deserializationSchema = 
pulsarSchema(Schema.INT32);
+        SourceReaderContext context = new TestingReaderContext();
+        try {
+            deserializationSchema.open(
+                    new 
PulsarDeserializationSchemaInitializationContext(context));
+        } catch (Exception e) {
+            fail("Error while opening deserializationSchema");
+        }
+
+        SourceConfiguration sourceConfiguration = new 
SourceConfiguration(configuration);
+        return (PulsarSourceReaderBase<Integer>)
+                PulsarSourceReaderFactory.create(
+                        context, deserializationSchema, configuration, 
sourceConfiguration);
+    }
+
+    public class PulsarSourceReaderInvocationContextProvider
+            implements TestTemplateInvocationContextProvider {
+
+        @Override
+        public boolean supportsTestTemplate(ExtensionContext context) {
+            return true;
+        }
+
+        @Override
+        public Stream<TestTemplateInvocationContext> 
provideTestTemplateInvocationContexts(
+                ExtensionContext context) {
+            SubscriptionType subscriptionType =
+                    (SubscriptionType)
+                            context.getStore(PULSAR_TEST_RESOURCE_NAMESPACE)
+                                    
.get(PULSAR_SOURCE_READER_SUBSCRIPTION_TYPE_STORE_KEY);
+            return Stream.of(
+                    new PulsarSourceReaderInvocationContext(
+                            sourceReader(true, subscriptionType), 
Boundedness.CONTINUOUS_UNBOUNDED),
+                    new PulsarSourceReaderInvocationContext(
+                            sourceReader(false, subscriptionType),
+                            Boundedness.CONTINUOUS_UNBOUNDED));
+        }
+    }
+
+    public static class PulsarSourceReaderInvocationContext
+            implements TestTemplateInvocationContext {
+
+        private final PulsarSourceReaderBase<?> sourceReader;
+        private final Boundedness boundedness;
+        private final String randomTopicName;
+
+        public PulsarSourceReaderInvocationContext(
+                PulsarSourceReaderBase<?> splitReader, Boundedness 
boundedness) {
+            this.sourceReader = checkNotNull(splitReader);
+            this.boundedness = checkNotNull(boundedness);
+            this.randomTopicName = randomAlphabetic(5);
+        }
+
+        @Override
+        public String getDisplayName(int invocationIndex) {
+            return "AutoAckEnabled: "
+                    + 
sourceReader.sourceConfiguration.isEnableAutoAcknowledgeMessage()
+                    + "  Boundedness: "
+                    + boundedness.toString();
+        }
+
+        @Override
+        public List<Extension> getAdditionalExtensions() {
+            return Arrays.asList(
+                    new ParameterResolver() {
+                        @Override
+                        public boolean supportsParameter(
+                                ParameterContext parameterContext,
+                                ExtensionContext extensionContext)
+                                throws ParameterResolutionException {
+                            return isAssignableFromParameterContext(
+                                            PulsarSourceReaderBase.class, 
parameterContext)
+                                    || isAssignableFromParameterContext(
+                                            Boundedness.class, 
parameterContext)
+                                    || isAssignableFromParameterContext(
+                                            String.class, parameterContext);
+                        }
+
+                        @Override
+                        public Object resolveParameter(
+                                ParameterContext parameterContext,
+                                ExtensionContext extensionContext)
+                                throws ParameterResolutionException {
+                            if (parameterContext
+                                    .getParameter()
+                                    .getType()
+                                    .equals(PulsarSourceReaderBase.class)) {
+                                return sourceReader;
+                            } else if (parameterContext
+                                    .getParameter()
+                                    .getType()
+                                    .equals(Boundedness.class)) {
+                                return boundedness;
+                            } else {
+                                return randomTopicName;
+                            }
+                        }
+                    });
+        }
+    }
+}
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReaderTest.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReaderTest.java
similarity index 52%
copy from 
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReaderTest.java
copy to 
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReaderTest.java
index a94504a..4f7fdd3 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReaderTest.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReaderTest.java
@@ -16,22 +16,12 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.pulsar.source.reader.split;
+package org.apache.flink.connector.pulsar.source.reader.source;
 
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.pulsar.testutils.extension.SubType;
 
-import static 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.flinkSchema;
+import org.apache.pulsar.client.api.SubscriptionType;
 
-/** Unit tests for {@link PulsarOrderedPartitionSplitReaderTest}. */
-class PulsarOrderedPartitionSplitReaderTest extends 
PulsarPartitionSplitReaderTestBase {
-
-    @Override
-    protected PulsarPartitionSplitReaderBase<String> splitReader() {
-        return new PulsarOrderedPartitionSplitReader<>(
-                operator().client(),
-                operator().admin(),
-                readerConfig(),
-                sourceConfig(),
-                flinkSchema(new SimpleStringSchema()));
-    }
+class PulsarUnorderedSourceReaderTest extends PulsarSourceReaderTestBase {
+    @SubType SubscriptionType subscriptionType = SubscriptionType.Shared;
 }
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReaderTest.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReaderTest.java
index a94504a..3d58d5e 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReaderTest.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReaderTest.java
@@ -18,20 +18,74 @@
 
 package org.apache.flink.connector.pulsar.source.reader.split;
 
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.pulsar.testutils.extension.SubType;
 
-import static 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.flinkSchema;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.junit.jupiter.api.TestTemplate;
+
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
+import static 
org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyAdmin;
+import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition;
+import static 
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator.NUM_RECORDS_PER_PARTITION;
+import static org.apache.pulsar.client.api.Schema.STRING;
 
 /** Unit tests for {@link PulsarOrderedPartitionSplitReaderTest}. */
 class PulsarOrderedPartitionSplitReaderTest extends 
PulsarPartitionSplitReaderTestBase {
 
-    @Override
-    protected PulsarPartitionSplitReaderBase<String> splitReader() {
-        return new PulsarOrderedPartitionSplitReader<>(
-                operator().client(),
-                operator().admin(),
-                readerConfig(),
-                sourceConfig(),
-                flinkSchema(new SimpleStringSchema()));
+    @SubType SubscriptionType subscriptionType = SubscriptionType.Failover;
+
+    @TestTemplate
+    void consumeMessageCreatedBeforeHandleSplitsChangesWithoutSeek(
+            PulsarPartitionSplitReaderBase<String> splitReader) {
+        String topicName = randomAlphabetic(10);
+        operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
+        handleSplit(splitReader, topicName, 0);
+        fetchedMessages(splitReader, 0, true);
+    }
+
+    @TestTemplate
+    void 
consumeMessageCreatedBeforeHandleSplitsChangesAndUseLatestStartCursorWithoutSeek(
+            PulsarPartitionSplitReaderBase<String> splitReader) {
+        String topicName = randomAlphabetic(10);
+        operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
+        handleSplit(splitReader, topicName, 0, MessageId.latest);
+        fetchedMessages(splitReader, 0, true);
+    }
+
+    @TestTemplate
+    void 
consumeMessageCreatedBeforeHandleSplitsChangesAndUseEarliestStartCursorWithoutSeek(
+            PulsarPartitionSplitReaderBase<String> splitReader) {
+        String topicName = randomAlphabetic(10);
+        operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
+        handleSplit(splitReader, topicName, 0, MessageId.earliest);
+        fetchedMessages(splitReader, NUM_RECORDS_PER_PARTITION, true);
+    }
+
+    @TestTemplate
+    void 
consumeMessageCreatedBeforeHandleSplitsChangesAndUseSecondLastMessageWithoutSeek(
+            PulsarPartitionSplitReaderBase<String> splitReader) {
+        String topicName = randomAlphabetic(10);
+        operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
+        MessageIdImpl lastMessageId =
+                (MessageIdImpl)
+                        sneakyAdmin(
+                                () ->
+                                        operator()
+                                                .admin()
+                                                .topics()
+                                                .getLastMessageId(
+                                                        
topicNameWithPartition(topicName, 0)));
+        // when recover, use exclusive startCursor
+        handleSplit(
+                splitReader,
+                topicName,
+                0,
+                new MessageIdImpl(
+                        lastMessageId.getLedgerId(),
+                        lastMessageId.getEntryId() - 1,
+                        lastMessageId.getPartitionIndex()));
+        fetchedMessages(splitReader, 1, true);
     }
 }
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java
index fb9fa01..0c0c0ba 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java
@@ -18,17 +18,28 @@
 
 package org.apache.flink.connector.pulsar.source.reader.split;
 
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import 
org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestSuiteBase;
+import 
org.apache.flink.connector.pulsar.testutils.extension.TestOrderlinessExtension;
+import 
org.apache.flink.connectors.test.common.junit.extensions.TestLoggerExtension;
 
-import org.junit.jupiter.api.DisplayName;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.extension.Extension;
 import org.junit.jupiter.api.extension.ExtensionContext;
 import org.junit.jupiter.api.extension.ParameterContext;
@@ -39,78 +50,164 @@ import 
org.junit.jupiter.api.extension.TestTemplateInvocationContext;
 import org.junit.jupiter.api.extension.TestTemplateInvocationContextProvider;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Stream;
 
 import static java.time.Duration.ofSeconds;
 import static java.util.Collections.singletonList;
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
+import static 
org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyAdmin;
+import static 
org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyThrow;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_TIME;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
-import static 
org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor.never;
 import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition;
 import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange;
+import static 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.flinkSchema;
+import static 
org.apache.flink.connector.pulsar.testutils.PulsarTestCommonUtils.isAssignableFromParameterContext;
+import static 
org.apache.flink.connector.pulsar.testutils.extension.TestOrderlinessExtension.PULSAR_SOURCE_READER_SUBSCRIPTION_TYPE_STORE_KEY;
+import static 
org.apache.flink.connector.pulsar.testutils.extension.TestOrderlinessExtension.PULSAR_TEST_RESOURCE_NAMESPACE;
+import static 
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator.DEFAULT_PARTITIONS;
+import static 
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator.NUM_RECORDS_PER_PARTITION;
 import static org.apache.flink.core.testutils.CommonTestUtils.waitUtil;
+import static 
org.apache.flink.shaded.guava30.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.pulsar.client.api.Schema.STRING;
-import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test utils for split readers. */
+@ExtendWith({
+    TestOrderlinessExtension.class,
+    TestLoggerExtension.class,
+})
 public abstract class PulsarPartitionSplitReaderTestBase extends 
PulsarTestSuiteBase {
 
     @RegisterExtension
     PulsarSplitReaderInvocationContextProvider provider =
             new PulsarSplitReaderInvocationContextProvider();
 
-    protected Configuration readerConfig() {
+    /** Default reader config: max message 1, fetch timeout 1s. */
+    private Configuration readerConfig() {
         Configuration config = operator().config();
         config.set(PULSAR_MAX_FETCH_RECORDS, 1);
         config.set(PULSAR_MAX_FETCH_TIME, 1000L);
         config.set(PULSAR_SUBSCRIPTION_NAME, randomAlphabetic(10));
         config.set(PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true);
-
         return config;
     }
 
-    protected SourceConfiguration sourceConfig() {
+    private SourceConfiguration sourceConfig() {
         return new SourceConfiguration(readerConfig());
     }
 
-    protected SplitsAddition<PulsarPartitionSplit> createSplit(String 
topicName, int partitionId) {
+    protected void handleSplit(
+            PulsarPartitionSplitReaderBase<String> reader, String topicName, 
int partitionId) {
+        handleSplit(reader, topicName, partitionId, null);
+    }
+
+    protected void handleSplit(
+            PulsarPartitionSplitReaderBase<String> reader,
+            String topicName,
+            int partitionId,
+            MessageId startPosition) {
+        TopicPartition partition = new TopicPartition(topicName, partitionId, 
createFullRange());
+        PulsarPartitionSplit split =
+                new PulsarPartitionSplit(partition, StopCursor.never(), 
startPosition, null);
+        SplitsAddition<PulsarPartitionSplit> addition = new 
SplitsAddition<>(singletonList(split));
+        reader.handleSplitsChanges(addition);
+    }
+
+    private void seekStartPositionAndHandleSplit(
+            PulsarPartitionSplitReaderBase<String> reader, String topicName, 
int partitionId) {
+        seekStartPositionAndHandleSplit(reader, topicName, partitionId, 
MessageId.latest);
+    }
+
+    private void seekStartPositionAndHandleSplit(
+            PulsarPartitionSplitReaderBase<String> reader,
+            String topicName,
+            int partitionId,
+            MessageId startPosition) {
         TopicPartition partition = new TopicPartition(topicName, partitionId, 
createFullRange());
-        PulsarPartitionSplit split = new PulsarPartitionSplit(partition, 
never());
-        return new SplitsAddition<>(singletonList(split));
+        PulsarPartitionSplit split =
+                new PulsarPartitionSplit(partition, StopCursor.never(), null, 
null);
+        SplitsAddition<PulsarPartitionSplit> addition = new 
SplitsAddition<>(singletonList(split));
+
+        // create consumer and seek before split changes
+        try (Consumer<byte[]> consumer = 
reader.createPulsarConsumer(partition)) {
+            // inclusive messageId
+            StartCursor startCursor = StartCursor.fromMessageId(startPosition);
+            startCursor.seekPosition(partition.getTopic(), 
partition.getPartitionId(), consumer);
+        } catch (PulsarClientException e) {
+            sneakyThrow(e);
+        }
+
+        reader.handleSplitsChanges(addition);
+    }
+
+    private <T> PulsarMessage<T> 
fetchedMessage(PulsarPartitionSplitReaderBase<T> splitReader) {
+        return fetchedMessages(splitReader, 1, 
false).stream().findFirst().orElse(null);
+    }
+
+    protected <T> List<PulsarMessage<T>> fetchedMessages(
+            PulsarPartitionSplitReaderBase<T> splitReader, int expectedCount, 
boolean verify) {
+        return fetchedMessages(
+                splitReader, expectedCount, verify, 
Boundedness.CONTINUOUS_UNBOUNDED);
     }
 
-    protected <T> PulsarMessage<T> 
fetchedMessage(PulsarPartitionSplitReaderBase<T> splitReader) {
-        try {
-            RecordsWithSplitIds<PulsarMessage<T>> records = 
splitReader.fetch();
-            if (records.nextSplit() != null) {
-                return records.nextRecordFromSplit();
+    private <T> List<PulsarMessage<T>> fetchedMessages(
+            PulsarPartitionSplitReaderBase<T> splitReader,
+            int expectedCount,
+            boolean verify,
+            Boundedness boundedness) {
+        List<PulsarMessage<T>> messages = new ArrayList<>(expectedCount);
+        List<String> finishedSplits = new ArrayList<>();
+        for (int i = 0; i < 3; ) {
+            try {
+                RecordsWithSplitIds<PulsarMessage<T>> recordsBySplitIds = 
splitReader.fetch();
+                if (recordsBySplitIds.nextSplit() != null) {
+                    // Collect the records in this split.
+                    PulsarMessage<T> record;
+                    while ((record = recordsBySplitIds.nextRecordFromSplit()) 
!= null) {
+                        messages.add(record);
+                    }
+                    finishedSplits.addAll(recordsBySplitIds.finishedSplits());
+                } else {
+                    i++;
+                }
+            } catch (IOException e) {
+                i++;
+            }
+            sleepUninterruptibly(1, TimeUnit.SECONDS);
+        }
+        if (verify) {
+            assertThat(messages).as("We should fetch the expected 
size").hasSize(expectedCount);
+            if (boundedness == Boundedness.CONTINUOUS_UNBOUNDED) {
+                assertThat(finishedSplits).as("Split should not be marked as 
finished").hasSize(0);
             } else {
-                return null;
+                assertThat(finishedSplits).as("Split should be marked as 
finished").hasSize(1);
             }
-        } catch (IOException e) {
-            return null;
         }
+
+        return messages;
     }
 
     @TestTemplate
-    @DisplayName("Retrieve message after timeout by using given split reader")
     void pollMessageAfterTimeout(PulsarPartitionSplitReaderBase<String> 
splitReader)
             throws InterruptedException, TimeoutException {
         String topicName = randomAlphabetic(10);
 
         // Add a split
-        splitReader.handleSplitsChanges(createSplit(topicName, 0));
+        seekStartPositionAndHandleSplit(splitReader, topicName, 0);
 
         // Poll once with a null message
         PulsarMessage<String> message1 = fetchedMessage(splitReader);
-        assertNull(message1);
+        assertThat(message1).isNull();
 
         // Send a message to pulsar
         String topic = topicNameWithPartition(topicName, 0);
@@ -122,14 +219,139 @@ public abstract class PulsarPartitionSplitReaderTestBase 
extends PulsarTestSuite
                     PulsarMessage<String> message2 = 
fetchedMessage(splitReader);
                     return message2 != null;
                 },
-                ofSeconds(10),
+                ofSeconds(Integer.MAX_VALUE),
                 "Couldn't poll message from Pulsar.");
     }
 
+    @TestTemplate
+    void consumeMessageCreatedAfterHandleSplitChangesAndFetch(
+            PulsarPartitionSplitReaderBase<String> splitReader) {
+        String topicName = randomAlphabetic(10);
+        seekStartPositionAndHandleSplit(splitReader, topicName, 0);
+        operator().sendMessage(topicNameWithPartition(topicName, 0), STRING, 
randomAlphabetic(10));
+        fetchedMessages(splitReader, 1, true);
+    }
+
+    @TestTemplate
+    void consumeMessageCreatedBeforeHandleSplitsChanges(
+            PulsarPartitionSplitReaderBase<String> splitReader) {
+        String topicName = randomAlphabetic(10);
+        operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
+        seekStartPositionAndHandleSplit(splitReader, topicName, 0);
+        fetchedMessages(splitReader, 0, true);
+    }
+
+    @TestTemplate
+    void 
consumeMessageCreatedBeforeHandleSplitsChangesAndResetToEarliestPosition(
+            PulsarPartitionSplitReaderBase<String> splitReader) {
+        String topicName = randomAlphabetic(10);
+        operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
+        seekStartPositionAndHandleSplit(splitReader, topicName, 0, 
MessageId.earliest);
+        fetchedMessages(splitReader, NUM_RECORDS_PER_PARTITION, true);
+    }
+
+    @TestTemplate
+    void 
consumeMessageCreatedBeforeHandleSplitsChangesAndResetToLatestPosition(
+            PulsarPartitionSplitReaderBase<String> splitReader) {
+        String topicName = randomAlphabetic(10);
+        operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
+        seekStartPositionAndHandleSplit(splitReader, topicName, 0, 
MessageId.latest);
+        fetchedMessages(splitReader, 0, true);
+    }
+
+    @TestTemplate
+    void 
consumeMessageCreatedBeforeHandleSplitsChangesAndUseSecondLastMessageIdCursor(
+            PulsarPartitionSplitReaderBase<String> splitReader) {
+
+        String topicName = randomAlphabetic(10);
+        operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
+        MessageIdImpl lastMessageId =
+                (MessageIdImpl)
+                        sneakyAdmin(
+                                () ->
+                                        operator()
+                                                .admin()
+                                                .topics()
+                                                .getLastMessageId(
+                                                        
topicNameWithPartition(topicName, 0)));
+        // when doing seek directly on consumer, by default it includes the 
specified messageId
+        seekStartPositionAndHandleSplit(
+                splitReader,
+                topicName,
+                0,
+                new MessageIdImpl(
+                        lastMessageId.getLedgerId(),
+                        lastMessageId.getEntryId() - 1,
+                        lastMessageId.getPartitionIndex()));
+        fetchedMessages(splitReader, 2, true);
+    }
+
+    @TestTemplate
+    void emptyTopic(PulsarPartitionSplitReaderBase<String> splitReader) {
+        String topicName = randomAlphabetic(10);
+        operator().createTopic(topicName, DEFAULT_PARTITIONS);
+        seekStartPositionAndHandleSplit(splitReader, topicName, 0);
+        fetchedMessages(splitReader, 0, true);
+    }
+
+    @TestTemplate
+    void emptyTopicWithoutSeek(PulsarPartitionSplitReaderBase<String> 
splitReader) {
+        String topicName = randomAlphabetic(10);
+        operator().createTopic(topicName, DEFAULT_PARTITIONS);
+        handleSplit(splitReader, topicName, 0);
+        fetchedMessages(splitReader, 0, true);
+    }
+
+    @TestTemplate
+    void wakeupSplitReaderShouldNotCauseException(
+            PulsarPartitionSplitReaderBase<String> splitReader) {
+        handleSplit(splitReader, "non-exist", 0);
+        AtomicReference<Throwable> error = new AtomicReference<>();
+        Thread t =
+                new Thread(
+                        () -> {
+                            try {
+                                splitReader.fetch();
+                            } catch (Throwable e) {
+                                error.set(e);
+                            }
+                        },
+                        "testWakeUp-thread");
+        t.start();
+        long deadline = System.currentTimeMillis() + 5000L;
+        while (t.isAlive() && System.currentTimeMillis() < deadline) {
+            splitReader.wakeUp();
+            sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
+        }
+        assertThat(error.get()).isNull();
+    }
+
+    @TestTemplate
+    void assignNoSplits(PulsarPartitionSplitReaderBase<String> splitReader) {
+        assertThat(fetchedMessage(splitReader)).isNull();
+    }
+
     /** Create a split reader with max message 1, fetch timeout 1s. */
-    protected abstract PulsarPartitionSplitReaderBase<String> splitReader();
+    private PulsarPartitionSplitReaderBase<String> 
splitReader(SubscriptionType subscriptionType) {
+        if (subscriptionType == SubscriptionType.Failover) {
+            return new PulsarOrderedPartitionSplitReader<>(
+                    operator().client(),
+                    operator().admin(),
+                    readerConfig(),
+                    sourceConfig(),
+                    flinkSchema(new SimpleStringSchema()));
+        } else {
+            return new PulsarUnorderedPartitionSplitReader<>(
+                    operator().client(),
+                    operator().admin(),
+                    readerConfig(),
+                    sourceConfig(),
+                    flinkSchema(new SimpleStringSchema()),
+                    null);
+        }
+    }
 
-    /** JUnit5 extension for all the TestTemplate methods in this class. */
+    /** Context Provider for PulsarSplitReaderTestBase. */
     public class PulsarSplitReaderInvocationContextProvider
             implements TestTemplateInvocationContextProvider {
 
@@ -141,7 +363,11 @@ public abstract class PulsarPartitionSplitReaderTestBase 
extends PulsarTestSuite
         @Override
         public Stream<TestTemplateInvocationContext> 
provideTestTemplateInvocationContexts(
                 ExtensionContext context) {
-            return Stream.of(new 
PulsarSplitReaderInvocationContext(splitReader()));
+            SubscriptionType subscriptionType =
+                    (SubscriptionType)
+                            context.getStore(PULSAR_TEST_RESOURCE_NAMESPACE)
+                                    
.get(PULSAR_SOURCE_READER_SUBSCRIPTION_TYPE_STORE_KEY);
+            return Stream.of(new 
PulsarSplitReaderInvocationContext(splitReader(subscriptionType)));
         }
     }
 
@@ -169,10 +395,8 @@ public abstract class PulsarPartitionSplitReaderTestBase 
extends PulsarTestSuite
                                 ParameterContext parameterContext,
                                 ExtensionContext extensionContext)
                                 throws ParameterResolutionException {
-                            return parameterContext
-                                    .getParameter()
-                                    .getType()
-                                    
.equals(PulsarPartitionSplitReaderBase.class);
+                            return isAssignableFromParameterContext(
+                                    PulsarPartitionSplitReaderBase.class, 
parameterContext);
                         }
 
                         @Override
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReaderTest.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReaderTest.java
index 917cacd..2cb3cb9 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReaderTest.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReaderTest.java
@@ -18,21 +18,11 @@
 
 package org.apache.flink.connector.pulsar.source.reader.split;
 
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.pulsar.testutils.extension.SubType;
 
-import static 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.flinkSchema;
+import org.apache.pulsar.client.api.SubscriptionType;
 
 /** Unit tests for {@link PulsarUnorderedPartitionSplitReaderTest}. */
 class PulsarUnorderedPartitionSplitReaderTest extends 
PulsarPartitionSplitReaderTestBase {
-
-    @Override
-    protected PulsarPartitionSplitReaderBase<String> splitReader() {
-        return new PulsarUnorderedPartitionSplitReader<>(
-                operator().client(),
-                operator().admin(),
-                readerConfig(),
-                sourceConfig(),
-                flinkSchema(new SimpleStringSchema()),
-                null);
-    }
+    @SubType SubscriptionType subscriptionType = SubscriptionType.Shared;
 }
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestCommonUtils.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestCommonUtils.java
new file mode 100644
index 0000000..87f3976
--- /dev/null
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestCommonUtils.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.testutils;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.junit.jupiter.api.extension.ParameterContext;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Put static methods that can be used by multiple test classes. */
+public class PulsarTestCommonUtils {
+
+    // ------- CreateSplits
+    /** creates a fullRange() partitionSplit. */
+    public static PulsarPartitionSplit createPartitionSplit(String topic, int 
partitionId) {
+        return createPartitionSplit(topic, partitionId, 
Boundedness.CONTINUOUS_UNBOUNDED);
+    }
+
+    public static PulsarPartitionSplit createPartitionSplit(
+            String topic, int partitionId, Boundedness boundedness) {
+        return createPartitionSplit(topic, partitionId, boundedness, 
MessageId.earliest);
+    }
+
+    public static PulsarPartitionSplit createPartitionSplit(
+            String topic, int partitionId, Boundedness boundedness, MessageId 
latestConsumedId) {
+        TopicPartition topicPartition =
+                new TopicPartition(topic, partitionId, 
TopicRange.createFullRange());
+
+        StopCursor stopCursor =
+                boundedness == Boundedness.BOUNDED ? StopCursor.latest() : 
StopCursor.never();
+        return new PulsarPartitionSplit(topicPartition, stopCursor, 
latestConsumedId, null);
+    }
+
+    public static List<PulsarPartitionSplit> createPartitionSplits(
+            String topicName, int numSplits, Boundedness boundedness) {
+        List<PulsarPartitionSplit> splits = new ArrayList<>();
+        for (int i = 0; i < numSplits; i++) {
+            splits.add(createPartitionSplit(topicName, i, boundedness));
+        }
+        return splits;
+    }
+
+    // -------- InvocationContext Utils
+
+    public static boolean isAssignableFromParameterContext(
+            Class<?> requiredType, ParameterContext context) {
+        return requiredType.isAssignableFrom(context.getParameter().getType());
+    }
+}
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReaderTest.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/extension/SubType.java
similarity index 52%
copy from 
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReaderTest.java
copy to 
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/extension/SubType.java
index a94504a..b1a8362 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReaderTest.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/extension/SubType.java
@@ -16,22 +16,17 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.pulsar.source.reader.split;
+package org.apache.flink.connector.pulsar.testutils.extension;
 
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.annotation.Experimental;
 
-import static 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.flinkSchema;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
 
-/** Unit tests for {@link PulsarOrderedPartitionSplitReaderTest}. */
-class PulsarOrderedPartitionSplitReaderTest extends 
PulsarPartitionSplitReaderTestBase {
-
-    @Override
-    protected PulsarPartitionSplitReaderBase<String> splitReader() {
-        return new PulsarOrderedPartitionSplitReader<>(
-                operator().client(),
-                operator().admin(),
-                readerConfig(),
-                sourceConfig(),
-                flinkSchema(new SimpleStringSchema()));
-    }
-}
+/** Marks the field in test class defining {@link 
org.apache.pulsar.client.api.SubscriptionType}. */
+@Target(ElementType.FIELD)
+@Retention(RetentionPolicy.RUNTIME)
+@Experimental
+public @interface SubType {}
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/extension/TestOrderlinessExtension.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/extension/TestOrderlinessExtension.java
new file mode 100644
index 0000000..07c9287
--- /dev/null
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/extension/TestOrderlinessExtension.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.testutils.extension;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.platform.commons.support.AnnotationSupport;
+
+import java.lang.annotation.Annotation;
+import java.util.Collection;
+import java.util.List;
+
+/** An extension for subclasses to specify {@link 
org.apache.pulsar.client.api.SubscriptionType}. */
+public class TestOrderlinessExtension implements BeforeAllCallback {
+
+    public static final ExtensionContext.Namespace 
PULSAR_TEST_RESOURCE_NAMESPACE =
+            ExtensionContext.Namespace.create("pulsarTestResourceNamespace");
+    public static final String 
PULSAR_SOURCE_READER_SUBSCRIPTION_TYPE_STORE_KEY =
+            "pulsarSourceReaderSubscriptionTypeStoreKey";
+
+    private SubscriptionType subscriptionType;
+
+    @Override
+    public void beforeAll(ExtensionContext context) throws Exception {
+        final List<SubscriptionType> subscriptionTypes =
+                AnnotationSupport.findAnnotatedFieldValues(
+                        context.getRequiredTestInstance(), SubType.class, 
SubscriptionType.class);
+        checkExactlyOneAnnotatedField(subscriptionTypes, SubType.class);
+        subscriptionType = subscriptionTypes.get(0);
+        context.getStore(PULSAR_TEST_RESOURCE_NAMESPACE)
+                .put(PULSAR_SOURCE_READER_SUBSCRIPTION_TYPE_STORE_KEY, 
subscriptionType);
+    }
+
+    private void checkExactlyOneAnnotatedField(
+            Collection<?> fields, Class<? extends Annotation> annotation) {
+        if (fields.size() > 1) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Multiple fields are annotated with '@%s'",
+                            annotation.getSimpleName()));
+        }
+        if (fields.isEmpty()) {
+            throw new IllegalStateException(
+                    String.format(
+                            "No fields are annotated with '@%s'", 
annotation.getSimpleName()));
+        }
+    }
+}
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
index 2d26925..188b77c 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
@@ -91,13 +91,18 @@ public class PulsarRuntimeOperator implements Serializable, 
Closeable {
     }
 
     public <T> void setupTopic(String topic, Schema<T> schema, Supplier<T> 
supplier) {
+        setupTopic(topic, schema, supplier, NUM_RECORDS_PER_PARTITION);
+    }
+
+    public <T> void setupTopic(
+            String topic, Schema<T> schema, Supplier<T> supplier, int 
numRecordsPerSplit) {
         createTopic(topic, DEFAULT_PARTITIONS);
 
-        // Make sure every topic partition has message.
+        // Make sure every topic partition has messages.
         for (int i = 0; i < DEFAULT_PARTITIONS; i++) {
             String partitionName = 
TopicNameUtils.topicNameWithPartition(topic, i);
             List<T> messages =
-                    
Stream.generate(supplier).limit(NUM_RECORDS_PER_PARTITION).collect(toList());
+                    
Stream.generate(supplier).limit(numRecordsPerSplit).collect(toList());
 
             sendMessages(partitionName, schema, messages);
         }
@@ -204,7 +209,6 @@ public class PulsarRuntimeOperator implements Serializable, 
Closeable {
         Configuration configuration = new Configuration();
         configuration.set(PULSAR_SERVICE_URL, serviceUrl());
         configuration.set(PULSAR_ADMIN_URL, adminUrl());
-
         return configuration;
     }
 

Reply via email to