This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1572908  [FLINK-26177][Connector/pulsar] Use mocked pulsar runtime 
instead of embedded runtime and enable tests.
1572908 is described below

commit 1572908edbb17455a83f9e45974cf312a478283b
Author: Yufan Sheng <[email protected]>
AuthorDate: Fri Feb 18 16:20:37 2022 +0800

    [FLINK-26177][Connector/pulsar] Use mocked pulsar runtime instead of 
embedded runtime and enable tests.
---
 .../3ac3a1dc-681f-4213-9990-b7b3298a20bc           |   0
 .../f4d91193-72ba-4ce4-ad83-98f780dce581           |   6 +
 .../archunit-violations/stored.rules               |   4 +
 flink-connectors/flink-connector-pulsar/pom.xml    |  17 ++-
 .../connector/pulsar/sink/PulsarSinkITCase.java    |  13 ++-
 .../pulsar/sink/writer/PulsarWriterTest.java       |  12 +-
 .../pulsar/source/PulsarSourceITCase.java          |  43 ++++---
 .../source/PulsarOrderedSourceReaderTest.java      |  47 +++-----
 .../reader/source/PulsarSourceReaderTestBase.java  |   6 -
 .../pulsar/testutils/PulsarTestSuiteBase.java      |   2 +-
 .../pulsar/testutils/function/ControlSource.java   |  14 ++-
 .../pulsar/testutils/runtime/PulsarRuntime.java    |  14 ++-
 .../testutils/runtime/PulsarRuntimeUtils.java      | 124 +++++++++++++++++++++
 .../runtime/embedded/PulsarEmbeddedRuntime.java    |  98 +---------------
 .../runtime/mock/BlankBrokerInterceptor.java       |  61 ++++++++++
 .../runtime/mock/MockBookKeeperClientFactory.java  |  74 ++++++++++++
 .../testutils/runtime/mock/MockPulsarService.java  |  87 +++++++++++++++
 .../runtime/mock/MockZooKeeperClientFactory.java   |  73 ++++++++++++
 .../runtime/mock/NonClosableMockBookKeeper.java    |  55 +++++++++
 .../testutils/runtime/mock/PulsarMockRuntime.java  | 110 ++++++++++++++++++
 .../mock/SameThreadOrderedSafeExecutor.java        |  56 ++++++++++
 .../test/resources/containers/txnStandalone.conf   |   2 +-
 22 files changed, 745 insertions(+), 173 deletions(-)

diff --git 
a/flink-connectors/flink-connector-pulsar/archunit-violations/3ac3a1dc-681f-4213-9990-b7b3298a20bc
 
b/flink-connectors/flink-connector-pulsar/archunit-violations/3ac3a1dc-681f-4213-9990-b7b3298a20bc
new file mode 100644
index 0000000..e69de29
diff --git 
a/flink-connectors/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581
 
b/flink-connectors/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581
new file mode 100644
index 0000000..452f99f
--- /dev/null
+++ 
b/flink-connectors/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581
@@ -0,0 +1,6 @@
+org.apache.flink.connector.pulsar.source.PulsarSourceITCase does not satisfy: 
only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that 
are static, final, and of type InternalMiniClusterExtension and annotated with 
@RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any 
fields that are static, final, and of type MiniClusterExtension and annotated 
with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with 
@ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with 
@ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type 
MiniClusterWithClientResource and final and annotated with @ClassRule or 
contain any fields that is of type MiniClusterWithClientResource and public and 
final and not static and annotated with @Rule
\ No newline at end of file
diff --git 
a/flink-connectors/flink-connector-pulsar/archunit-violations/stored.rules 
b/flink-connectors/flink-connector-pulsar/archunit-violations/stored.rules
new file mode 100644
index 0000000..efb12f3
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/archunit-violations/stored.rules
@@ -0,0 +1,4 @@
+#
+#Thu Mar 03 12:42:13 CST 2022
+Tests\ inheriting\ from\ AbstractTestBase\ should\ have\ name\ ending\ with\ 
ITCase=3ac3a1dc-681f-4213-9990-b7b3298a20bc
+ITCASE\ tests\ should\ use\ a\ MiniCluster\ resource\ or\ 
extension=f4d91193-72ba-4ce4-ad83-98f780dce581
diff --git a/flink-connectors/flink-connector-pulsar/pom.xml 
b/flink-connectors/flink-connector-pulsar/pom.xml
index 7fc3727..e8db230 100644
--- a/flink-connectors/flink-connector-pulsar/pom.xml
+++ b/flink-connectors/flink-connector-pulsar/pom.xml
@@ -120,6 +120,22 @@ under the License.
                <!-- we don't override the version here. -->
                <dependency>
                        <groupId>org.apache.pulsar</groupId>
+                       <artifactId>testmocks</artifactId>
+                       <version>${pulsar.version}</version>
+                       <scope>test</scope>
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>org.testng</groupId>
+                                       <artifactId>testng</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.powermock</groupId>
+                                       
<artifactId>powermock-module-testng</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.pulsar</groupId>
                        <artifactId>pulsar-broker</artifactId>
                        <version>${pulsar.version}</version>
                        <scope>test</scope>
