This is an automated email from the ASF dual-hosted git repository.
fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 1572908 [FLINK-26177][Connector/pulsar] Use mocked pulsar runtime
instead of embedded runtime and enable tests.
1572908 is described below
commit 1572908edbb17455a83f9e45974cf312a478283b
Author: Yufan Sheng <[email protected]>
AuthorDate: Fri Feb 18 16:20:37 2022 +0800
[FLINK-26177][Connector/pulsar] Use mocked pulsar runtime instead of
embedded runtime and enable tests.
---
.../3ac3a1dc-681f-4213-9990-b7b3298a20bc | 0
.../f4d91193-72ba-4ce4-ad83-98f780dce581 | 6 +
.../archunit-violations/stored.rules | 4 +
flink-connectors/flink-connector-pulsar/pom.xml | 17 ++-
.../connector/pulsar/sink/PulsarSinkITCase.java | 13 ++-
.../pulsar/sink/writer/PulsarWriterTest.java | 12 +-
.../pulsar/source/PulsarSourceITCase.java | 43 ++++---
.../source/PulsarOrderedSourceReaderTest.java | 47 +++-----
.../reader/source/PulsarSourceReaderTestBase.java | 6 -
.../pulsar/testutils/PulsarTestSuiteBase.java | 2 +-
.../pulsar/testutils/function/ControlSource.java | 14 ++-
.../pulsar/testutils/runtime/PulsarRuntime.java | 14 ++-
.../testutils/runtime/PulsarRuntimeUtils.java | 124 +++++++++++++++++++++
.../runtime/embedded/PulsarEmbeddedRuntime.java | 98 +---------------
.../runtime/mock/BlankBrokerInterceptor.java | 61 ++++++++++
.../runtime/mock/MockBookKeeperClientFactory.java | 74 ++++++++++++
.../testutils/runtime/mock/MockPulsarService.java | 87 +++++++++++++++
.../runtime/mock/MockZooKeeperClientFactory.java | 73 ++++++++++++
.../runtime/mock/NonClosableMockBookKeeper.java | 55 +++++++++
.../testutils/runtime/mock/PulsarMockRuntime.java | 110 ++++++++++++++++++
.../mock/SameThreadOrderedSafeExecutor.java | 56 ++++++++++
.../test/resources/containers/txnStandalone.conf | 2 +-
22 files changed, 745 insertions(+), 173 deletions(-)
diff --git
a/flink-connectors/flink-connector-pulsar/archunit-violations/3ac3a1dc-681f-4213-9990-b7b3298a20bc
b/flink-connectors/flink-connector-pulsar/archunit-violations/3ac3a1dc-681f-4213-9990-b7b3298a20bc
new file mode 100644
index 0000000..e69de29
diff --git
a/flink-connectors/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581
b/flink-connectors/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581
new file mode 100644
index 0000000..452f99f
--- /dev/null
+++
b/flink-connectors/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581
@@ -0,0 +1,6 @@
+org.apache.flink.connector.pulsar.source.PulsarSourceITCase does not satisfy:
only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that
are static, final, and of type InternalMiniClusterExtension and annotated with
@RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any
fields that are static, final, and of type MiniClusterExtension and annotated
with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with
@ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with
@ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type
MiniClusterWithClientResource and final and annotated with @ClassRule or
contain any fields that is of type MiniClusterWithClientResource and public and
final and not static and annotated with @Rule
\ No newline at end of file
diff --git
a/flink-connectors/flink-connector-pulsar/archunit-violations/stored.rules
b/flink-connectors/flink-connector-pulsar/archunit-violations/stored.rules
new file mode 100644
index 0000000..efb12f3
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/archunit-violations/stored.rules
@@ -0,0 +1,4 @@
+#
+#Thu Mar 03 12:42:13 CST 2022
+Tests\ inheriting\ from\ AbstractTestBase\ should\ have\ name\ ending\ with\
ITCase=3ac3a1dc-681f-4213-9990-b7b3298a20bc
+ITCASE\ tests\ should\ use\ a\ MiniCluster\ resource\ or\
extension=f4d91193-72ba-4ce4-ad83-98f780dce581
diff --git a/flink-connectors/flink-connector-pulsar/pom.xml
b/flink-connectors/flink-connector-pulsar/pom.xml
index 7fc3727..e8db230 100644
--- a/flink-connectors/flink-connector-pulsar/pom.xml
+++ b/flink-connectors/flink-connector-pulsar/pom.xml
@@ -120,6 +120,22 @@ under the License.
<!-- we don't override the version here. -->
<dependency>
<groupId>org.apache.pulsar</groupId>
+ <artifactId>testmocks</artifactId>
+ <version>${pulsar.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-testng</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-broker</artifactId>
<version>${pulsar.version}</version>
<scope>test</scope>
@@ -236,7 +252,6 @@ under the License.
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
- <skip>true</skip>
<!-- Enforce single fork execution due
to heavy mini cluster use in the tests -->
<forkCount>1</forkCount>
<argLine>-Xms256m -Xmx2048m
-Dmvn.forkNumber=${surefire.forkNumber}
diff --git
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
index 94e23d7..2c2c05d 100644
---
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
+++
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
@@ -68,7 +68,13 @@ class PulsarSinkITCase extends PulsarTestSuiteBase {
ControlSource source =
new ControlSource(
- sharedObjects, operator(), topic, guarantee, counts,
Duration.ofMinutes(5));
+ sharedObjects,
+ operator(),
+ topic,
+ guarantee,
+ counts,
+ Duration.ofMillis(50),
+ Duration.ofMinutes(5));
PulsarSink<String> sink =
PulsarSink.builder()
.setServiceUrl(operator().serviceUrl())
@@ -79,8 +85,11 @@ class PulsarSinkITCase extends PulsarTestSuiteBase {
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+
env.setParallelism(PARALLELISM);
- env.enableCheckpointing(100L);
+ if (guarantee != DeliveryGuarantee.NONE) {
+ env.enableCheckpointing(500L);
+ }
env.addSource(source).sinkTo(sink);
env.execute();
diff --git
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java
index 942b759..eb766ee 100644
---
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java
+++
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java
@@ -44,7 +44,6 @@ import
org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.util.UserCodeClassLoader;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
@@ -65,16 +64,9 @@ class PulsarWriterTest extends PulsarTestSuiteBase {
private static final SinkWriter.Context CONTEXT = new
MockSinkWriterContext();
- @Test
- void writeMessageWithGuarantee() throws Exception {
- writeMessageWithoutGuarantee(EXACTLY_ONCE);
- }
-
@ParameterizedTest
- @EnumSource(
- value = DeliveryGuarantee.class,
- names = {"AT_LEAST_ONCE", "NONE"})
- void writeMessageWithoutGuarantee(DeliveryGuarantee guarantee) throws
Exception {
+ @EnumSource(DeliveryGuarantee.class)
+ void writeMessages(DeliveryGuarantee guarantee) throws Exception {
String topic = randomAlphabetic(10);
operator().createTopic(topic, 8);
diff --git
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
index 9a72c8a..f3435b1 100644
---
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
+++
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
@@ -33,38 +33,15 @@ import
org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
import org.apache.flink.streaming.api.CheckpointingMode;
-import org.junit.jupiter.api.Disabled;
-
/** Unite test class for {@link PulsarSource}. */
@SuppressWarnings("unused")
class PulsarSourceITCase extends SourceTestSuiteBase<String> {
-
- @Disabled // TODO: remove override after FLINK-26177 is fixed
- @Override
- public void testScaleUp(
- TestEnvironment testEnv,
- DataStreamSourceExternalContext<String> externalContext,
- CheckpointingMode semantic)
- throws Exception {
- super.testScaleUp(testEnv, externalContext, semantic);
- }
-
- @Disabled // TODO: remove override after FLINK-26177 is fixed
- @Override
- public void testScaleDown(
- TestEnvironment testEnv,
- DataStreamSourceExternalContext<String> externalContext,
- CheckpointingMode semantic)
- throws Exception {
- super.testScaleDown(testEnv, externalContext, semantic);
- }
-
// Defines test environment on Flink MiniCluster
@TestEnv MiniClusterTestEnvironment flink = new
MiniClusterTestEnvironment();
// Defines pulsar running environment
@TestExternalSystem
- PulsarTestEnvironment pulsar = new
PulsarTestEnvironment(PulsarRuntime.embedded());
+ PulsarTestEnvironment pulsar = new
PulsarTestEnvironment(PulsarRuntime.mock());
@TestSemantics
CheckpointingMode[] semantics = new CheckpointingMode[]
{CheckpointingMode.EXACTLY_ONCE};
@@ -78,4 +55,22 @@ class PulsarSourceITCase extends SourceTestSuiteBase<String>
{
@TestContext
PulsarTestContextFactory<String, MultipleTopicConsumingContext>
multipleTopic =
new PulsarTestContextFactory<>(pulsar,
MultipleTopicConsumingContext::new);
+
+ @Override
+ public void testScaleUp(
+ TestEnvironment testEnv,
+ DataStreamSourceExternalContext<String> externalContext,
+ CheckpointingMode semantic)
+ throws Exception {
+ super.testScaleUp(testEnv, externalContext, semantic);
+ }
+
+ @Override
+ public void testScaleDown(
+ TestEnvironment testEnv,
+ DataStreamSourceExternalContext<String> externalContext,
+ CheckpointingMode semantic)
+ throws Exception {
+ super.testScaleDown(testEnv, externalContext, semantic);
+ }
}
diff --git
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReaderTest.java
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReaderTest.java
index 477dfdd..9806108 100644
---
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReaderTest.java
+++
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReaderTest.java
@@ -28,14 +28,15 @@ import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.common.policies.data.SubscriptionStats;
-import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.client.impl.MessageIdImpl;
import org.junit.jupiter.api.TestTemplate;
import java.time.Duration;
import java.util.Collections;
-import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
@@ -139,22 +140,6 @@ class PulsarOrderedSourceReaderTest extends
PulsarSourceReaderTestBase {
reader.notifyNoMoreSplits();
}
- private void pollUntilReadExpectedNumberOfRecordsAndValidate(
- PulsarSourceReaderBase<Integer> reader,
- TestingReaderOutput<Integer> output,
- int expectedRecords,
- String topicNameWithPartition)
- throws Exception {
- pollUntil(
- reader,
- output,
- () -> output.getEmittedRecords().size() == expectedRecords,
- "The output didn't poll enough records before timeout.");
- reader.close();
- verifyAllMessageAcknowledged(expectedRecords, topicNameWithPartition);
- assertThat(output.getEmittedRecords()).hasSize(expectedRecords);
- }
-
private void pollUntil(
PulsarSourceReaderBase<Integer> reader,
ReaderOutput<Integer> output,
@@ -177,15 +162,19 @@ class PulsarOrderedSourceReaderTest extends
PulsarSourceReaderTestBase {
}
private void verifyAllMessageAcknowledged(int expectedMessages, String
partitionName)
- throws PulsarAdminException {
- TopicStats topicStats =
operator().admin().topics().getStats(partitionName, true, true);
- // verify if the messages has been consumed
- Map<String, ? extends SubscriptionStats> subscriptionStats =
topicStats.getSubscriptions();
- assertThat(subscriptionStats).hasSizeGreaterThan(0);
- subscriptionStats.forEach(
- (subscription, stats) -> {
- assertThat(stats.getUnackedMessages()).isZero();
- assertThat(stats.getMsgOutCounter()).isEqualTo(expectedMessages);
- });
+ throws PulsarAdminException, PulsarClientException {
+
+ Consumer<byte[]> consumer =
+ operator()
+ .client()
+ .newConsumer()
+ .subscriptionType(SubscriptionType.Exclusive)
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
+ .subscriptionName("verify-message")
+ .topic(partitionName)
+ .subscribe();
+
+ assertThat(((MessageIdImpl) consumer.getLastMessageId()).getEntryId())
+ .isEqualTo(expectedMessages - 1);
}
}
diff --git
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderTestBase.java
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderTestBase.java
index a42741d..3819ff6 100644
---
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderTestBase.java
+++
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderTestBase.java
@@ -40,7 +40,6 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
-import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -92,11 +91,6 @@ abstract class PulsarSourceReaderTestBase extends
PulsarTestSuiteBase {
operator().setupTopic(topicName, Schema.INT32, () ->
random.nextInt(20));
}
- @AfterEach
- void afterEach(String topicName) {
- operator().deleteTopic(topicName);
- }
-
@TestTemplate
void assignZeroSplitsCreatesZeroSubscription(
PulsarSourceReaderBase<Integer> reader, Boundedness boundedness,
String topicName)
diff --git
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestSuiteBase.java
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestSuiteBase.java
index c87140b..b55fdc5 100644
---
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestSuiteBase.java
+++
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestSuiteBase.java
@@ -56,7 +56,7 @@ public abstract class PulsarTestSuiteBase {
* pulsar broker. Override this method when needs.
*/
protected PulsarRuntime runtime() {
- return PulsarRuntime.embedded();
+ return PulsarRuntime.mock();
}
/** Operate pulsar by acquiring a runtime operator. */
diff --git
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/function/ControlSource.java
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/function/ControlSource.java
index 6e35027..3684167 100644
---
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/function/ControlSource.java
+++
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/function/ControlSource.java
@@ -29,6 +29,8 @@ import
org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.testutils.junit.SharedObjectsExtension;
import org.apache.flink.testutils.junit.SharedReference;
+import
org.apache.flink.shaded.guava30.com.google.common.util.concurrent.Uninterruptibles;
+
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.slf4j.Logger;
@@ -66,8 +68,10 @@ public class ControlSource extends AbstractRichFunction
String topic,
DeliveryGuarantee guarantee,
int messageCounts,
+ Duration interval,
Duration timeout) {
- MessageGenerator generator = new MessageGenerator(topic, guarantee,
messageCounts);
+ MessageGenerator generator =
+ new MessageGenerator(topic, guarantee, messageCounts,
interval);
StopSignal signal = new StopSignal(operator, topic, messageCounts,
timeout);
this.sharedGenerator = sharedObjects.add(generator);
@@ -134,12 +138,15 @@ public class ControlSource extends AbstractRichFunction
private final DeliveryGuarantee guarantee;
private final int messageCounts;
private final List<String> expectedRecords;
+ private final Duration interval;
- public MessageGenerator(String topic, DeliveryGuarantee guarantee, int
messageCounts) {
+ public MessageGenerator(
+ String topic, DeliveryGuarantee guarantee, int messageCounts,
Duration interval) {
this.topic = topic;
this.guarantee = guarantee;
this.messageCounts = messageCounts;
this.expectedRecords = new ArrayList<>(messageCounts);
+ this.interval = interval;
}
@Override
@@ -158,6 +165,9 @@ public class ControlSource extends AbstractRichFunction
+ "-"
+ randomAlphanumeric(10);
expectedRecords.add(content);
+
+ // Make sure the message was generated in the fixed interval.
+ Uninterruptibles.sleepUninterruptibly(interval);
return content;
}
diff --git
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntime.java
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntime.java
index 9c1cd01..92dd94c 100644
---
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntime.java
+++
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntime.java
@@ -21,6 +21,7 @@ package org.apache.flink.connector.pulsar.testutils.runtime;
import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
import
org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime;
import
org.apache.flink.connector.pulsar.testutils.runtime.embedded.PulsarEmbeddedRuntime;
+import
org.apache.flink.connector.pulsar.testutils.runtime.mock.PulsarMockRuntime;
import org.testcontainers.containers.GenericContainer;
@@ -39,18 +40,23 @@ public interface PulsarRuntime {
void tearDown();
/**
- * Return a operator for operating this pulsar runtime. This operator
predefined a set of
+ * Return an operator for operating this pulsar runtime. This operator
predefined a set of
* extremely useful methods for Pulsar. You can easily add new methods in
this operator.
*/
PulsarRuntimeOperator operator();
+ /** Create a Pulsar instance which would mock all the backends. */
+ static PulsarRuntime mock() {
+ return new PulsarMockRuntime();
+ }
+
/**
- * Create a standalone Pulsar instance in test thread. We would start a
embedded zookeeper and
+ * Create a standalone Pulsar instance in test thread. We would start an
embedded zookeeper and
* bookkeeper. The stream storage for bookkeeper is disabled. The function
worker is disabled on
* Pulsar broker.
*
- * <p>This runtime would be faster than {@link #container()} and behaves
the same like the
- * {@link #container()}.
+ * <p>This runtime would be faster than {@link #container()} and behaves
the same as the {@link
+ * #container()}.
*/
static PulsarRuntime embedded() {
return new PulsarEmbeddedRuntime();
diff --git
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeUtils.java
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeUtils.java
new file mode 100644
index 0000000..c2c2e6b
--- /dev/null
+++
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeUtils.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.testutils.runtime;
+
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
+import static
org.apache.pulsar.common.naming.TopicName.TRANSACTION_COORDINATOR_ASSIGN;
+
+/** This class is used to create the basic topics for a standalone Pulsar
instance. */
+public final class PulsarRuntimeUtils {
+
+ private PulsarRuntimeUtils() {
+ // No public constructor
+ }
+
+ /** Create the system topics. */
+ public static void initializePulsarEnvironment(
+ ServiceConfiguration config, String serviceUrl, String adminUrl)
+ throws PulsarAdminException, PulsarClientException {
+ try (PulsarAdmin admin =
PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
+ ClusterData clusterData =
+ ClusterData.builder().serviceUrl(adminUrl).brokerServiceUrl(serviceUrl).build();
+ String cluster = config.getClusterName();
+ createSampleNameSpace(admin, clusterData, cluster);
+
+ // Create default namespace
+ createNameSpace(
+ admin,
+ cluster,
+ TopicName.PUBLIC_TENANT,
+ TopicName.PUBLIC_TENANT + "/" +
TopicName.DEFAULT_NAMESPACE);
+
+ // Create Pulsar system namespace
+ createNameSpace(
+ admin, cluster, SYSTEM_NAMESPACE.getTenant(),
SYSTEM_NAMESPACE.toString());
+ // Enable transaction
+ if (config.isTransactionCoordinatorEnabled()
+ && !admin.namespaces()
+ .getTopics(SYSTEM_NAMESPACE.toString())
+ .contains(TRANSACTION_COORDINATOR_ASSIGN.getPartition(0).toString())) {
+ admin.topics().createPartitionedTopic(TRANSACTION_COORDINATOR_ASSIGN.toString(), 1);
+ }
+ }
+ }
+
+ private static void createSampleNameSpace(
+ PulsarAdmin admin, ClusterData clusterData, String cluster)
+ throws PulsarAdminException {
+ // Create a sample namespace
+ String tenant = "sample";
+ String globalCluster = "global";
+ String namespace = tenant + "/ns1";
+
+ List<String> clusters = admin.clusters().getClusters();
+ if (!clusters.contains(cluster)) {
+ admin.clusters().createCluster(cluster, clusterData);
+ } else {
+ admin.clusters().updateCluster(cluster, clusterData);
+ }
+ // Create marker for "global" cluster
+ if (!clusters.contains(globalCluster)) {
+ admin.clusters().createCluster(globalCluster,
ClusterData.builder().build());
+ }
+
+ if (!admin.tenants().getTenants().contains(tenant)) {
+ admin.tenants()
+ .createTenant(
+ tenant,
+ new TenantInfoImpl(
+ Collections.emptySet(),
Collections.singleton(cluster)));
+ }
+
+ if (!admin.namespaces().getNamespaces(tenant).contains(namespace)) {
+ admin.namespaces().createNamespace(namespace);
+ }
+ }
+
+ private static void createNameSpace(
+ PulsarAdmin admin, String cluster, String publicTenant, String
defaultNamespace)
+ throws PulsarAdminException {
+ if (!admin.tenants().getTenants().contains(publicTenant)) {
+ admin.tenants()
+ .createTenant(
+ publicTenant,
+ TenantInfo.builder()
+ .adminRoles(Collections.emptySet())
+ .allowedClusters(Collections.singleton(cluster))
+ .build());
+ }
+ if
(!admin.namespaces().getNamespaces(publicTenant).contains(defaultNamespace)) {
+ admin.namespaces().createNamespace(defaultNamespace);
+ admin.namespaces()
+ .setNamespaceReplicationClusters(
+ defaultNamespace, Collections.singleton(cluster));
+ }
+ }
+}
diff --git
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/embedded/PulsarEmbeddedRuntime.java
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/embedded/PulsarEmbeddedRuntime.java
index cf080b8..07981cc 100644
---
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/embedded/PulsarEmbeddedRuntime.java
+++
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/embedded/PulsarEmbeddedRuntime.java
@@ -25,13 +25,7 @@ import org.apache.flink.util.FileUtils;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,15 +37,12 @@ import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
-import java.util.Collections;
-import java.util.List;
import java.util.Optional;
+import static
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeUtils.initializePulsarEnvironment;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.pulsar.broker.ServiceConfigurationUtils.brokerUrl;
import static org.apache.pulsar.broker.ServiceConfigurationUtils.webServiceUrl;
-import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
-import static
org.apache.pulsar.common.naming.TopicName.TRANSACTION_COORDINATOR_ASSIGN;
/** Providing a embedded pulsar server. We use this runtime for transaction
related tests. */
public class PulsarEmbeddedRuntime implements PulsarRuntime {
@@ -84,7 +75,7 @@ public class PulsarEmbeddedRuntime implements PulsarRuntime {
startPulsarService();
// Create the operator.
- this.operator = new PulsarRuntimeOperator(getBrokerUrl(),
getWebServiceUrl());
+ this.operator = new PulsarRuntimeOperator(serviceUrl(),
adminUrl());
} catch (Exception e) {
throw new IllegalStateException(e);
}
@@ -175,99 +166,20 @@ public class PulsarEmbeddedRuntime implements
PulsarRuntime {
pulsarService.start();
// Create sample data environment.
- String webServiceUrl = getWebServiceUrl();
- String brokerUrl = getBrokerUrl();
- try (PulsarAdmin admin =
PulsarAdmin.builder().serviceHttpUrl(webServiceUrl).build()) {
- ClusterData clusterData =
- ClusterData.builder()
- .serviceUrl(webServiceUrl)
- .brokerServiceUrl(brokerUrl)
- .build();
- String cluster = config.getClusterName();
- createSampleNameSpace(admin, clusterData, cluster);
-
- // Create default namespace
- createNameSpace(
- admin,
- cluster,
- TopicName.PUBLIC_TENANT,
- TopicName.PUBLIC_TENANT + "/" +
TopicName.DEFAULT_NAMESPACE);
-
- // Create Pulsar system namespace
- createNameSpace(
- admin, cluster, SYSTEM_NAMESPACE.getTenant(),
SYSTEM_NAMESPACE.toString());
- // Enable transaction
- if (config.isTransactionCoordinatorEnabled()
- && !admin.namespaces()
- .getTopics(SYSTEM_NAMESPACE.toString())
- .contains(TRANSACTION_COORDINATOR_ASSIGN.getPartition(0).toString())) {
- admin.topics().createPartitionedTopic(TRANSACTION_COORDINATOR_ASSIGN.toString(), 1);
- }
- }
+ initializePulsarEnvironment(config, serviceUrl(), adminUrl());
}
private int getZkPort() {
return checkNotNull(bookkeeper).getZookeeperPort();
}
- private String getBrokerUrl() {
+ private String serviceUrl() {
Integer port =
pulsarService.getBrokerListenPort().orElseThrow(IllegalStateException::new);
return brokerUrl("127.0.0.1", port);
}
- private String getWebServiceUrl() {
+ private String adminUrl() {
Integer port =
pulsarService.getListenPortHTTP().orElseThrow(IllegalArgumentException::new);
return webServiceUrl("127.0.0.1", port);
}
-
- private void createSampleNameSpace(PulsarAdmin admin, ClusterData
clusterData, String cluster)
- throws PulsarAdminException {
- // Create a sample namespace
- String tenant = "sample";
- String globalCluster = "global";
- String namespace = tenant + "/ns1";
-
- List<String> clusters = admin.clusters().getClusters();
- if (!clusters.contains(cluster)) {
- admin.clusters().createCluster(cluster, clusterData);
- } else {
- admin.clusters().updateCluster(cluster, clusterData);
- }
- // Create marker for "global" cluster
- if (!clusters.contains(globalCluster)) {
- admin.clusters().createCluster(globalCluster,
ClusterData.builder().build());
- }
-
- if (!admin.tenants().getTenants().contains(tenant)) {
- admin.tenants()
- .createTenant(
- tenant,
- new TenantInfoImpl(
- Collections.emptySet(),
Collections.singleton(cluster)));
- }
-
- if (!admin.namespaces().getNamespaces(tenant).contains(namespace)) {
- admin.namespaces().createNamespace(namespace);
- }
- }
-
- private void createNameSpace(
- PulsarAdmin admin, String cluster, String publicTenant, String
defaultNamespace)
- throws PulsarAdminException {
- if (!admin.tenants().getTenants().contains(publicTenant)) {
- admin.tenants()
- .createTenant(
- publicTenant,
- TenantInfo.builder()
- .adminRoles(Collections.emptySet())
- .allowedClusters(Collections.singleton(cluster))
- .build());
- }
- if
(!admin.namespaces().getNamespaces(publicTenant).contains(defaultNamespace)) {
- admin.namespaces().createNamespace(defaultNamespace);
- admin.namespaces()
- .setNamespaceReplicationClusters(
- defaultNamespace, Collections.singleton(cluster));
- }
- }
}
diff --git
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/BlankBrokerInterceptor.java
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/BlankBrokerInterceptor.java
new file mode 100644
index 0000000..8355a23
--- /dev/null
+++
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/BlankBrokerInterceptor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.testutils.runtime.mock;
+
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.intercept.BrokerInterceptor;
+import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.common.api.proto.BaseCommand;
+
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+
+/** A {@link BrokerInterceptor} in which every callback is deliberately left empty. */
+public class BlankBrokerInterceptor implements BrokerInterceptor {
+
+    /** Ignores the given service; nothing is set up. */
+    @Override
+    public void initialize(PulsarService pulsarService) {
+        // Intentionally empty.
+    }
+
+    /** Has nothing to release. */
+    @Override
+    public void close() {
+        // Intentionally empty.
+    }
+
+    /** Ignores the command and the connection it is reported for. */
+    @Override
+    public void onPulsarCommand(BaseCommand command, ServerCnx cnx) {
+        // Intentionally empty.
+    }
+
+    /** Ignores the closed connection. */
+    @Override
+    public void onConnectionClosed(ServerCnx cnx) {
+        // Intentionally empty.
+    }
+
+    /** Ignores the web service request. */
+    @Override
+    public void onWebserviceRequest(ServletRequest request) {
+        // Intentionally empty.
+    }
+
+    /** Ignores the web service request/response pair. */
+    @Override
+    public void onWebserviceResponse(ServletRequest request, ServletResponse response) {
+        // Intentionally empty.
+    }
+}
diff --git
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/MockBookKeeperClientFactory.java
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/MockBookKeeperClientFactory.java
new file mode 100644
index 0000000..41fad54
--- /dev/null
+++
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/MockBookKeeperClientFactory.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.testutils.runtime.mock;
+
+import io.netty.channel.EventLoopGroup;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.pulsar.broker.BookKeeperClientFactory;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.zookeeper.ZooKeeper;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * A {@link BookKeeperClientFactory} that hands out one mocked bookkeeper instead of connecting to
+ * a real one.
+ *
+ * <p>Both {@code create} overloads ignore all of their arguments and return the same instance,
+ * which is built once per factory by {@link NonClosableMockBookKeeper#create(OrderedExecutor)}.
+ */
+public class MockBookKeeperClientFactory implements BookKeeperClientFactory {
+
+    /** Single-threaded executor handed to the mocked bookkeeper. */
+    private final OrderedExecutor executor =
+            OrderedExecutor.newBuilder().numThreads(1).name("mock-pulsar-bookkeeper").build();
+
+    /** The one instance returned by every {@code create} call. */
+    private final BookKeeper bookKeeper = NonClosableMockBookKeeper.create(executor);
+
+    /**
+     * Returns the shared mocked bookkeeper.
+     *
+     * @param conf ignored
+     * @param zkClient ignored
+     * @param eventLoopGroup ignored
+     * @param ensemblePlacementPolicyClass ignored
+     * @param ensemblePlacementPolicyProperties ignored
+     * @return the bookkeeper owned by this factory
+     */
+    @Override
+    public BookKeeper create(
+            ServiceConfiguration conf,
+            ZooKeeper zkClient,
+            EventLoopGroup eventLoopGroup,
+            Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
+            Map<String, Object> ensemblePlacementPolicyProperties)
+            throws IOException {
+        return bookKeeper;
+    }
+
+    /**
+     * Returns the shared mocked bookkeeper.
+     *
+     * @param conf ignored
+     * @param zkClient ignored
+     * @param eventLoopGroup ignored
+     * @param ensemblePlacementPolicyClass ignored
+     * @param ensemblePlacementPolicyProperties ignored
+     * @param statsLogger ignored
+     * @return the bookkeeper owned by this factory
+     */
+    @Override
+    public BookKeeper create(
+            ServiceConfiguration conf,
+            ZooKeeper zkClient,
+            EventLoopGroup eventLoopGroup,
+            Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
+            Map<String, Object> ensemblePlacementPolicyProperties,
+            StatsLogger statsLogger)
+            throws IOException {
+        return bookKeeper;
+    }
+
+    /**
+     * Closes the shared bookkeeper and shuts down its executor.
+     *
+     * <p>The executor is shut down in a {@code finally} block so that its thread is released even
+     * when closing the bookkeeper fails.
+     *
+     * @throws RuntimeException wrapping any exception thrown while closing the bookkeeper
+     */
+    @Override
+    public void close() {
+        try {
+            bookKeeper.close();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        } finally {
+            executor.shutdown();
+        }
+    }
+}
diff --git
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/MockPulsarService.java
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/MockPulsarService.java
new file mode 100644
index 0000000..6b6c412
--- /dev/null
+++
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/MockPulsarService.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.testutils.runtime.mock;
+
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.pulsar.broker.BookKeeperClientFactory;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.intercept.BrokerInterceptor;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
+import org.apache.zookeeper.MockZooKeeperSession;
+
+import java.util.function.Supplier;
+
+/**
+ * A {@link PulsarService} that is wired to a mocked zookeeper and a mocked bookkeeper instead of
+ * real ones.
+ */
+public class MockPulsarService extends PulsarService {
+
+    private final int brokerServicePort;
+    private final MockZooKeeperClientFactory zooKeeperClientFactory;
+    private final MockZooKeeperSession zooKeeperSession;
+    private final SameThreadOrderedSafeExecutor orderedExecutor;
+
+    /**
+     * Creates the service together with its mocked zookeeper, zookeeper session and executor.
+     *
+     * @param config broker configuration; its broker service port must be present
+     * @throws IllegalArgumentException if the configuration carries no broker service port
+     */
+    public MockPulsarService(ServiceConfiguration config) {
+        super(config);
+        this.zooKeeperClientFactory = new MockZooKeeperClientFactory();
+        this.zooKeeperSession =
+                MockZooKeeperSession.newInstance(zooKeeperClientFactory.getZooKeeper());
+        this.orderedExecutor = new SameThreadOrderedSafeExecutor();
+        this.brokerServicePort =
+                config.getBrokerServicePort().orElseThrow(IllegalArgumentException::new);
+    }
+
+    /** Returns the mocked zookeeper client factory owned by this service. */
+    public ZooKeeperClientFactory getZooKeeperClientFactory() {
+        return zooKeeperClientFactory;
+    }
+
+    /** Returns a fresh {@link MockBookKeeperClientFactory} on every call. */
+    public BookKeeperClientFactory newBookKeeperClientFactory() {
+        return new MockBookKeeperClientFactory();
+    }
+
+    /** Returns a new metadata store backed by the mocked zookeeper session. */
+    public MetadataStoreExtended createLocalMetadataStore() {
+        return new ZKMetadataStore(zooKeeperSession);
+    }
+
+    /**
+     * Returns a new metadata store backed by the same mocked zookeeper session that {@link
+     * #createLocalMetadataStore()} uses.
+     */
+    public MetadataStoreExtended createConfigurationMetadataStore() {
+        return new ZKMetadataStore(zooKeeperSession);
+    }
+
+    /** Returns a supplier that builds a new {@link NamespaceService} bound to this service. */
+    public Supplier<NamespaceService> getNamespaceServiceProvider() {
+        return () -> new NamespaceService(this);
+    }
+
+    /** Returns the executor that runs tasks directly on the calling thread. */
+    @Override
+    public OrderedExecutor getOrderedExecutor() {
+        return orderedExecutor;
+    }
+
+    /** Returns a new no-op interceptor on every call. */
+    @Override
+    public BrokerInterceptor getBrokerInterceptor() {
+        return new BlankBrokerInterceptor();
+    }
+
+    /** Returns the broker service port that was read from the configuration at construction. */
+    public int getBrokerServicePort() {
+        return brokerServicePort;
+    }
+}
diff --git
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/MockZooKeeperClientFactory.java
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/MockZooKeeperClientFactory.java
new file mode 100644
index 0000000..3c89484
--- /dev/null
+++
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/MockZooKeeperClientFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.testutils.runtime.mock;
+
+import
org.apache.flink.shaded.guava30.com.google.common.util.concurrent.MoreExecutors;
+
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.MockZooKeeper;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static
org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl.ENCODING_SCHEME;
+import static org.apache.zookeeper.CreateMode.PERSISTENT;
+
+/**
+ * A {@link ZooKeeperClientFactory} that always returns one mocked zookeeper instead of connecting
+ * to a real one.
+ */
+public class MockZooKeeperClientFactory implements ZooKeeperClientFactory {
+
+    private final MockZooKeeper zooKeeper;
+
+    /**
+     * Creates the mocked zookeeper on a direct executor service and pre-populates two nodes under
+     * {@code /ledgers}.
+     *
+     * @throws IllegalStateException if a node cannot be created or the thread is interrupted; in
+     *     the latter case the thread's interrupt flag is set again before throwing
+     */
+    public MockZooKeeperClientFactory() {
+        this.zooKeeper = MockZooKeeper.newInstance(MoreExecutors.newDirectExecutorService());
+        List<ACL> dummyAclList = new ArrayList<>(0);
+
+        try {
+            // NOTE(review): presumably the entry a bookkeeper client reads as an available
+            // bookie - confirm.
+            ZkUtils.createFullPathOptimistic(
+                    zooKeeper,
+                    "/ledgers/available/192.168.1.1:" + 5000,
+                    "".getBytes(ENCODING_SCHEME),
+                    dummyAclList,
+                    PERSISTENT);
+
+            // NOTE(review): presumably the ledger layout marker bookkeeper expects - confirm.
+            zooKeeper.create(
+                    "/ledgers/LAYOUT",
+                    "1\nflat:1".getBytes(ENCODING_SCHEME),
+                    dummyAclList,
+                    PERSISTENT);
+        } catch (InterruptedException e) {
+            // Restore the flag so that code further up the stack still sees the interruption.
+            Thread.currentThread().interrupt();
+            throw new IllegalStateException(e);
+        } catch (KeeperException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    /**
+     * Returns the shared mocked zookeeper as an already completed future.
+     *
+     * @param serverList ignored
+     * @param sessionType ignored
+     * @param zkSessionTimeoutMillis ignored
+     * @return a completed future holding the mocked zookeeper
+     */
+    @Override
+    public CompletableFuture<ZooKeeper> create(
+            String serverList, SessionType sessionType, int zkSessionTimeoutMillis) {
+        return CompletableFuture.completedFuture(zooKeeper);
+    }
+
+    /** Returns the mocked zookeeper owned by this factory. */
+    MockZooKeeper getZooKeeper() {
+        return zooKeeper;
+    }
+}
diff --git
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/NonClosableMockBookKeeper.java
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/NonClosableMockBookKeeper.java
new file mode 100644
index 0000000..b7001b8
--- /dev/null
+++
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/NonClosableMockBookKeeper.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.testutils.runtime.mock;
+
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.PulsarMockBookKeeper;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+
+/**
+ * A {@link PulsarMockBookKeeper} whose {@link #close()} and {@link #shutdown()} do nothing, so
+ * that the instance is not closed when the broker is restarted within a test. Use {@link
+ * #reallyShutdown()} to actually shut it down.
+ */
+public class NonClosableMockBookKeeper extends PulsarMockBookKeeper {
+
+    /**
+     * Builds a new instance and converts any construction failure into an unchecked exception.
+     *
+     * @param executor executor passed on to {@link PulsarMockBookKeeper}
+     * @return the new instance, typed as {@link BookKeeper}
+     * @throws RuntimeException wrapping whatever the constructor threw
+     */
+    public static BookKeeper create(OrderedExecutor executor) {
+        final BookKeeper instance;
+        try {
+            instance = new NonClosableMockBookKeeper(executor);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        return instance;
+    }
+
+    private NonClosableMockBookKeeper(OrderedExecutor executor) throws Exception {
+        super(executor);
+    }
+
+    /** Does nothing, so the instance survives a close request. */
+    @Override
+    public void close() {
+        // Intentionally empty.
+    }
+
+    /** Does nothing, so the instance survives a shutdown request. */
+    @Override
+    public void shutdown() {
+        // Intentionally empty.
+    }
+
+    /** Performs the shutdown of the parent class that {@link #shutdown()} suppresses. */
+    public void reallyShutdown() {
+        super.shutdown();
+    }
+}
diff --git
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java
new file mode 100644
index 0000000..e6ff060
--- /dev/null
+++
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.testutils.runtime.mock;
+
+import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime;
+import
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
+
+import org.apache.pulsar.broker.ServiceConfiguration;
+
+import java.util.Optional;
+
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
+import static
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeUtils.initializePulsarEnvironment;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link PulsarRuntime} backed by a {@link MockPulsarService}.
+ *
+ * <p>Call {@link #startUp()} before {@link #operator()}; the operator only exists between a
+ * successful {@link #startUp()} and the following {@link #tearDown()}.
+ */
+public class PulsarMockRuntime implements PulsarRuntime {
+
+    /** Cluster name with a random suffix, generated once per class load. */
+    private static final String CLUSTER_NAME = "mock-pulsar-" + randomAlphanumeric(6);
+
+    private final ServiceConfiguration configuration;
+    private final MockPulsarService pulsarService;
+    private PulsarRuntimeOperator operator;
+
+    /** Creates a runtime using the default configuration built by this class. */
+    public PulsarMockRuntime() {
+        this(createConfig());
+    }
+
+    /**
+     * Creates a runtime using the given broker configuration.
+     *
+     * @param configuration configuration passed to the mocked pulsar service
+     */
+    public PulsarMockRuntime(ServiceConfiguration configuration) {
+        this.configuration = configuration;
+        this.pulsarService = new MockPulsarService(configuration);
+    }
+
+    /**
+     * Starts the mocked service, initializes the pulsar environment against its service and admin
+     * URLs, and creates the operator for those URLs.
+     *
+     * @throws IllegalStateException wrapping any failure during start-up
+     */
+    @Override
+    public void startUp() {
+        try {
+            pulsarService.start();
+
+            String serviceUrl = pulsarService.getBrokerServiceUrl();
+            String adminUrl = pulsarService.getWebServiceAddress();
+            initializePulsarEnvironment(configuration, serviceUrl, adminUrl);
+
+            this.operator = new PulsarRuntimeOperator(serviceUrl, adminUrl);
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    /**
+     * Closes the operator, if one was created, and then the mocked service.
+     *
+     * <p>The method is safe to call when {@link #startUp()} never completed. The service is closed
+     * even if closing the operator fails.
+     *
+     * @throws IllegalStateException wrapping any failure while closing
+     */
+    @Override
+    public void tearDown() {
+        try {
+            try {
+                // Close the operator first, while the service it was pointed at is still running.
+                if (operator != null) {
+                    operator.close();
+                }
+            } finally {
+                this.operator = null;
+                pulsarService.close();
+            }
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    /**
+     * Returns the operator created by {@link #startUp()}.
+     *
+     * @throws NullPointerException if the runtime has not been started
+     */
+    @Override
+    public PulsarRuntimeOperator operator() {
+        return checkNotNull(operator, "You should start this mock pulsar first.");
+    }
+
+    /** Builds the default broker configuration used by the no-argument constructor. */
+    private static ServiceConfiguration createConfig() {
+        ServiceConfiguration configuration = new ServiceConfiguration();
+
+        configuration.setAdvertisedAddress("localhost");
+        configuration.setClusterName(CLUSTER_NAME);
+
+        configuration.setManagedLedgerCacheSizeMB(8);
+        configuration.setActiveConsumerFailoverDelayTimeMillis(0);
+        configuration.setDefaultRetentionTimeInMinutes(7);
+        configuration.setDefaultNumberOfNamespaceBundles(1);
+        // NOTE(review): presumably placeholders, since the service uses a mocked zookeeper -
+        // confirm.
+        configuration.setZookeeperServers("localhost:2181");
+        configuration.setConfigurationStoreServers("localhost:3181");
+
+        configuration.setAuthenticationEnabled(false);
+        configuration.setAuthorizationEnabled(false);
+        configuration.setAllowAutoTopicCreation(true);
+        configuration.setBrokerDeleteInactiveTopicsEnabled(false);
+
+        configuration.setWebSocketServiceEnabled(false);
+        // Use runtime dynamic ports
+        configuration.setBrokerServicePort(Optional.of(0));
+        configuration.setWebServicePort(Optional.of(0));
+
+        // Enable transactions.
+        configuration.setTransactionCoordinatorEnabled(true);
+        configuration.setTransactionMetadataStoreProviderClassName(
+                "org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider");
+
+        return configuration;
+    }
+}
diff --git
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/SameThreadOrderedSafeExecutor.java
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/SameThreadOrderedSafeExecutor.java
new file mode 100644
index 0000000..9667f08
--- /dev/null
+++
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/SameThreadOrderedSafeExecutor.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.testutils.runtime.mock;
+
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.common.util.SafeRunnable;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.pulsar.shade.io.netty.util.concurrent.DefaultThreadFactory;
+
+/**
+ * An {@link OrderedExecutor} whose {@code execute} and {@code executeOrdered} methods run the
+ * given task immediately on the calling thread instead of handing it to another thread.
+ */
+public class SameThreadOrderedSafeExecutor extends OrderedExecutor {
+
+    /**
+     * Creates the executor with fixed settings.
+     *
+     * <p>NOTE(review): the meaning of the positional boolean and numeric arguments is defined by
+     * the {@link OrderedExecutor} constructor, which is not visible here - confirm before changing
+     * any of them.
+     */
+    public SameThreadOrderedSafeExecutor() {
+        super(
+                "same-thread-executor",
+                1,
+                new DefaultThreadFactory("test"),
+                NullStatsLogger.INSTANCE,
+                false,
+                false,
+                100000,
+                -1,
+                false);
+    }
+
+    /** Runs the task right away on the calling thread; the ordering key is not used. */
+    @Override
+    public void executeOrdered(long orderingKey, SafeRunnable r) {
+        r.run();
+    }
+
+    /** Runs the task right away on the calling thread; the ordering key is not used. */
+    @Override
+    public void executeOrdered(int orderingKey, SafeRunnable r) {
+        r.run();
+    }
+
+    /** Runs the task right away on the calling thread. */
+    @Override
+    public void execute(Runnable r) {
+        r.run();
+    }
+}
diff --git
a/flink-connectors/flink-connector-pulsar/src/test/resources/containers/txnStandalone.conf
b/flink-connectors/flink-connector-pulsar/src/test/resources/containers/txnStandalone.conf
index bf35c59..10437a717 100644
---
a/flink-connectors/flink-connector-pulsar/src/test/resources/containers/txnStandalone.conf
+++
b/flink-connectors/flink-connector-pulsar/src/test/resources/containers/txnStandalone.conf
@@ -1015,7 +1015,7 @@ defaultNumPartitions=1
### --- Transaction config variables --- ###
# Enable transaction coordinator in broker
transactionCoordinatorEnabled=true
-transactionMetadataStoreProviderClassName=org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider
+transactionMetadataStoreProviderClassName=org.apache.pulsar.transaction.coordinator.impl.InMemTransactionMetadataStoreProvider
# Transaction buffer take snapshot transaction count
transactionBufferSnapshotMaxTransactionCount=1000