@@ -236,7 +252,6 @@ under the License.
                                <groupId>org.apache.maven.plugins</groupId>
                                <artifactId>maven-surefire-plugin</artifactId>
                                <configuration>
-                                       <skip>true</skip>
                                        <!-- Enforce single fork execution due 
to heavy mini cluster use in the tests -->
                                        <forkCount>1</forkCount>
                                        <argLine>-Xms256m -Xmx2048m 
-Dmvn.forkNumber=${surefire.forkNumber}
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
index 94e23d7..2c2c05d 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
@@ -68,7 +68,13 @@ class PulsarSinkITCase extends PulsarTestSuiteBase {
 
         ControlSource source =
                 new ControlSource(
-                        sharedObjects, operator(), topic, guarantee, counts, 
Duration.ofMinutes(5));
+                        sharedObjects,
+                        operator(),
+                        topic,
+                        guarantee,
+                        counts,
+                        Duration.ofMillis(50),
+                        Duration.ofMinutes(5));
         PulsarSink<String> sink =
                 PulsarSink.builder()
                         .setServiceUrl(operator().serviceUrl())
@@ -79,8 +85,11 @@ class PulsarSinkITCase extends PulsarTestSuiteBase {
                         .build();
 
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
         env.setParallelism(PARALLELISM);
-        env.enableCheckpointing(100L);
+        if (guarantee != DeliveryGuarantee.NONE) {
+            env.enableCheckpointing(500L);
+        }
         env.addSource(source).sinkTo(sink);
         env.execute();
 
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java
index 942b759..eb766ee 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java
@@ -44,7 +44,6 @@ import 
org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.util.UserCodeClassLoader;
 
 import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 
@@ -65,16 +64,9 @@ class PulsarWriterTest extends PulsarTestSuiteBase {
 
     private static final SinkWriter.Context CONTEXT = new 
MockSinkWriterContext();
 
-    @Test
-    void writeMessageWithGuarantee() throws Exception {
-        writeMessageWithoutGuarantee(EXACTLY_ONCE);
-    }
-
     @ParameterizedTest
-    @EnumSource(
-            value = DeliveryGuarantee.class,
-            names = {"AT_LEAST_ONCE", "NONE"})
-    void writeMessageWithoutGuarantee(DeliveryGuarantee guarantee) throws 
Exception {
+    @EnumSource(DeliveryGuarantee.class)
+    void writeMessages(DeliveryGuarantee guarantee) throws Exception {
         String topic = randomAlphabetic(10);
         operator().createTopic(topic, 8);
 
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
index 9a72c8a..f3435b1 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
@@ -33,38 +33,15 @@ import 
org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
 import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
 import org.apache.flink.streaming.api.CheckpointingMode;
 
-import org.junit.jupiter.api.Disabled;
-
 /** Unite test class for {@link PulsarSource}. */
 @SuppressWarnings("unused")
 class PulsarSourceITCase extends SourceTestSuiteBase<String> {
-
-    @Disabled // TODO: remove override after FLINK-26177 is fixed
-    @Override
-    public void testScaleUp(
-            TestEnvironment testEnv,
-            DataStreamSourceExternalContext<String> externalContext,
-            CheckpointingMode semantic)
-            throws Exception {
-        super.testScaleUp(testEnv, externalContext, semantic);
-    }
-
-    @Disabled // TODO: remove override after FLINK-26177 is fixed
-    @Override
-    public void testScaleDown(
-            TestEnvironment testEnv,
-            DataStreamSourceExternalContext<String> externalContext,
-            CheckpointingMode semantic)
-            throws Exception {
-        super.testScaleDown(testEnv, externalContext, semantic);
-    }
-
     // Defines test environment on Flink MiniCluster
     @TestEnv MiniClusterTestEnvironment flink = new 
MiniClusterTestEnvironment();
 
     // Defines pulsar running environment
     @TestExternalSystem
-    PulsarTestEnvironment pulsar = new 
PulsarTestEnvironment(PulsarRuntime.embedded());
+    PulsarTestEnvironment pulsar = new 
PulsarTestEnvironment(PulsarRuntime.mock());
 
     @TestSemantics
     CheckpointingMode[] semantics = new CheckpointingMode[] 
{CheckpointingMode.EXACTLY_ONCE};
@@ -78,4 +55,22 @@ class PulsarSourceITCase extends SourceTestSuiteBase<String> 
{
     @TestContext
     PulsarTestContextFactory<String, MultipleTopicConsumingContext> 
multipleTopic =
             new PulsarTestContextFactory<>(pulsar, 
MultipleTopicConsumingContext::new);
+
+    @Override
+    public void testScaleUp(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<String> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        super.testScaleUp(testEnv, externalContext, semantic);
+    }
+
+    @Override
+    public void testScaleDown(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<String> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        super.testScaleDown(testEnv, externalContext, semantic);
+    }
 }
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReaderTest.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReaderTest.java
index 477dfdd..9806108 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReaderTest.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReaderTest.java
@@ -28,14 +28,15 @@ import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.core.testutils.CommonTestUtils;
 
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.common.policies.data.SubscriptionStats;
-import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.junit.jupiter.api.TestTemplate;
 
 import java.time.Duration;
 import java.util.Collections;
-import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;
@@ -139,22 +140,6 @@ class PulsarOrderedSourceReaderTest extends 
PulsarSourceReaderTestBase {
         reader.notifyNoMoreSplits();
     }
 
-    private void pollUntilReadExpectedNumberOfRecordsAndValidate(
-            PulsarSourceReaderBase<Integer> reader,
-            TestingReaderOutput<Integer> output,
-            int expectedRecords,
-            String topicNameWithPartition)
-            throws Exception {
-        pollUntil(
-                reader,
-                output,
-                () -> output.getEmittedRecords().size() == expectedRecords,
-                "The output didn't poll enough records before timeout.");
-        reader.close();
-        verifyAllMessageAcknowledged(expectedRecords, topicNameWithPartition);
-        assertThat(output.getEmittedRecords()).hasSize(expectedRecords);
-    }
-
     private void pollUntil(
             PulsarSourceReaderBase<Integer> reader,
             ReaderOutput<Integer> output,
@@ -177,15 +162,19 @@ class PulsarOrderedSourceReaderTest extends 
PulsarSourceReaderTestBase {
     }
 
     private void verifyAllMessageAcknowledged(int expectedMessages, String 
partitionName)
-            throws PulsarAdminException {
-        TopicStats topicStats = 
operator().admin().topics().getStats(partitionName, true, true);
-        // verify if the messages has been consumed
-        Map<String, ? extends SubscriptionStats> subscriptionStats = 
topicStats.getSubscriptions();
-        assertThat(subscriptionStats).hasSizeGreaterThan(0);
-        subscriptionStats.forEach(
-                (subscription, stats) -> {
-                    assertThat(stats.getUnackedMessages()).isZero();
-                    
assertThat(stats.getMsgOutCounter()).isEqualTo(expectedMessages);
-                });
+            throws PulsarAdminException, PulsarClientException {
+
+        Consumer<byte[]> consumer =
+                operator()
+                        .client()
+                        .newConsumer()
+                        .subscriptionType(SubscriptionType.Exclusive)
+                        
.subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
+                        .subscriptionName("verify-message")
+                        .topic(partitionName)
+                        .subscribe();
+
+        assertThat(((MessageIdImpl) consumer.getLastMessageId()).getEntryId())
+                .isEqualTo(expectedMessages - 1);
     }
 }
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderTestBase.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderTestBase.java
index a42741d..3819ff6 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderTestBase.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderTestBase.java
@@ -40,7 +40,6 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -92,11 +91,6 @@ abstract class PulsarSourceReaderTestBase extends 
PulsarTestSuiteBase {
         operator().setupTopic(topicName, Schema.INT32, () -> 
random.nextInt(20));
     }
 
-    @AfterEach
-    void afterEach(String topicName) {
-        operator().deleteTopic(topicName);
-    }
-
     @TestTemplate
     void assignZeroSplitsCreatesZeroSubscription(
             PulsarSourceReaderBase<Integer> reader, Boundedness boundedness, 
String topicName)
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestSuiteBase.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestSuiteBase.java
index c87140b..b55fdc5 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestSuiteBase.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestSuiteBase.java
@@ -56,7 +56,7 @@ public abstract class PulsarTestSuiteBase {
      * pulsar broker. Override this method when needs.
      */
     protected PulsarRuntime runtime() {
-        return PulsarRuntime.embedded();
+        return PulsarRuntime.mock();
     }
 
     /** Operate pulsar by acquiring a runtime operator. */
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/function/ControlSource.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/function/ControlSource.java
index 6e35027..3684167 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/function/ControlSource.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/function/ControlSource.java
@@ -29,6 +29,8 @@ import 
org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.testutils.junit.SharedObjectsExtension;
 import org.apache.flink.testutils.junit.SharedReference;
 
+import 
org.apache.flink.shaded.guava30.com.google.common.util.concurrent.Uninterruptibles;
+
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Schema;
 import org.slf4j.Logger;
@@ -66,8 +68,10 @@ public class ControlSource extends AbstractRichFunction
             String topic,
             DeliveryGuarantee guarantee,
             int messageCounts,
+            Duration interval,
             Duration timeout) {
-        MessageGenerator generator = new MessageGenerator(topic, guarantee, 
messageCounts);
+        MessageGenerator generator =
+                new MessageGenerator(topic, guarantee, messageCounts, 
interval);
         StopSignal signal = new StopSignal(operator, topic, messageCounts, 
timeout);
 
         this.sharedGenerator = sharedObjects.add(generator);
@@ -134,12 +138,15 @@ public class ControlSource extends AbstractRichFunction
         private final DeliveryGuarantee guarantee;
         private final int messageCounts;
         private final List<String> expectedRecords;
+        private final Duration interval;
 
-        public MessageGenerator(String topic, DeliveryGuarantee guarantee, int 
messageCounts) {
+        public MessageGenerator(
+                String topic, DeliveryGuarantee guarantee, int messageCounts, 
Duration interval) {
             this.topic = topic;
             this.guarantee = guarantee;
             this.messageCounts = messageCounts;
             this.expectedRecords = new ArrayList<>(messageCounts);
+            this.interval = interval;
         }
 
         @Override
@@ -158,6 +165,9 @@ public class ControlSource extends AbstractRichFunction
                             + "-"
                             + randomAlphanumeric(10);
             expectedRecords.add(content);
+
+            // Make sure the message was generated in the fixed interval.
+            Uninterruptibles.sleepUninterruptibly(interval);
             return content;
         }
 
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntime.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntime.java
index 9c1cd01..92dd94c 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntime.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntime.java
@@ -21,6 +21,7 @@ package org.apache.flink.connector.pulsar.testutils.runtime;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
 import 
org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime;
 import 
org.apache.flink.connector.pulsar.testutils.runtime.embedded.PulsarEmbeddedRuntime;
+import 
org.apache.flink.connector.pulsar.testutils.runtime.mock.PulsarMockRuntime;
 
 import org.testcontainers.containers.GenericContainer;
 
@@ -39,18 +40,23 @@ public interface PulsarRuntime {
     void tearDown();
 
     /**
-     * Return a operator for operating this pulsar runtime. This operator 
predefined a set of
+     * Return an operator for operating this pulsar runtime. This operator 
predefined a set of
      * extremely useful methods for Pulsar. You can easily add new methods in 
this operator.
      */
     PulsarRuntimeOperator operator();
 
+    /** Create a Pulsar instance which would mock all the backends. */
+    static PulsarRuntime mock() {
+        return new PulsarMockRuntime();
+    }
+
     /**
-     * Create a standalone Pulsar instance in test thread. We would start a 
embedded zookeeper and
+     * Create a standalone Pulsar instance in test thread. We would start an 
embedded zookeeper and
      * bookkeeper. The stream storage for bookkeeper is disabled. The function 
worker is disabled on
      * Pulsar broker.
      *
-     * <p>This runtime would be faster than {@link #container()} and behaves 
the same like the
-     * {@link #container()}.
+     * <p>This runtime would be faster than {@link #container()} and behaves 
the same as the {@link
+     * #container()}.
      */
     static PulsarRuntime embedded() {
         return new PulsarEmbeddedRuntime();
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeUtils.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeUtils.java
new file mode 100644
index 0000000..c2c2e6b
--- /dev/null
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeUtils.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.testutils.runtime;
+
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
+import static 
org.apache.pulsar.common.naming.TopicName.TRANSACTION_COORDINATOR_ASSIGN;
+
+/** This class is used to create the basic topics for a standalone Pulsar 
instance. */
+public final class PulsarRuntimeUtils {
+
+    private PulsarRuntimeUtils() {
+        // No public constructor
+    }
+
+    /** Create the system topics. */
+    public static void initializePulsarEnvironment(
+            ServiceConfiguration config, String serviceUrl, String adminUrl)
+            throws PulsarAdminException, PulsarClientException {
+        try (PulsarAdmin admin = 
PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
+            ClusterData clusterData =
+                    
ClusterData.builder().serviceUrl(adminUrl).brokerServiceUrl(serviceUrl).build();
+            String cluster = config.getClusterName();
+            createSampleNameSpace(admin, clusterData, cluster);
+
+            // Create default namespace
+            createNameSpace(
+                    admin,
+                    cluster,
+                    TopicName.PUBLIC_TENANT,
+                    TopicName.PUBLIC_TENANT + "/" + 
TopicName.DEFAULT_NAMESPACE);
+
+            // Create Pulsar system namespace
+            createNameSpace(
+                    admin, cluster, SYSTEM_NAMESPACE.getTenant(), 
SYSTEM_NAMESPACE.toString());
+            // Enable transaction
+            if (config.isTransactionCoordinatorEnabled()
+                    && !admin.namespaces()
+                            .getTopics(SYSTEM_NAMESPACE.toString())
+                            
.contains(TRANSACTION_COORDINATOR_ASSIGN.getPartition(0).toString())) {
+                
admin.topics().createPartitionedTopic(TRANSACTION_COORDINATOR_ASSIGN.toString(),
 1);
+            }
+        }
+    }
+
+    private static void createSampleNameSpace(
+            PulsarAdmin admin, ClusterData clusterData, String cluster)
+            throws PulsarAdminException {
+        // Create a sample namespace
+        String tenant = "sample";
+        String globalCluster = "global";
+        String namespace = tenant + "/ns1";
+
+        List<String> clusters = admin.clusters().getClusters();
+        if (!clusters.contains(cluster)) {
+            admin.clusters().createCluster(cluster, clusterData);
+        } else {
+            admin.clusters().updateCluster(cluster, clusterData);
+        }
+        // Create marker for "global" cluster
+        if (!clusters.contains(globalCluster)) {
+            admin.clusters().createCluster(globalCluster, 
ClusterData.builder().build());
+        }
+
+        if (!admin.tenants().getTenants().contains(tenant)) {
+            admin.tenants()
+                    .createTenant(
+                            tenant,
+                            new TenantInfoImpl(
+                                    Collections.emptySet(), 
Collections.singleton(cluster)));
+        }
+
+        if (!admin.namespaces().getNamespaces(tenant).contains(namespace)) {
+            admin.namespaces().createNamespace(namespace);
+        }
+    }
+
+    private static void createNameSpace(
+            PulsarAdmin admin, String cluster, String publicTenant, String 
defaultNamespace)
+            throws PulsarAdminException {
+        if (!admin.tenants().getTenants().contains(publicTenant)) {
+            admin.tenants()
+                    .createTenant(
+                            publicTenant,
+                            TenantInfo.builder()
+                                    .adminRoles(Collections.emptySet())
+                                    
.allowedClusters(Collections.singleton(cluster))
+                                    .build());
+        }
+        if 
(!admin.namespaces().getNamespaces(publicTenant).contains(defaultNamespace)) {
+            admin.namespaces().createNamespace(defaultNamespace);
+            admin.namespaces()
+                    .setNamespaceReplicationClusters(
+                            defaultNamespace, Collections.singleton(cluster));
+        }
+    }
+}
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/embedded/PulsarEmbeddedRuntime.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/embedded/PulsarEmbeddedRuntime.java
index cf080b8..07981cc 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/embedded/PulsarEmbeddedRuntime.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/embedded/PulsarEmbeddedRuntime.java
@@ -25,13 +25,7 @@ import org.apache.flink.util.FileUtils;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,15 +37,12 @@ import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.util.Collections;
-import java.util.List;
 import java.util.Optional;
 
+import static 
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeUtils.initializePulsarEnvironment;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.pulsar.broker.ServiceConfigurationUtils.brokerUrl;
 import static org.apache.pulsar.broker.ServiceConfigurationUtils.webServiceUrl;
-import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
-import static 
org.apache.pulsar.common.naming.TopicName.TRANSACTION_COORDINATOR_ASSIGN;
 
 /** Providing a embedded pulsar server. We use this runtime for transaction 
related tests. */
 public class PulsarEmbeddedRuntime implements PulsarRuntime {
@@ -84,7 +75,7 @@ public class PulsarEmbeddedRuntime implements PulsarRuntime {
             startPulsarService();
 
             // Create the operator.
-            this.operator = new PulsarRuntimeOperator(getBrokerUrl(), 
getWebServiceUrl());
+            this.operator = new PulsarRuntimeOperator(serviceUrl(), 
adminUrl());
         } catch (Exception e) {
             throw new IllegalStateException(e);
         }
@@ -175,99 +166,20 @@ public class PulsarEmbeddedRuntime implements 
PulsarRuntime {
         pulsarService.start();
 
         // Create sample data environment.
-        String webServiceUrl = getWebServiceUrl();
-        String brokerUrl = getBrokerUrl();
-        try (PulsarAdmin admin = 
PulsarAdmin.builder().serviceHttpUrl(webServiceUrl).build()) {
-            ClusterData clusterData =
-                    ClusterData.builder()
-                            .serviceUrl(webServiceUrl)
-                            .brokerServiceUrl(brokerUrl)
-                            .build();
-            String cluster = config.getClusterName();
-            createSampleNameSpace(admin, clusterData, cluster);
-
-            // Create default namespace
-            createNameSpace(
-                    admin,
-                    cluster,
-                    TopicName.PUBLIC_TENANT,
-                    TopicName.PUBLIC_TENANT + "/" + 
TopicName.DEFAULT_NAMESPACE);
-
-            // Create Pulsar system namespace
-            createNameSpace(
-                    admin, cluster, SYSTEM_NAMESPACE.getTenant(), 
SYSTEM_NAMESPACE.toString());
-            // Enable transaction
-            if (config.isTransactionCoordinatorEnabled()
-                    && !admin.namespaces()
-                            .getTopics(SYSTEM_NAMESPACE.toString())
-                            
.contains(TRANSACTION_COORDINATOR_ASSIGN.getPartition(0).toString())) {
-                
admin.topics().createPartitionedTopic(TRANSACTION_COORDINATOR_ASSIGN.toString(),
 1);
-            }
-        }
+        initializePulsarEnvironment(config, serviceUrl(), adminUrl());
     }
 
     private int getZkPort() {
         return checkNotNull(bookkeeper).getZookeeperPort();
     }
 
-    private String getBrokerUrl() {
+    private String serviceUrl() {
         Integer port = 
pulsarService.getBrokerListenPort().orElseThrow(IllegalStateException::new);
         return brokerUrl("127.0.0.1", port);
     }
 
-    private String getWebServiceUrl() {
+    private String adminUrl() {
         Integer port = 
pulsarService.getListenPortHTTP().orElseThrow(IllegalArgumentException::new);
         return webServiceUrl("127.0.0.1", port);
     }
-
-    private void createSampleNameSpace(PulsarAdmin admin, ClusterData 
clusterData, String cluster)
-            throws PulsarAdminException {
-        // Create a sample namespace
-        String tenant = "sample";
-        String globalCluster = "global";
-        String namespace = tenant + "/ns1";
-
-        List<String> clusters = admin.clusters().getClusters();
-        if (!clusters.contains(cluster)) {
-            admin.clusters().createCluster(cluster, clusterData);
-        } else {
-            admin.clusters().updateCluster(cluster, clusterData);
-        }
-        // Create marker for "global" cluster
-        if (!clusters.contains(globalCluster)) {
-            admin.clusters().createCluster(globalCluster, 
ClusterData.builder().build());
-        }
-
-        if (!admin.tenants().getTenants().contains(tenant)) {
-            admin.tenants()
-                    .createTenant(
-                            tenant,
-                            new TenantInfoImpl(
-                                    Collections.emptySet(), 
Collections.singleton(cluster)));
-        }
-
-        if (!admin.namespaces().getNamespaces(tenant).contains(namespace)) {
-            admin.namespaces().createNamespace(namespace);
-        }
-    }
-
-    private void createNameSpace(
-            PulsarAdmin admin, String cluster, String publicTenant, String 
defaultNamespace)
-            throws PulsarAdminException {
-        if (!admin.tenants().getTenants().contains(publicTenant)) {
-            admin.tenants()
-                    .createTenant(
-                            publicTenant,
-                            TenantInfo.builder()
-                                    .adminRoles(Collections.emptySet())
-                                    
.allowedClusters(Collections.singleton(cluster))
-                                    .build());
-        }
-        if 
(!admin.namespaces().getNamespaces(publicTenant).contains(defaultNamespace)) {
-            admin.namespaces().createNamespace(defaultNamespace);
-            admin.namespaces()
-                    .setNamespaceReplicationClusters(
-                            defaultNamespace, Collections.singleton(cluster));
-        }
-    }
 }
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/BlankBrokerInterceptor.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/BlankBrokerInterceptor.java
new file mode 100644
index 0000000..8355a23
--- /dev/null
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/BlankBrokerInterceptor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.testutils.runtime.mock;
+
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.intercept.BrokerInterceptor;
+import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.common.api.proto.BaseCommand;
+
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+
+/** No operation for this BrokerInterceptor implementation. */
+public class BlankBrokerInterceptor implements BrokerInterceptor {
+
+    @Override
+    public void onPulsarCommand(BaseCommand command, ServerCnx cnx) {
+        // no-op
+    }
+
+    @Override
+    public void onConnectionClosed(ServerCnx cnx) {
+        // no-op
+    }
+
+    @Override
+    public void onWebserviceRequest(ServletRequest request) {
+        // no-op
+    }
+
+    @Override
+    public void onWebserviceResponse(ServletRequest request, ServletResponse 
response) {
+        // no-op
+    }
+
+    @Override
+    public void initialize(PulsarService pulsarService) {
+        // no-op
+    }
+
+    @Override
+    public void close() {
+        // no-op
+    }
+}
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/MockBookKeeperClientFactory.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/MockBookKeeperClientFactory.java
new file mode 100644
index 0000000..41fad54
--- /dev/null
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/MockBookKeeperClientFactory.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.testutils.runtime.mock;
+
+import io.netty.channel.EventLoopGroup;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.pulsar.broker.BookKeeperClientFactory;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.zookeeper.ZooKeeper;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
+
+/** A BookKeeperClientFactory implementation which returns a mocked 
bookkeeper. */
+public class MockBookKeeperClientFactory implements BookKeeperClientFactory {
+
+    private final OrderedExecutor executor =
+            
OrderedExecutor.newBuilder().numThreads(1).name("mock-pulsar-bookkeeper").build();
+
+    private final BookKeeper bookKeeper = 
NonClosableMockBookKeeper.create(executor);
+
+    @Override
+    public BookKeeper create(
+            ServiceConfiguration conf,
+            ZooKeeper zkClient,
+            EventLoopGroup eventLoopGroup,
+            Optional<Class<? extends EnsemblePlacementPolicy>> 
ensemblePlacementPolicyClass,
+            Map<String, Object> ensemblePlacementPolicyProperties)
+            throws IOException {
+        return bookKeeper;
+    }
+
+    @Override
+    public BookKeeper create(
+            ServiceConfiguration conf,
+            ZooKeeper zkClient,
+            EventLoopGroup eventLoopGroup,
+            Optional<Class<? extends EnsemblePlacementPolicy>> 
ensemblePlacementPolicyClass,
+            Map<String, Object> ensemblePlacementPolicyProperties,
+            StatsLogger statsLogger)
+            throws IOException {
+        return bookKeeper;
+    }
+
+    @Override
+    public void close() {
+        try {
+            bookKeeper.close();
+            executor.shutdown();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/MockPulsarService.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/MockPulsarService.java
new file mode 100644
index 0000000..6b6c412
--- /dev/null
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/MockPulsarService.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.testutils.runtime.mock;
+
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.pulsar.broker.BookKeeperClientFactory;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.intercept.BrokerInterceptor;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
+import org.apache.zookeeper.MockZooKeeperSession;
+
+import java.util.function.Supplier;
+
+/** A Mock pulsar service which would use the mocked zookeeper and bookkeeper. 
*/
+public class MockPulsarService extends PulsarService {
+
+    private final int brokerServicePort;
+
+    private final MockZooKeeperClientFactory zooKeeperClientFactory =
+            new MockZooKeeperClientFactory();
+
+    private final MockZooKeeperSession zooKeeperSession =
+            
MockZooKeeperSession.newInstance(zooKeeperClientFactory.getZooKeeper());
+
+    private final SameThreadOrderedSafeExecutor orderedExecutor =
+            new SameThreadOrderedSafeExecutor();
+
+    public MockPulsarService(ServiceConfiguration config) {
+        super(config);
+        this.brokerServicePort =
+                
config.getBrokerServicePort().orElseThrow(IllegalArgumentException::new);
+    }
+
+    public ZooKeeperClientFactory getZooKeeperClientFactory() {
+        return zooKeeperClientFactory;
+    }
+
+    public BookKeeperClientFactory newBookKeeperClientFactory() {
+        return new MockBookKeeperClientFactory();
+    }
+
+    public MetadataStoreExtended createLocalMetadataStore() {
+        return new ZKMetadataStore(zooKeeperSession);
+    }
+
+    public MetadataStoreExtended createConfigurationMetadataStore() {
+        return new ZKMetadataStore(zooKeeperSession);
+    }
+
+    public Supplier<NamespaceService> getNamespaceServiceProvider() {
+        return () -> new NamespaceService(this);
+    }
+
+    @Override
+    public OrderedExecutor getOrderedExecutor() {
+        return orderedExecutor;
+    }
+
+    @Override
+    public BrokerInterceptor getBrokerInterceptor() {
+        return new BlankBrokerInterceptor();
+    }
+
+    public int getBrokerServicePort() {
+        return brokerServicePort;
+    }
+}
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/MockZooKeeperClientFactory.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/MockZooKeeperClientFactory.java
new file mode 100644
index 0000000..3c89484
--- /dev/null
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/MockZooKeeperClientFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.testutils.runtime.mock;
+
+import 
org.apache.flink.shaded.guava30.com.google.common.util.concurrent.MoreExecutors;
+
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.MockZooKeeper;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static 
org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl.ENCODING_SCHEME;
+import static org.apache.zookeeper.CreateMode.PERSISTENT;
+
+/** A ZooKeeperClientFactory implementation which returns mocked zookeeper 
instead of normal zk. */
+public class MockZooKeeperClientFactory implements ZooKeeperClientFactory {
+
+    private final MockZooKeeper zooKeeper;
+
+    public MockZooKeeperClientFactory() {
+        this.zooKeeper = 
MockZooKeeper.newInstance(MoreExecutors.newDirectExecutorService());
+        List<ACL> dummyAclList = new ArrayList<>(0);
+
+        try {
+            ZkUtils.createFullPathOptimistic(
+                    zooKeeper,
+                    "/ledgers/available/192.168.1.1:" + 5000,
+                    "".getBytes(ENCODING_SCHEME),
+                    dummyAclList,
+                    PERSISTENT);
+
+            zooKeeper.create(
+                    "/ledgers/LAYOUT",
+                    "1\nflat:1".getBytes(ENCODING_SCHEME),
+                    dummyAclList,
+                    PERSISTENT);
+        } catch (KeeperException | InterruptedException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<ZooKeeper> create(
+            String serverList, SessionType sessionType, int 
zkSessionTimeoutMillis) {
+        return CompletableFuture.completedFuture(zooKeeper);
+    }
+
+    MockZooKeeper getZooKeeper() {
+        return zooKeeper;
+    }
+}
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/NonClosableMockBookKeeper.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/NonClosableMockBookKeeper.java
new file mode 100644
index 0000000..b7001b8
--- /dev/null
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/NonClosableMockBookKeeper.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.testutils.runtime.mock;
+
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.PulsarMockBookKeeper;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+
+/**
+ * Prevent the MockBookKeeper instance from being closed when the broker is 
restarted within a test.
+ */
+public class NonClosableMockBookKeeper extends PulsarMockBookKeeper {
+
+    private NonClosableMockBookKeeper(OrderedExecutor executor) throws 
Exception {
+        super(executor);
+    }
+
+    @Override
+    public void close() {
+        // no-op
+    }
+
+    @Override
+    public void shutdown() {
+        // no-op
+    }
+
+    public void reallyShutdown() {
+        super.shutdown();
+    }
+
+    public static BookKeeper create(OrderedExecutor executor) {
+        try {
+            return new NonClosableMockBookKeeper(executor);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java
new file mode 100644
index 0000000..e6ff060
--- /dev/null
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.testutils.runtime.mock;
+
+import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime;
+import 
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
+
+import org.apache.pulsar.broker.ServiceConfiguration;
+
+import java.util.Optional;
+
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
+import static 
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeUtils.initializePulsarEnvironment;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Providing a mocked pulsar server. */
+public class PulsarMockRuntime implements PulsarRuntime {
+
+    private static final String CLUSTER_NAME = "mock-pulsar-" + 
randomAlphanumeric(6);
+    private final ServiceConfiguration configuration;
+    private final MockPulsarService pulsarService;
+    private PulsarRuntimeOperator operator;
+
+    public PulsarMockRuntime() {
+        this(createConfig());
+    }
+
+    public PulsarMockRuntime(ServiceConfiguration configuration) {
+        this.configuration = configuration;
+        this.pulsarService = new MockPulsarService(configuration);
+    }
+
+    @Override
+    public void startUp() {
+        try {
+            pulsarService.start();
+
+            String serviceUrl = pulsarService.getBrokerServiceUrl();
+            String adminUrl = pulsarService.getWebServiceAddress();
+            initializePulsarEnvironment(configuration, serviceUrl, adminUrl);
+
+            this.operator = new PulsarRuntimeOperator(serviceUrl, adminUrl);
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    @Override
+    public void tearDown() {
+        try {
+            pulsarService.close();
+            operator.close();
+            this.operator = null;
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    @Override
+    public PulsarRuntimeOperator operator() {
+        return checkNotNull(operator, "You should start this mock pulsar 
first.");
+    }
+
+    private static ServiceConfiguration createConfig() {
+        ServiceConfiguration configuration = new ServiceConfiguration();
+
+        configuration.setAdvertisedAddress("localhost");
+        configuration.setClusterName(CLUSTER_NAME);
+
+        configuration.setManagedLedgerCacheSizeMB(8);
+        configuration.setActiveConsumerFailoverDelayTimeMillis(0);
+        configuration.setDefaultRetentionTimeInMinutes(7);
+        configuration.setDefaultNumberOfNamespaceBundles(1);
+        configuration.setZookeeperServers("localhost:2181");
+        configuration.setConfigurationStoreServers("localhost:3181");
+
+        configuration.setAuthenticationEnabled(false);
+        configuration.setAuthorizationEnabled(false);
+        configuration.setAllowAutoTopicCreation(true);
+        configuration.setBrokerDeleteInactiveTopicsEnabled(false);
+
+        configuration.setWebSocketServiceEnabled(false);
+        // Use runtime dynamic ports
+        configuration.setBrokerServicePort(Optional.of(0));
+        configuration.setWebServicePort(Optional.of(0));
+
+        // Enable transactions.
+        configuration.setTransactionCoordinatorEnabled(true);
+        configuration.setTransactionMetadataStoreProviderClassName(
+                
"org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider");
+
+        return configuration;
+    }
+}
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/SameThreadOrderedSafeExecutor.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/SameThreadOrderedSafeExecutor.java
new file mode 100644
index 0000000..9667f08
--- /dev/null
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/SameThreadOrderedSafeExecutor.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.testutils.runtime.mock;
+
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.common.util.SafeRunnable;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.pulsar.shade.io.netty.util.concurrent.DefaultThreadFactory;
+
+/** Override the default bookkeeper executor for executing in one thread 
executor. */
+public class SameThreadOrderedSafeExecutor extends OrderedExecutor {
+
+    public SameThreadOrderedSafeExecutor() {
+        super(
+                "same-thread-executor",
+                1,
+                new DefaultThreadFactory("test"),
+                NullStatsLogger.INSTANCE,
+                false,
+                false,
+                100000,
+                -1,
+                false);
+    }
+
+    @Override
+    public void execute(Runnable r) {
+        r.run();
+    }
+
+    @Override
+    public void executeOrdered(int orderingKey, SafeRunnable r) {
+        r.run();
+    }
+
+    @Override
+    public void executeOrdered(long orderingKey, SafeRunnable r) {
+        r.run();
+    }
+}
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/resources/containers/txnStandalone.conf
 
b/flink-connectors/flink-connector-pulsar/src/test/resources/containers/txnStandalone.conf
index bf35c59..10437a717 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/resources/containers/txnStandalone.conf
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/resources/containers/txnStandalone.conf
@@ -1015,7 +1015,7 @@ defaultNumPartitions=1
 ### --- Transaction config variables --- ###
 # Enable transaction coordinator in broker
 transactionCoordinatorEnabled=true
-transactionMetadataStoreProviderClassName=org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider
+transactionMetadataStoreProviderClassName=org.apache.pulsar.transaction.coordinator.impl.InMemTransactionMetadataStoreProvider
 
 # Transaction buffer take snapshot transaction count
 transactionBufferSnapshotMaxTransactionCount=1000

Reply via email to