This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 19e6546  Fix flaky MessageIdTest and introduce some testing 
improvements (#9286)
19e6546 is described below

commit 19e6546a3d86169b75b122b2a4b4d1884133a4f3
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Jan 27 01:38:54 2021 +0200

    Fix flaky MessageIdTest and introduce some testing improvements (#9286)
    
    * Refactor PulsarClient initialization and lifecycle management in tests
    
    * Add getter and setter to access remoteEndpointProtocolVersion field
    
    - it makes it easier to override for tests
    
    * Add hooks for overriding the producer implementation in PulsarClientImpl
    
    - useful for tests. Instead of relying on Mockito, there's a pure Java
      way to inject behavior to producer implementations for testing purposes
    
    * Introduce PulsarTestClient that contains ways to prevent race conditions 
and test flakiness
    
    - provides features for simulating failure conditions, for example
      the case of the broker connection disconnecting
    
    * Add solution for using Enums classes as source for TestNG DataProvider
    
    * Fix flaky MessageIdTest and move checksum related tests to new class
    
    * Fix NPE in PartitionedProducerImplTest
---
 .../pulsar/tests/EnumValuesDataProvider.java       |  52 +++
 .../pulsar/tests/EnumValuesDataProviderTest.java   |  69 +++
 .../authentication/SaslAuthenticateTest.java       |   4 +-
 .../apache/pulsar/broker/service/ServerCnx.java    |  11 +-
 .../broker/auth/MockedPulsarServiceBaseTest.java   |  77 ++--
 .../api/AuthenticatedProducerConsumerTest.java     |   9 +-
 .../AuthenticationTlsHostnameVerificationTest.java |   5 +-
 .../api/AuthorizationProducerConsumerTest.java     |  12 +-
 .../pulsar/client/api/BrokerServiceLookupTest.java |  11 +-
 .../client/api/MutualAuthenticationTest.java       |  16 +-
 ...MultiListenersWithInternalListenerNameTest.java |   5 +-
 ...tiListenersWithoutInternalListenerNameTest.java |   5 +-
 .../pulsar/client/api/TlsProducerConsumerBase.java |   3 +-
 .../TokenAuthenticatedProducerConsumerTest.java    |   5 +-
 ...kenOauth2AuthenticatedProducerConsumerTest.java |   5 +-
 .../KeyStoreTlsProducerConsumerTestWithAuth.java   |   2 +-
 ...KeyStoreTlsProducerConsumerTestWithoutAuth.java |   2 +-
 .../pulsar/client/impl/MessageChecksumTest.java    | 249 +++++++++++
 .../apache/pulsar/client/impl/MessageIdTest.java   | 485 +++------------------
 .../pulsar/client/impl/PulsarTestClient.java       | 218 +++++++++
 .../org/apache/pulsar/client/impl/ClientCnx.java   |   2 +-
 .../client/impl/PartitionedProducerImpl.java       |   5 +-
 .../apache/pulsar/client/impl/ProducerImpl.java    |  28 +-
 .../pulsar/client/impl/PulsarClientImpl.java       |  57 ++-
 .../client/impl/PartitionedProducerImplTest.java   |  31 +-
 .../pulsar/common/protocol/PulsarHandler.java      |   8 +-
 .../pulsar/discovery/service/ServerConnection.java |   2 +-
 .../pulsar/proxy/server/ProxyConnection.java       |   6 +-
 28 files changed, 845 insertions(+), 539 deletions(-)

diff --git 
a/buildtools/src/main/java/org/apache/pulsar/tests/EnumValuesDataProvider.java 
b/buildtools/src/main/java/org/apache/pulsar/tests/EnumValuesDataProvider.java
new file mode 100644
index 0000000..3c43bae
--- /dev/null
+++ 
b/buildtools/src/main/java/org/apache/pulsar/tests/EnumValuesDataProvider.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.testng.annotations.DataProvider;
+
+/**
+ * TestNG DataProvider for passing all Enum values as parameters to a test 
method.
+ *
+ * Supports currently a single Enum parameter for a test method.
+ */
+public abstract class EnumValuesDataProvider {
+    @DataProvider
+    public static final Object[][] values(Method testMethod) {
+        Class<?> enumClass = Arrays.stream(testMethod.getParameterTypes())
+                .findFirst()
+                .filter(Class::isEnum)
+                .orElseThrow(() -> new IllegalArgumentException("The test 
method should have an enum parameter."));
+        return toDataProviderArray((Class<? extends Enum<?>>) enumClass);
+    }
+
+    /*
+     * Converts all values of an Enum class to a TestNG DataProvider object 
array
+     */
+    public static Object[][] toDataProviderArray(Class<? extends Enum<?>> 
enumClass) {
+        Enum<?>[] enumValues = enumClass.getEnumConstants();
+        return Stream.of(enumValues)
+                .map(enumValue -> new Object[]{enumValue})
+                .collect(Collectors.toList())
+                .toArray(new Object[0][]);
+    }
+}
diff --git 
a/buildtools/src/test/java/org/apache/pulsar/tests/EnumValuesDataProviderTest.java
 
b/buildtools/src/test/java/org/apache/pulsar/tests/EnumValuesDataProviderTest.java
new file mode 100644
index 0000000..9eb24cc
--- /dev/null
+++ 
b/buildtools/src/test/java/org/apache/pulsar/tests/EnumValuesDataProviderTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests;
+
+import static org.testng.Assert.assertEquals;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.testng.annotations.Test;
+
+public class EnumValuesDataProviderTest {
+    enum Sample {
+        A, B, C
+    }
+
+    @Test(dataProviderClass = EnumValuesDataProvider.class, dataProvider = 
"values")
+    void testEnumValuesProvider(Sample sample) {
+        System.out.println(sample);
+    }
+
+    @Test
+    void shouldContainAllEnumValues() {
+        
verifyTestParameters(EnumValuesDataProvider.toDataProviderArray(Sample.class));
+    }
+
+    @Test
+    void shouldDetermineEnumValuesFromMethod() {
+        Method testMethod = Arrays.stream(getClass().getDeclaredMethods())
+                .filter(method -> 
method.getName().equals("testEnumValuesProvider"))
+                .findFirst()
+                .get();
+        verifyTestParameters(EnumValuesDataProvider.values(testMethod));
+    }
+
+    private void verifyTestParameters(Object[][] testParameters) {
+        Set<Sample> enumValuesFromDataProvider = Arrays.stream(testParameters)
+                .map(element -> element[0])
+                .map(Sample.class::cast)
+                .collect(Collectors.toSet());
+        assertEquals(enumValuesFromDataProvider, new 
HashSet<>(Arrays.asList(Sample.values())));
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    void shouldFailIfEnumParameterIsMissing() {
+        Method testMethod = Arrays.stream(getClass().getDeclaredMethods())
+                .filter(method -> 
method.getName().equals("shouldFailIfEnumParameterIsMissing"))
+                .findFirst()
+                .get();
+        EnumValuesDataProvider.values(testMethod);
+    }
+}
\ No newline at end of file
diff --git 
a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
 
b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
index bb20077..6fa4b50 100644
--- 
a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
+++ 
b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
@@ -181,10 +181,10 @@ public class SaslAuthenticateTest extends 
ProducerConsumerBase {
 
         lookupUrl = new URI(pulsar.getWebServiceAddress());
 
-        pulsarClient = PulsarClient.builder()
+        replacePulsarClient(PulsarClient.builder()
             .serviceUrl(lookupUrl.toString())
             .statsInterval(0, TimeUnit.SECONDS)
-            .authentication(authSasl).build();
+            .authentication(authSasl));
 
         // set admin auth, to verify admin web resources
         Map<String, String> clientSaslConfig = Maps.newHashMap();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 09980fd..55d6a01 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -552,7 +552,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
     private void completeConnect(int clientProtoVersion, String clientVersion) 
{
         ctx.writeAndFlush(Commands.newConnected(clientProtoVersion, 
maxMessageSize));
         state = State.Connected;
-        remoteEndpointProtocolVersion = clientProtoVersion;
+        setRemoteEndpointProtocolVersion(clientProtoVersion);
         if (isNotBlank(clientVersion) && !clientVersion.contains(" ") /* 
ignore default version: pulsar client */) {
             this.clientVersion = clientVersion.intern();
         }
@@ -662,7 +662,8 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             try {
                 AuthData brokerData = authState.refreshAuthentication();
 
-                ctx.writeAndFlush(Commands.newAuthChallenge(authMethod, 
brokerData, remoteEndpointProtocolVersion));
+                ctx.writeAndFlush(Commands.newAuthChallenge(authMethod, 
brokerData,
+                        getRemoteEndpointProtocolVersion()));
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Sent auth challenge to client to refresh 
credentials with method: {}.",
                         remoteAddress, authMethod);
@@ -1958,7 +1959,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
     public void closeProducer(Producer producer) {
         // removes producer-connection from map and send close command to 
producer
         safelyRemoveProducer(producer);
-        if (remoteEndpointProtocolVersion >= v5.getValue()) {
+        if (getRemoteEndpointProtocolVersion() >= v5.getValue()) {
             
ctx.writeAndFlush(Commands.newCloseProducer(producer.getProducerId(), -1L));
         } else {
             close();
@@ -1970,7 +1971,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
     public void closeConsumer(Consumer consumer) {
         // removes consumer-connection from map and send close command to 
consumer
         safelyRemoveConsumer(consumer);
-        if (remoteEndpointProtocolVersion >= v5.getValue()) {
+        if (getRemoteEndpointProtocolVersion() >= v5.getValue()) {
             ctx.writeAndFlush(Commands.newCloseConsumer(consumer.consumerId(), 
-1L));
         } else {
             close();
@@ -2224,7 +2225,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
     @Override
     public boolean isBatchMessageCompatibleVersion() {
-        return remoteEndpointProtocolVersion >= ProtocolVersion.v4.getValue();
+        return getRemoteEndpointProtocolVersion() >= 
ProtocolVersion.v4.getValue();
     }
 
     boolean supportsAuthenticationRefresh() {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 47c8fe3..4ad5145 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -21,11 +21,9 @@ package org.apache.pulsar.broker.auth;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
-
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 import java.lang.reflect.Field;
 import java.net.URI;
 import java.net.URL;
@@ -40,7 +38,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
-
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
 import org.apache.bookkeeper.client.PulsarMockBookKeeper;
@@ -53,6 +50,7 @@ import 
org.apache.pulsar.broker.intercept.CounterBrokerInterceptor;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -69,7 +67,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Base class for all tests that need a Pulsar instance without a ZK and BK 
cluster
+ * Base class for all tests that need a Pulsar instance without a ZK and BK 
cluster.
  */
 @PowerMockIgnore(value = {"org.slf4j.*", "com.sun.org.apache.xerces.*" })
 public abstract class MockedPulsarServiceBaseTest {
@@ -117,17 +115,29 @@ public abstract class MockedPulsarServiceBaseTest {
         internalSetup();
     }
 
-    protected final void internalSetup(boolean isPreciseDispatcherFlowControl) 
throws Exception {
-        init(isPreciseDispatcherFlowControl);
-        lookupUrl = new URI(brokerUrl.toString());
-        if (isTcpLookup) {
-            lookupUrl = new URI(pulsar.getBrokerServiceUrl());
-        }
-        pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
+    protected PulsarClient newPulsarClient(String url, int intervalInSecs) 
throws PulsarClientException {
+        ClientBuilder clientBuilder =
+                PulsarClient.builder()
+                        .serviceUrl(url)
+                        .statsInterval(intervalInSecs, TimeUnit.SECONDS);
+        customizeNewPulsarClientBuilder(clientBuilder);
+        return createNewPulsarClient(clientBuilder);
     }
 
-    protected PulsarClient newPulsarClient(String url, int intervalInSecs) 
throws PulsarClientException {
-        return 
PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs, 
TimeUnit.SECONDS).build();
+    protected void customizeNewPulsarClientBuilder(ClientBuilder 
clientBuilder) {
+
+    }
+
+    protected PulsarClient createNewPulsarClient(ClientBuilder clientBuilder) 
throws PulsarClientException {
+        return clientBuilder.build();
+    }
+
+    protected PulsarClient replacePulsarClient(ClientBuilder clientBuilder) 
throws PulsarClientException {
+        if (pulsarClient != null) {
+            pulsarClient.shutdown();
+        }
+        pulsarClient = createNewPulsarClient(clientBuilder);
+        return pulsarClient;
     }
 
     protected final void internalSetupForStatsTest() throws Exception {
@@ -163,27 +173,6 @@ public abstract class MockedPulsarServiceBaseTest {
         startBroker();
     }
 
-    protected final void init(boolean isPreciseDispatcherFlowControl) throws 
Exception {
-        this.conf.setBrokerServicePort(Optional.of(0));
-        this.conf.setBrokerServicePortTls(Optional.of(0));
-        this.conf.setAdvertisedAddress("localhost");
-        this.conf.setWebServicePort(Optional.of(0));
-        this.conf.setWebServicePortTls(Optional.of(0));
-        
this.conf.setPreciseDispatcherFlowControl(isPreciseDispatcherFlowControl);
-        this.conf.setNumExecutorThreadPoolSize(5);
-
-        sameThreadOrderedSafeExecutor = new SameThreadOrderedSafeExecutor();
-        bkExecutor = Executors.newSingleThreadExecutor(
-                new ThreadFactoryBuilder().setNameFormat("mock-pulsar-bk")
-                .setUncaughtExceptionHandler((thread, ex) -> 
log.info("Uncaught exception", ex))
-                .build());
-
-        mockZooKeeper = createMockZooKeeper();
-        mockBookKeeper = createMockBookKeeper(mockZooKeeper, bkExecutor);
-
-        startBroker();
-    }
-
     protected final void internalCleanup() throws Exception {
         // if init fails, some of these could be null, and if so would throw
         // an NPE in shutdown, obscuring the real error
@@ -196,7 +185,7 @@ public abstract class MockedPulsarServiceBaseTest {
             pulsarClient = null;
         }
         if (pulsar != null) {
-            pulsar.close();
+            stopBroker();
             pulsar = null;
         }
         if (mockBookKeeper != null) {
@@ -244,6 +233,8 @@ public abstract class MockedPulsarServiceBaseTest {
     }
 
     protected void stopBroker() throws Exception {
+        log.info("Stopping Pulsar broker. brokerServiceUrl: {} 
webServiceAddress: {}", pulsar.getBrokerServiceUrl(),
+                pulsar.getWebServiceAddress());
         pulsar.close();
         pulsar = null;
         // Simulate cleanup of ephemeral nodes
@@ -274,6 +265,8 @@ public abstract class MockedPulsarServiceBaseTest {
         conf.setAuthorizationEnabled(true);
         pulsar.start();
         conf.setAuthorizationEnabled(isAuthorizationEnabled);
+        log.info("Pulsar started. brokerServiceUrl: {} webServiceAddress: {}", 
pulsar.getBrokerServiceUrl(),
+                pulsar.getWebServiceAddress());
 
         return pulsar;
     }
@@ -363,9 +356,9 @@ public abstract class MockedPulsarServiceBaseTest {
         public CompletableFuture<ZooKeeper> create(String serverList, 
SessionType sessionType,
                 int zkSessionTimeoutMillis) {
 
-            if (serverList != null &&
-                    
(serverList.equalsIgnoreCase(conf.getConfigurationStoreServers())
-                            || 
serverList.equalsIgnoreCase(GLOBAL_DUMMY_VALUE))) {
+            if (serverList != null
+                    && 
(serverList.equalsIgnoreCase(conf.getConfigurationStoreServers())
+                    || serverList.equalsIgnoreCase(GLOBAL_DUMMY_VALUE))) {
                 return CompletableFuture.completedFuture(mockZooKeeperGlobal);
             }
 
@@ -408,7 +401,8 @@ public abstract class MockedPulsarServiceBaseTest {
         return false;
     }
 
-    public static void setFieldValue(Class<?> clazz, Object classObj, String 
fieldName, Object fieldValue) throws Exception {
+    public static void setFieldValue(Class<?> clazz, Object classObj, String 
fieldName,
+                                     Object fieldValue) throws Exception {
         Field field = clazz.getDeclaredField(fieldName);
         field.setAccessible(true);
         field.set(classObj, fieldValue);
@@ -418,7 +412,8 @@ public abstract class MockedPulsarServiceBaseTest {
         ServiceConfiguration configuration = new ServiceConfiguration();
         configuration.setAdvertisedAddress("localhost");
         configuration.setClusterName(configClusterName);
-        configuration.setAdvertisedAddress("localhost"); // there are TLS 
tests in here, they need to use localhost because of the certificate
+        // there are TLS tests in here, they need to use localhost because of 
the certificate
+        configuration.setAdvertisedAddress("localhost");
         configuration.setManagedLedgerCacheSizeMB(8);
         configuration.setActiveConsumerFailoverDelayTimeMillis(0);
         configuration.setDefaultNumberOfNamespaceBundles(1);
@@ -435,4 +430,4 @@ public abstract class MockedPulsarServiceBaseTest {
     }
 
     private static final Logger log = 
LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class);
-}
\ No newline at end of file
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
index 7f3e7b3..1f85c99 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
@@ -111,9 +111,9 @@ public class AuthenticatedProducerConsumerTest extends 
ProducerConsumerBase {
         } else {
             lookupUrl = pulsar.getBrokerServiceUrlTls();
         }
-        pulsarClient = 
PulsarClient.builder().serviceUrl(lookupUrl).statsInterval(0, TimeUnit.SECONDS)
+        
replacePulsarClient(PulsarClient.builder().serviceUrl(lookupUrl).statsInterval(0,
 TimeUnit.SECONDS)
                 
.tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).allowTlsInsecureConnection(true).authentication(auth)
-                .enableTls(true).build();
+                .enableTls(true));
     }
 
     @AfterMethod(alwaysRun = true)
@@ -241,9 +241,8 @@ public class AuthenticatedProducerConsumerTest extends 
ProducerConsumerBase {
                 EnumSet.allOf(AuthAction.class));
 
         // setup the client
-        pulsarClient.close();
-        pulsarClient = 
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl())
-                .operationTimeout(1, TimeUnit.SECONDS).build();
+        
replacePulsarClient(PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl())
+                .operationTimeout(1, TimeUnit.SECONDS));
 
         // unauthorized topic test
         Exception pulsarClientException = null;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
index cf04a4f..a0393bd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
@@ -109,12 +109,11 @@ public class AuthenticationTlsHostnameVerificationTest 
extends ProducerConsumerB
         admin = 
spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrlTls.toString())
                 
.tlsTrustCertsFilePath(TLS_MIM_TRUST_CERT_FILE_PATH).allowTlsInsecureConnection(true)
                 .authentication(authTls).build());
-        pulsarClient = PulsarClient.builder()
+        replacePulsarClient(PulsarClient.builder()
                 .serviceUrl(pulsar.getBrokerServiceUrlTls())
                 .statsInterval(0, TimeUnit.SECONDS)
                 
.tlsTrustCertsFilePath(TLS_MIM_TRUST_CERT_FILE_PATH).allowTlsInsecureConnection(true)
-                
.authentication(authTls).enableTls(true).enableTlsHostnameVerification(hostnameVerificationEnabled)
-                .build();
+                
.authentication(authTls).enableTls(true).enableTlsHostnameVerification(hostnameVerificationEnabled));
 
         admin.clusters().createCluster("test", new 
ClusterData(brokerUrl.toString()));
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index 933ad88..7c06837 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -197,10 +197,10 @@ public class AuthorizationProducerConsumerTest extends 
ProducerConsumerBase {
         tenantAdmin.namespaces().grantPermissionOnNamespace(namespace, 
subscriptionRole,
                 Collections.singleton(AuthAction.consume));
 
-        pulsarClient = PulsarClient.builder()
+        replacePulsarClient(PulsarClient.builder()
                 .serviceUrl(pulsar.getBrokerServiceUrl())
-                .authentication(authentication)
-                .build();
+                .authentication(authentication));
+
         // (1) Create subscription name
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
                 .subscribe();
@@ -271,10 +271,10 @@ public class AuthorizationProducerConsumerTest extends 
ProducerConsumerBase {
 
         Authentication authentication = new ClientAuthentication(clientRole);
 
-        pulsarClient = PulsarClient.builder()
+        replacePulsarClient(PulsarClient.builder()
                 .serviceUrl(pulsar.getBrokerServiceUrl())
-                .authentication(authentication)
-                .build();
+                .authentication(authentication));
+
 
         admin.clusters().createCluster("test", new 
ClusterData(brokerUrl.toString()));
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index dfa1c42..e43b167 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -112,18 +112,15 @@ public class BrokerServiceLookupTest extends 
ProducerConsumerBase {
     @Override
     protected void setup() throws Exception {
         conf.setDefaultNumberOfNamespaceBundles(1);
-        super.init();
-        pulsarClient = PulsarClient.builder()
-                .serviceUrl(pulsar.getBrokerServiceUrl())
-                .statsInterval(0, TimeUnit.SECONDS)
-                .build();
-        super.producerBaseSetup();
+        isTcpLookup = true;
+        internalSetup();
+        producerBaseSetup();
     }
 
     @AfterMethod(alwaysRun = true)
     @Override
     protected void cleanup() throws Exception {
-        super.internalCleanup();
+        internalCleanup();
     }
 
     /**
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
index 458c4ae..2c01d29 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
@@ -194,18 +194,20 @@ public class MutualAuthenticationTest extends 
ProducerConsumerBase {
         Set<String> providersClassNames = 
Sets.newHashSet(MutualAuthenticationProvider.class.getName());
         conf.setAuthenticationProviders(providersClassNames);
 
-        super.init();
-        pulsarClient = PulsarClient.builder()
-                .serviceUrl(pulsar.getBrokerServiceUrl())
-                .authentication(mutualAuth)
-                .build();
-        super.producerBaseSetup();
+        isTcpLookup = true;
+        internalSetup();
+        producerBaseSetup();
+    }
+
+    @Override
+    protected void customizeNewPulsarClientBuilder(ClientBuilder 
clientBuilder) {
+        clientBuilder.authentication(mutualAuth);
     }
 
     @AfterMethod(alwaysRun = true)
     @Override
     protected void cleanup() throws Exception {
-        super.internalCleanup();
+        internalCleanup();
     }
 
     @Test
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java
index bd8adcd..dccd67e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java
@@ -64,8 +64,9 @@ public class PulsarMultiListenersWithInternalListenerNameTest 
extends MockedPuls
         this.conf.setInternalListenerName("internal");
     }
 
-    protected PulsarClient newPulsarClient(String url, int intervalInSecs) 
throws PulsarClientException {
-        return 
PulsarClient.builder().serviceUrl(url).listenerName("internal").statsInterval(intervalInSecs,
 TimeUnit.SECONDS).build();
+    @Override
+    protected void customizeNewPulsarClientBuilder(ClientBuilder 
clientBuilder) {
+        clientBuilder.listenerName("internal");
     }
 
     @Test
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithoutInternalListenerNameTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithoutInternalListenerNameTest.java
index f5ecc29..baaa4b7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithoutInternalListenerNameTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithoutInternalListenerNameTest.java
@@ -63,8 +63,9 @@ public class 
PulsarMultiListenersWithoutInternalListenerNameTest extends MockedP
         
this.conf.setAdvertisedListeners(String.format("internal:pulsar://%s:6650,internal:pulsar+ssl://%s:6651",
 host, host));
     }
 
-    protected PulsarClient newPulsarClient(String url, int intervalInSecs) 
throws PulsarClientException {
-        return 
PulsarClient.builder().serviceUrl(url).listenerName("internal").statsInterval(intervalInSecs,
 TimeUnit.SECONDS).build();
+    @Override
+    protected void customizeNewPulsarClientBuilder(ClientBuilder 
clientBuilder) {
+        clientBuilder.listenerName("internal");
     }
 
     @Test
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java
index b91a5f0..9ec369a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java
@@ -77,7 +77,6 @@ public class TlsProducerConsumerBase extends 
ProducerConsumerBase {
         if (pulsarClient != null) {
             pulsarClient.close();
         }
-
         ClientBuilder clientBuilder = 
PulsarClient.builder().serviceUrl(lookupUrl)
                 
.tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).enableTls(true).allowTlsInsecureConnection(false)
                 .operationTimeout(1000, TimeUnit.MILLISECONDS);
@@ -87,7 +86,7 @@ public class TlsProducerConsumerBase extends 
ProducerConsumerBase {
             authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
             clientBuilder.authentication(AuthenticationTls.class.getName(), 
authParams);
         }
-        pulsarClient = clientBuilder.build();
+        replacePulsarClient(clientBuilder);
     }
 
     protected void internalSetUpForNamespace() throws Exception {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
index 49b3c7c..c94b3e4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
@@ -80,10 +80,9 @@ public class TokenAuthenticatedProducerConsumerTest extends 
ProducerConsumerBase
                 .authentication(AuthenticationFactory.token(ADMIN_TOKEN))
                 .build());
 
-        pulsarClient = PulsarClient.builder().serviceUrl(new 
URI(pulsar.getBrokerServiceUrl()).toString())
+        replacePulsarClient(PulsarClient.builder().serviceUrl(new 
URI(pulsar.getBrokerServiceUrl()).toString())
                 .statsInterval(0, TimeUnit.SECONDS)
-                .authentication(AuthenticationFactory.token(ADMIN_TOKEN))
-                .build();
+                .authentication(AuthenticationFactory.token(ADMIN_TOKEN)));
     }
 
     @AfterMethod(alwaysRun = true)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
index b4e7bb0..d0b6349 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
@@ -102,10 +102,9 @@ public class TokenOauth2AuthenticatedProducerConsumerTest 
extends ProducerConsum
                 .authentication(authentication)
                 .build());
 
-        pulsarClient = PulsarClient.builder().serviceUrl(new 
URI(pulsar.getBrokerServiceUrl()).toString())
+        replacePulsarClient(PulsarClient.builder().serviceUrl(new 
URI(pulsar.getBrokerServiceUrl()).toString())
                 .statsInterval(0, TimeUnit.SECONDS)
-                .authentication(authentication)
-                .build();
+                .authentication(authentication));
     }
 
     @AfterMethod(alwaysRun = true)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithAuth.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithAuth.java
index 50bb2e9..0abebcc 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithAuth.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithAuth.java
@@ -134,7 +134,7 @@ public class KeyStoreTlsProducerConsumerTestWithAuth 
extends ProducerConsumerBas
             authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PW, 
CLIENT_KEYSTORE_PW);
             
clientBuilder.authentication(AuthenticationKeyStoreTls.class.getName(), 
authParams);
         }
-        pulsarClient = clientBuilder.build();
+        replacePulsarClient(clientBuilder);
     }
 
     protected void internalSetUpForNamespace() throws Exception {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithoutAuth.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithoutAuth.java
index 70a201c..f288e90 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithoutAuth.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithoutAuth.java
@@ -121,7 +121,7 @@ public class KeyStoreTlsProducerConsumerTestWithoutAuth 
extends ProducerConsumer
             authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PW, 
CLIENT_KEYSTORE_PW);
             
clientBuilder.authentication(AuthenticationKeyStoreTls.class.getName(), 
authParams);
         }
-        pulsarClient = clientBuilder.build();
+        replacePulsarClient(clientBuilder);
     }
 
     protected void internalSetUpForNamespace() throws Exception {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChecksumTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChecksumTest.java
new file mode 100644
index 0000000..f57fe6e
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChecksumTest.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.api.proto.ProtocolVersion;
+import org.apache.pulsar.common.protocol.ByteBufPair;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.Commands.ChecksumType;
+import org.apache.pulsar.tests.EnumValuesDataProvider;
+import org.awaitility.Awaitility;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class MessageChecksumTest extends BrokerTestBase {
+    private static final Logger log = 
LoggerFactory.getLogger(MessageChecksumTest.class);
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        baseSetup();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        internalCleanup();
+    }
+
+    @Override
+    protected void customizeNewPulsarClientBuilder(ClientBuilder 
clientBuilder) {
+        // disable connection pooling
+        clientBuilder.connectionsPerBroker(0);
+    }
+
+    @Override
+    protected PulsarClient createNewPulsarClient(ClientBuilder clientBuilder) 
throws PulsarClientException {
+        return PulsarTestClient.create(clientBuilder);
+    }
+
+    // Enum parameter used to describe the 2 different scenarios in the
+    // testChecksumCompatibilityInMixedVersionBrokerCluster test case
+    enum MixedVersionScenario {
+        CONNECTED_TO_NEW_THEN_OLD_VERSION,
+        CONNECTED_TO_OLD_THEN_NEW_VERSION
+    }
+
+    /**
+     * Pulsar message checksums changed in protocol version v6, broker version 
v1.15.
+     *
+     * This test case verifies that a client is able to send messages to an 
older broker version
+     * (<= v1.14, protocol version <= v5) in a mixed environment of broker 
versions (<= v1.14 & >= v1.15)
+     *
+     * This test case makes the assumption that the message checksum is ignored
+     * if a tampered message can be read by the consumer in the test.
+     *
+     * Scenario behind this test case:
+     *
+     * MixedVersionScenario.CONNECTED_TO_NEW_THEN_OLD_VERSION
+     * A Pulsar client produces the message while connected to a broker that 
supports checksums.
+     * While sending the message to the broker is pending, the connection 
breaks and the client
+     * connects to another broker that doesn't support message checksums.
+     * In this case, the client should remove the message checksum before 
resending it to the broker.
+     * original PR https://github.com/apache/pulsar/pull/43
+     *
+     * MixedVersionScenario.CONNECTED_TO_OLD_THEN_NEW_VERSION
+     * A Pulsar client produces the message while connected to a broker that 
doesn't support checksums.
+     * While sending the message to the broker is pending, the connection 
breaks and the client
+     * connects to another broker that supports message checksums.
+     * In this case, the client should remove the message checksum before 
resending it to the broker.
+     * original PR https://github.com/apache/pulsar/pull/89
+     */
+    @Test(dataProviderClass = EnumValuesDataProvider.class, dataProvider = 
"values")
+    public void 
testChecksumCompatibilityInMixedVersionBrokerCluster(MixedVersionScenario 
mixedVersionScenario)
+            throws Exception {
+        // GIVEN
+        final String topicName =
+                
"persistent://prop/use/ns-abc/testChecksumBackwardsCompatibilityWithOldBrokerWithoutChecksumHandling";
+
+        if (mixedVersionScenario == 
MixedVersionScenario.CONNECTED_TO_OLD_THEN_NEW_VERSION) {
+            // Given, the client thinks it's connected to a broker that 
doesn't support message checksums
+            makeClientAssumeThatItsConnectedToBrokerWithoutChecksumSupport();
+        }
+
+        PulsarTestClient pulsarTestClient = (PulsarTestClient) pulsarClient;
+
+        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) 
pulsarClient.newProducer()
+                .topic(topicName)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionName("my-sub")
+                .subscribe();
+
+        // inject a CountDownLatch to the pending message callback of the 
PulsarTestClient
+        CountDownLatch messageSendingProcessedLatch = new CountDownLatch(2);
+        pulsarTestClient.setPendingMessageCallback(__ -> 
messageSendingProcessedLatch.countDown());
+
+        // WHEN
+        // a message is sent, it should succeed
+        producer.send("message-1".getBytes());
+
+        // And
+        // communication OpSend messages are dropped to simulate a broken 
connection so that
+        // the next message doesn't get sent out yet and can be tampered 
before it's sent out
+        pulsarTestClient.dropOpSendMessages();
+
+        // And
+        // another message is sent
+        byte[] messageBytes = "message-2".getBytes();
+        TypedMessageBuilder<byte[]> messageBuilder = 
producer.newMessage().value(messageBytes);
+        CompletableFuture<MessageId> tamperedMessageSendFuture = 
messageBuilder.sendAsync();
+
+        // And
+        // until the message checksum has been calculated and it is pending
+        messageSendingProcessedLatch.await();
+        pulsarTestClient.setPendingMessageCallback(null);
+
+        // And
+        // the producer disconnects from the broker and the test client is put 
in a mode where reconnecting is rejected
+        pulsarTestClient.disconnectProducerAndRejectReconnecting(producer);
+
+        // And
+        // when the the message is tampered by changing the last byte to '3'. 
This corrupts the already calculated
+        // checksum.
+        ((TypedMessageBuilderImpl<byte[]>) 
messageBuilder).getContent().put(messageBytes.length - 1, (byte) '3');
+
+        if (mixedVersionScenario == 
MixedVersionScenario.CONNECTED_TO_NEW_THEN_OLD_VERSION) {
+            // Given, the client thinks it's connected to a broker that 
doesn't support message checksums
+            makeClientAssumeThatItsConnectedToBrokerWithoutChecksumSupport();
+        } else {
+            // Reset the overriding set in the beginning
+            resetOverridingConnectedBrokerVersion();
+        }
+
+        // And
+        // when finally the pulsar client is allowed to reconnect to the broker
+        pulsarTestClient.allowReconnecting();
+
+        // THEN
+        try {
+            // sending of tampered message should not fail since the client is 
expected to remove the checksum from the
+            // message before sending it an older broker version
+            tamperedMessageSendFuture.get(10, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            fail("Broker shouldn't verify checksum for corrupted message and 
it shouldn't fail", e);
+        }
+
+        // and then
+        // first message is received
+        Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
+        // and contains the expected payload
+        assertEquals(new String(msg.getData()), "message-1");
+        // second message is received
+        msg = consumer.receive(1, TimeUnit.SECONDS);
+        // and contains the tampered payload
+        assertEquals(new String(msg.getData()), "message-3");
+    }
+
+    private void 
makeClientAssumeThatItsConnectedToBrokerWithoutChecksumSupport() {
+        // make the client think that the connected broker is of version which 
doesn't support checksum validation
+        ((PulsarTestClient) 
pulsarClient).setOverrideRemoteEndpointProtocolVersion(ProtocolVersion.v5.getValue());
+    }
+
+    private void resetOverridingConnectedBrokerVersion() {
+        // reset the override and use the actual protocol version
+        ((PulsarTestClient) 
pulsarClient).setOverrideRemoteEndpointProtocolVersion(0);
+    }
+
+    private void 
waitUntilMessageIsPendingWithCalculatedChecksum(ProducerImpl<?> producer) {
+        // wait until the message is in the pending queue
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(producer.getPendingQueueSize(), 1);
+        });
+    }
+
+    @Test
+    public void testTamperingMessageIsDetected() throws Exception {
+        // GIVEN
+        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) 
pulsarClient.newProducer()
+                
.topic("persistent://prop/use/ns-abc/testTamperingMessageIsDetected")
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+        TypedMessageBuilderImpl<byte[]> msgBuilder = 
(TypedMessageBuilderImpl<byte[]>) producer.newMessage()
+                .value("a message".getBytes());
+        MessageMetadata msgMetadata = msgBuilder.getMetadataBuilder()
+                .setProducerName("test")
+                .setSequenceId(1)
+                .setPublishTime(10L);
+        ByteBuf payload = Unpooled.wrappedBuffer(msgBuilder.getContent());
+
+        // WHEN
+        // protocol message is created with checksum
+        ByteBufPair cmd = Commands.newSend(1, 1, 1, ChecksumType.Crc32c, 
msgMetadata, payload);
+        OpSendMsg op = OpSendMsg.create((MessageImpl<byte[]>) 
msgBuilder.getMessage(), cmd, 1, null);
+
+        // THEN
+        // the checksum validation passes
+        assertTrue(producer.verifyLocalBufferIsNotCorrupted(op));
+
+        // WHEN
+        // the content of the message is tampered
+        msgBuilder.getContent().put(0, (byte) 'b');
+        // the checksum validation fails
+        assertFalse(producer.verifyLocalBufferIsNotCorrupted(op));
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
index e997c03..7162592 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
@@ -18,27 +18,14 @@
  */
 package org.apache.pulsar.client.impl;
 
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-
-import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
@@ -47,16 +34,10 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.TypedMessageBuilder;
-import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.common.protocol.ByteBufPair;
-import org.apache.pulsar.common.protocol.Commands;
-import org.apache.pulsar.common.protocol.Commands.ChecksumType;
-import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.policies.data.TopicType;
+import org.apache.pulsar.tests.EnumValuesDataProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -76,451 +57,121 @@ public class MessageIdTest extends BrokerTestBase {
         internalCleanup();
     }
 
-    @Test(timeOut = 10000)
-    public void producerSendAsync() throws PulsarClientException {
-        // 1. Basic Config
-        String key = "producerSendAsync";
+    @Test(timeOut = 10000, dataProviderClass = EnumValuesDataProvider.class, 
dataProvider = "values")
+    public void producerSendAsync(TopicType topicType) throws 
PulsarClientException, PulsarAdminException {
+        // Given
+        String key = "producerSendAsync-" + topicType;
         final String topicName = "persistent://prop/cluster/namespace/topic-" 
+ key;
         final String subscriptionName = "my-subscription-" + key;
-        final String messagePredicate = "my-message-" + key + "-";
+        final String messagePrefix = "my-message-" + key + "-";
         final int numberOfMessages = 30;
+        if (topicType == TopicType.PARTITIONED) {
+            int numberOfPartitions = 3;
+            admin.topics().createPartitionedTopic(topicName, 
numberOfPartitions);
+        }
 
-        // 2. Create Producer
-        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
-            .enableBatching(false)
-            .messageRoutingMode(MessageRoutingMode.SinglePartition)
-            .create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
 
-        // 3. Create Consumer
-        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subscriptionName)
                 .subscribe();
 
-        // 4. Publish message and get message id
-        Set<MessageId> messageIds = new HashSet<>();
+        // When
+        // Messages are published asynchronously
         List<Future<MessageId>> futures = new ArrayList<>();
         for (int i = 0; i < numberOfMessages; i++) {
-            String message = messagePredicate + i;
+            String message = messagePrefix + i;
             futures.add(producer.sendAsync(message.getBytes()));
         }
 
+        // Then
+        // expect that the Message Ids of subsequently sent messages are in 
ascending order
+        Set<MessageId> messageIds = new HashSet<>();
         MessageIdImpl previousMessageId = null;
         for (Future<MessageId> f : futures) {
             try {
                 MessageIdImpl currentMessageId = (MessageIdImpl) f.get();
                 if (previousMessageId != null) {
-                    
Assert.assertTrue(currentMessageId.compareTo(previousMessageId) > 0,
+                    assertTrue(currentMessageId.compareTo(previousMessageId) > 
0,
                             "Message Ids should be in ascending order");
                 }
                 messageIds.add(currentMessageId);
                 previousMessageId = currentMessageId;
             } catch (Exception e) {
-                Assert.fail("Failed to publish message, Exception: " + 
e.getMessage());
+                fail("Failed to publish message", e);
             }
         }
 
-        // 4. Check if message Ids are correct
-        log.info("Message IDs = " + messageIds);
-        Assert.assertEquals(messageIds.size(), numberOfMessages, "Not all 
messages published successfully");
+        // And
+        // expect that there's a message id for each sent out message
+        // and that all messages have been received by the consumer
+        log.info("Message IDs = {}", messageIds);
+        assertEquals(messageIds.size(), numberOfMessages, "Not all messages 
published successfully");
 
         for (int i = 0; i < numberOfMessages; i++) {
             Message<byte[]> message = consumer.receive();
-            Assert.assertEquals(new String(message.getData()), 
messagePredicate + i);
+            assertEquals(new String(message.getData()), messagePrefix + i);
             MessageId messageId = message.getMessageId();
-            Assert.assertTrue(messageIds.remove(messageId), "Failed to receive 
message");
-        }
-        log.info("Message IDs = " + messageIds);
-        Assert.assertEquals(messageIds.size(), 0, "Not all messages received 
successfully");
-        consumer.unsubscribe();
-    }
-
-    @Test(timeOut = 10000)
-    public void producerSend() throws PulsarClientException {
-        // 1. Basic Config
-        String key = "producerSend";
-        final String topicName = "persistent://prop/cluster/namespace/topic-" 
+ key;
-        final String subscriptionName = "my-subscription-" + key;
-        final String messagePredicate = "my-message-" + key + "-";
-        final int numberOfMessages = 30;
-
-        // 2. Create Producer
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
-
-        // 3. Create Consumer
-        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
-                .subscribe();
-
-        // 4. Publish message and get message id
-        Set<MessageId> messageIds = new HashSet<>();
-        for (int i = 0; i < numberOfMessages; i++) {
-            String message = messagePredicate + i;
-            messageIds.add(producer.send(message.getBytes()));
-        }
-
-        // 4. Check if message Ids are correct
-        log.info("Message IDs = " + messageIds);
-        Assert.assertEquals(messageIds.size(), numberOfMessages, "Not all 
messages published successfully");
-
-        for (int i = 0; i < numberOfMessages; i++) {
-            
Assert.assertTrue(messageIds.remove(consumer.receive().getMessageId()), "Failed 
to receive Message");
-        }
-        log.info("Message IDs = " + messageIds);
-        Assert.assertEquals(messageIds.size(), 0, "Not all messages received 
successfully");
-        consumer.unsubscribe();
-        ;
-    }
-
-    @Test(timeOut = 10000)
-    public void partitionedProducerSendAsync() throws PulsarClientException, 
PulsarAdminException {
-        // 1. Basic Config
-        String key = "partitionedProducerSendAsync";
-        final String topicName = "persistent://prop/cluster/namespace/topic-" 
+ key;
-        final String subscriptionName = "my-subscription-" + key;
-        final String messagePredicate = "my-message-" + key + "-";
-        final int numberOfMessages = 30;
-        int numberOfPartitions = 3;
-        admin.topics().createPartitionedTopic(topicName, numberOfPartitions);
-
-        // 2. Create Producer
-        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
-            .enableBatching(false)
-            .messageRoutingMode(MessageRoutingMode.SinglePartition)
-            .create();
-
-        // 3. Create Consumer
-        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
-                .subscribe();
-
-        // 4. Publish message and get message id
-        Set<MessageId> messageIds = new HashSet<>();
-        Set<Future<MessageId>> futures = new HashSet<>();
-        for (int i = 0; i < numberOfMessages; i++) {
-            String message = messagePredicate + i;
-            futures.add(producer.sendAsync(message.getBytes()));
-        }
-
-        futures.forEach(f -> {
-            try {
-                messageIds.add(f.get());
-            } catch (Exception e) {
-                Assert.fail("Failed to publish message, Exception: " + 
e.getMessage());
+            if (topicType == TopicType.PARTITIONED) {
+                messageId = ((TopicMessageIdImpl) 
messageId).getInnerMessageId();
             }
-        });
-
-        // 4. Check if message Ids are correct
-        log.info("Message IDs = " + messageIds);
-        Assert.assertEquals(messageIds.size(), numberOfMessages, "Not all 
messages published successfully");
-
-        for (int i = 0; i < numberOfMessages; i++) {
-            MessageId topicMessageId = consumer.receive().getMessageId();
-            MessageId messageId = 
((TopicMessageIdImpl)topicMessageId).getInnerMessageId();
-            log.info("Message ID Received = " + messageId);
-            Assert.assertEquals(topicMessageId.toString(), 
messageId.toString());
-            Assert.assertTrue(messageIds.remove(messageId), "Failed to receive 
Message");
+            assertTrue(messageIds.remove(messageId), "Failed to receive 
message");
         }
-        log.info("Message IDs = " + messageIds);
-        Assert.assertEquals(messageIds.size(), 0, "Not all messages received 
successfully");
+        log.info("Remaining message IDs = {}", messageIds);
+        assertEquals(messageIds.size(), 0, "Not all messages received 
successfully");
         consumer.unsubscribe();
     }
 
-    @Test(timeOut = 10000)
-    public void partitionedProducerSend() throws PulsarClientException, 
PulsarAdminException {
-        // 1. Basic Config
-        String key = "partitionedProducerSend";
+    @Test(timeOut = 10000, dataProviderClass = EnumValuesDataProvider.class, 
dataProvider = "values")
+    public void producerSend(TopicType topicType) throws 
PulsarClientException, PulsarAdminException {
+        // Given
+        String key = "producerSend-" + topicType;
         final String topicName = "persistent://prop/cluster/namespace/topic-" 
+ key;
         final String subscriptionName = "my-subscription-" + key;
-        final String messagePredicate = "my-message-" + key + "-";
+        final String messagePrefix = "my-message-" + key + "-";
         final int numberOfMessages = 30;
-        int numberOfPartitions = 7;
-        admin.topics().createPartitionedTopic(topicName, numberOfPartitions);
+        if (topicType == TopicType.PARTITIONED) {
+            int numberOfPartitions = 7;
+            admin.topics().createPartitionedTopic(topicName, 
numberOfPartitions);
+        }
 
-        // 2. Create Producer
-        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
-            .enableBatching(false)
-            .messageRoutingMode(MessageRoutingMode.SinglePartition)
-            .create();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .enableBatching(false)
+                .topic(topicName)
+                .create();
 
-        // 3. Create Consumer
-        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subscriptionName)
                 .subscribe();
 
-        // 4. Publish message and get message id
+        // When
+        // Messages are published
         Set<MessageId> messageIds = new HashSet<>();
         for (int i = 0; i < numberOfMessages; i++) {
-            String message = messagePredicate + i;
+            String message = messagePrefix + i;
             messageIds.add(producer.send(message.getBytes()));
         }
 
-        // 4. Check if message Ids are correct
-        log.info("Message IDs = " + messageIds);
-        Assert.assertEquals(messageIds.size(), numberOfMessages, "Not all 
messages published successfully");
+        // Then
+        // expect that the Message Ids of subsequently sent messages are in 
ascending order
+        log.info("Message IDs = {}", messageIds);
+        assertEquals(messageIds.size(), numberOfMessages, "Not all messages 
published successfully");
 
         for (int i = 0; i < numberOfMessages; i++) {
-            MessageId topicMessageId = consumer.receive().getMessageId();
-            MessageId messageId = 
((TopicMessageIdImpl)topicMessageId).getInnerMessageId();
-            Assert.assertEquals(topicMessageId.toString(), 
messageId.toString());
-            Assert.assertTrue(messageIds.remove(messageId), "Failed to receive 
Message");
-        }
-        log.info("Message IDs = " + messageIds);
-        Assert.assertEquals(messageIds.size(), 0, "Not all messages received 
successfully");
-        // TODO - this statement causes the broker to hang - need to look into
-        // it
-        // consumer.unsubscribe();
-    }
-
-    /**
-     * Verifies: different versions of broker-deployment (one broker 
understands Checksum and other doesn't in that case
-     * remove checksum before sending to broker-2)
-     *
-     * client first produce message with checksum and then retries to send 
message due to connection unavailable. But
-     * this time, if broker doesn't understand checksum: then client should 
remove checksum from the message before
-     * sending to broker.
-     *
-     * 1. stop broker 2. client compute checksum and add into message 3. 
produce 2 messages and corrupt 1 message 4.
-     * start broker with lower version (which doesn't support checksum) 5. 
client reconnects to broker and due to
-     * incompatibility of version: removes checksum from message 6. broker 
doesn't do checksum validation and persist
-     * message 7. client receives ack
-     *
-     * @throws Exception
-     */
-    @Test
-    public void testChecksumVersionComptability() throws Exception {
-        final String topicName = "persistent://prop/use/ns-abc/topic1";
-
-        // 1. producer connect
-        ProducerImpl<byte[]> prod = (ProducerImpl<byte[]>) 
pulsarClient.newProducer().topic(topicName)
-            .enableBatching(false)
-            .messageRoutingMode(MessageRoutingMode.SinglePartition)
-            .create();
-        ProducerImpl<byte[]> producer = spy(prod);
-        // return higher version compare to broker : so, it forces 
client-producer to remove checksum from payload
-        doReturn(producer.brokerChecksumSupportedVersion() + 
1).when(producer).brokerChecksumSupportedVersion();
-        doAnswer(invocationOnMock -> 
prod.getState()).when(producer).getState();
-        doAnswer(invocationOnMock -> 
prod.getClientCnx()).when(producer).getClientCnx();
-        doAnswer(invocationOnMock -> prod.cnx()).when(producer).cnx();
-
-        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe();
-
-        // Stop the broker, and publishes messages. Messages are accumulated 
in the producer queue and they're checksums
-        // would have already been computed. If we change the message content 
at that point, it should result in a
-        // checksum validation error
-        stopBroker();
-
-        // stop timer to auto-reconnect as let spy-Producer connect to broker 
manually so, spy-producer object can get
-        // mock-value from brokerChecksumSupportedVersion
-        ((PulsarClientImpl) pulsarClient).timer().stop();
-
-        ClientCnx mockClientCnx = spy(
-                new ClientCnx(new ClientConfigurationData(), 
((PulsarClientImpl) pulsarClient).eventLoopGroup()));
-        doReturn(producer.brokerChecksumSupportedVersion() - 
1).when(mockClientCnx).getRemoteEndpointProtocolVersion();
-        prod.setClientCnx(mockClientCnx);
-
-        CompletableFuture<MessageId> future1 = 
producer.sendAsync("message-1".getBytes());
-
-        byte[] a2 = "message-2".getBytes();
-        TypedMessageBuilder<byte[]> msg2 = producer.newMessage().value(a2);
-
-        CompletableFuture<MessageId> future2 = msg2.sendAsync();
-
-        // corrupt the message, new content would be 'message-3'
-        ((TypedMessageBuilderImpl<byte[]>) msg2).getContent().put(a2.length - 
1, (byte) '3');
-
-        prod.setClientCnx(null);
-
-        // Restart the broker to have the messages published
-        startBroker();
-
-        // grab broker connection with mocked producer which has higher 
version compare to broker
-        prod.grabCnx();
-
-        try {
-            // it should not fail: as due to unsupported version of broker: 
client removes checksum and broker should
-            // ignore the checksum validation
-            future1.get();
-            future2.get();
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail("Broker shouldn't verify checksum for corrupted message and 
it shouldn't fail");
-        }
-
-        ((ConsumerImpl<byte[]>) consumer).grabCnx();
-        // We should only receive msg1
-        Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
-        assertEquals(new String(msg.getData()), "message-1");
-        msg = consumer.receive(1, TimeUnit.SECONDS);
-        assertEquals(new String(msg.getData()), "message-3");
-
-    }
-
-    @Test
-    public void testChecksumReconnection() throws Exception {
-        final String topicName = "persistent://prop/use/ns-abc/topic1";
-
-        // 1. producer connect
-        ProducerImpl<byte[]> prod = (ProducerImpl<byte[]>) 
pulsarClient.newProducer().topic(topicName)
-            .enableBatching(false)
-            .messageRoutingMode(MessageRoutingMode.SinglePartition)
-            .create();
-        ProducerImpl<byte[]> producer = spy(prod);
-        // mock: broker-doesn't support checksum (remote_version < 
brokerChecksumSupportedVersion) so, it forces
-        // client-producer to perform checksum-strip from msg at reconnection
-        doReturn(producer.brokerChecksumSupportedVersion() + 
1).when(producer).brokerChecksumSupportedVersion();
-        doAnswer(invocationOnMock -> 
prod.getState()).when(producer).getState();
-        doAnswer(invocationOnMock -> 
prod.getClientCnx()).when(producer).getClientCnx();
-        doAnswer(invocationOnMock -> prod.cnx()).when(producer).cnx();
-
-        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe();
-
-        stopBroker();
-
-        // stop timer to auto-reconnect as let spy-Producer connect to broker
-        // manually so, spy-producer object can get
-        // mock-value from brokerChecksumSupportedVersion
-        ((PulsarClientImpl) pulsarClient).timer().stop();
-
-        // set clientCnx mock to get non-checksum supported version
-        ClientCnx mockClientCnx = spy(
-                new ClientCnx(new ClientConfigurationData(), 
((PulsarClientImpl) pulsarClient).eventLoopGroup()));
-        doReturn(producer.brokerChecksumSupportedVersion() - 
1).when(mockClientCnx).getRemoteEndpointProtocolVersion();
-        prod.setClientCnx(mockClientCnx);
-
-        CompletableFuture<MessageId> future1 = 
producer.sendAsync("message-1".getBytes());
-
-        byte[] a2 = "message-2".getBytes();
-        TypedMessageBuilder<byte[]> msg2 = producer.newMessage().value(a2);
-
-        CompletableFuture<MessageId> future2 = msg2.sendAsync();
-
-        // corrupt the message, new content would be 'message-3'
-        ((TypedMessageBuilderImpl<byte[]>) msg2).getContent().put(a2.length - 
1, (byte) '3');
-
-        // unset mock
-        prod.setClientCnx(null);
-
-        // Restart the broker to have the messages published
-        startBroker();
-
-        // grab broker connection with mocked producer which has higher version
-        // compare to broker
-        prod.grabCnx();
-
-        try {
-            // it should not fail: as due to unsupported version of broker:
-            // client removes checksum and broker should
-            // ignore the checksum validation
-            future1.get(10, TimeUnit.SECONDS);
-            future2.get(10, TimeUnit.SECONDS);
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail("Broker shouldn't verify checksum for corrupted message and 
it shouldn't fail");
-        }
-
-        ((ConsumerImpl<byte[]>) consumer).grabCnx();
-        // We should only receive msg1
-        Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
-        assertEquals(new String(msg.getData()), "message-1");
-        msg = consumer.receive(1, TimeUnit.SECONDS);
-        assertEquals(new String(msg.getData()), "message-3");
-
-    }
-
-    /**
-     * Verifies: if message is corrupted before sending to broker and if 
broker gives checksum error: then 1.
-     * Client-Producer recomputes checksum with modified data 2. Retry 
message-send again 3. Broker verifies checksum 4.
-     * client receives send-ack success
-     *
-     * @throws Exception
-     */
-    @Test
-    public void testCorruptMessageRemove() throws Exception {
-
-        final String topicName = "persistent://prop/use/ns-abc/retry-topic";
-
-        // 1. producer connect
-        ProducerImpl<byte[]> prod = (ProducerImpl<byte[]>) 
pulsarClient.newProducer()
-            .topic(topicName)
-            .enableBatching(false)
-            .messageRoutingMode(MessageRoutingMode.SinglePartition)
-            .sendTimeout(10, TimeUnit.MINUTES)
-            .create();
-        ProducerImpl<byte[]> producer = spy(prod);
-        Field producerIdField = 
ProducerImpl.class.getDeclaredField("producerId");
-        producerIdField.setAccessible(true);
-        long producerId = (long) producerIdField.get(producer);
-        producer.cnx().registerProducer(producerId, producer); // registered 
spy ProducerImpl
-        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe();
-
-        // 2. Stop the broker, and publishes messages. Messages are 
accumulated in the producer queue and they're
-        // checksums
-        // would have already been computed. If we change the message content 
at that point, it should result in a
-        // checksum validation error
-        // enable checksum at producer
-        stopBroker();
-
-        byte[] a = "message-1".getBytes();
-        TypedMessageBuilder<byte[]> msg = producer.newMessage().value(a);
-
-        CompletableFuture<MessageId> future = msg.sendAsync();
-
-        // corrupt the message, new content would be 'message-2'
-        ((TypedMessageBuilderImpl<byte[]>) msg).getContent().put(a.length - 1, 
(byte) '2');
-
-        // 4. Restart the broker to have the messages published
-        startBroker();
-
-        try {
-            future.get();
-            fail("send message should have failed with checksum excetion");
-        } catch (Exception e) {
-            if (e.getCause() instanceof 
PulsarClientException.ChecksumException) {
-                // ok (callback should get checksum exception as message was 
modified and corrupt)
-            } else {
-                fail("Callback should have only failed with 
ChecksumException", e);
+            MessageId messageId = consumer.receive().getMessageId();
+            if (topicType == TopicType.PARTITIONED) {
+                messageId = ((TopicMessageIdImpl) 
messageId).getInnerMessageId();
             }
+            assertTrue(messageIds.remove(messageId), "Failed to receive 
Message");
         }
-
-        // 5. Verify
-        /**
-         * verify: ProducerImpl.verifyLocalBufferIsNotCorrupted() => validates 
if message is corrupt
-         */
-        byte[] a2 = "message-2".getBytes();
-
-        TypedMessageBuilderImpl<byte[]> msg2 = 
(TypedMessageBuilderImpl<byte[]>) 
producer.newMessage().value("message-1".getBytes());
-        ByteBuf payload = Unpooled.wrappedBuffer(msg2.getContent());
-        MessageMetadata msgMetadata = ((TypedMessageBuilderImpl<byte[]>) 
msg).getMetadataBuilder();
-        
msgMetadata.setProducerName("test").setSequenceId(1).setPublishTime(10L);
-        ByteBufPair cmd = Commands.newSend(producerId, 1, 1, 
ChecksumType.Crc32c, msgMetadata, payload);
-        // (a) create OpSendMsg with message-data : "message-1"
-        OpSendMsg op = OpSendMsg.create(((MessageImpl<byte[]>) 
msg2.getMessage()), cmd, 1, null);
-        // a.verify: as message is not corrupt: no need to update checksum
-        assertTrue(producer.verifyLocalBufferIsNotCorrupted(op));
-
-        // (b) corrupt message
-        msg2.getContent().put(a2.length - 1, (byte) '2'); // new content would 
be 'message-2'
-        // b. verify: as message is corrupt: update checksum
-        assertFalse(producer.verifyLocalBufferIsNotCorrupted(op));
-
-        assertEquals(producer.getPendingQueueSize(), 0);
-
-        // [2] test-recoverChecksumError functionality
-        stopBroker();
-
-        TypedMessageBuilderImpl<byte[]> msg1 = 
(TypedMessageBuilderImpl<byte[]>) 
producer.newMessage().value("message-1".getBytes());
-        future = msg1.sendAsync();
-        ClientCnx cnx = spy(
-                new ClientCnx(new ClientConfigurationData(), 
((PulsarClientImpl) pulsarClient).eventLoopGroup()));
-        String exc = "broker is already stopped";
-        // when client-try to recover checksum by resending to broker: throw 
exception as broker is stopped
-        doThrow(new IllegalStateException(exc)).when(cnx).ctx();
-        try {
-            producer.recoverChecksumError(cnx, 1);
-            fail("it should call : resendMessages() => which should throw 
above mocked exception");
-        } catch (IllegalStateException e) {
-            assertEquals(exc, e.getMessage());
-        }
-
-        producer.close();
-        consumer.close();
-        producer = null; // clean reference of mocked producer
+        log.info("Remaining message IDs = {}", messageIds);
+        assertEquals(messageIds.size(), 0, "Not all messages received 
successfully");
+        consumer.unsubscribe();
     }
-
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
new file mode 100644
index 0000000..9543f6e
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.io.IOException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.awaitility.Awaitility;
+
+/**
+ * A Pulsar Client that is used for testing scenarios where the different
+ * asynchronous operations of the client-broker interaction must be 
orchestrated by the test
+ * so that race conditions caused by the test code can be eliminated.
+ *
+ * features:
+ * - can override remote endpoint protocol version in a thread safe manner
+ * - can reject new connections from the client to the broker
+ * - can drop all OpSend messages after they have been added to 
pendingMessages and processed
+ *   by the client. This simulates a situation where sending messages go to a 
"black hole".
+ * - can synchronize operations with the help of the pending message callback 
which gets
+ *   called after the message to send out has been added to the pending 
messages in the client.
+ *
+ */
+public class PulsarTestClient extends PulsarClientImpl {
+    private volatile int overrideRemoteEndpointProtocolVersion;
+    private volatile boolean rejectNewConnections;
+    private volatile boolean dropOpSendMessages;
+    private volatile Consumer<ProducerImpl.OpSendMsg> pendingMessageCallback;
+
+    /**
+     * Create a new PulsarTestClient instance.
+     *
+     * @param clientBuilder ClientBuilder instance containing the 
configuration of the client
+     * @return a new
+     * @throws PulsarClientException
+     */
+    public static PulsarTestClient create(ClientBuilder clientBuilder) throws 
PulsarClientException {
+        ClientConfigurationData clientConfigurationData =
+                ((ClientBuilderImpl) 
clientBuilder).getClientConfigurationData();
+
+        // the reason to do all the following is to be able to pass the 
supplier for creating new ClientCnx
+        // instances after the constructor of PulsarClientImpl has been called.
+        // An anonymous subclass of ClientCnx class is used to override the 
getRemoteEndpointProtocolVersion()
+        // method.
+        EventLoopGroup eventLoopGroup = 
EventLoopUtil.newEventLoopGroup(clientConfigurationData.getNumIoThreads(),
+                new DefaultThreadFactory("pulsar-client-io", 
Thread.currentThread().isDaemon()));
+
+        AtomicReference<Supplier<ClientCnx>> clientCnxSupplierReference = new 
AtomicReference<>();
+        ConnectionPool connectionPool = new 
ConnectionPool(clientConfigurationData, eventLoopGroup,
+                () -> clientCnxSupplierReference.get().get());
+
+        return new PulsarTestClient(clientConfigurationData, eventLoopGroup, 
connectionPool,
+                clientCnxSupplierReference);
+    }
+
+    private PulsarTestClient(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup, ConnectionPool cnxPool,
+                             AtomicReference<Supplier<ClientCnx>> 
clientCnxSupplierReference)
+            throws PulsarClientException {
+        super(conf, eventLoopGroup, cnxPool);
+        // workaround initialization order issue so that ClientCnx can be 
created in this class
+        clientCnxSupplierReference.set(this::createClientCnx);
+    }
+
+    /**
+     * Overrides the default ClientCnx implementation with an implementation 
that overrides the
+     * getRemoteEndpointProtocolVersion() method. This is used to test client 
behaviour in certain cases.
+     *
+     * @return new ClientCnx instance
+     */
+    protected ClientCnx createClientCnx() {
+        return new ClientCnx(conf, eventLoopGroup) {
+            @Override
+            public int getRemoteEndpointProtocolVersion() {
+                return overrideRemoteEndpointProtocolVersion != 0
+                        ? overrideRemoteEndpointProtocolVersion
+                        : super.getRemoteEndpointProtocolVersion();
+            }
+        };
+    }
+
+    /**
+     * Overrides the getConnection method to reject new connections from being 
established between
+     * the client and brokers.
+     *
+     * @param topic the topic for the connection
+     * @return the ClientCnx to use, passed a future. Will complete with an 
exception when connections are rejected.
+     */
+    @Override
+    protected CompletableFuture<ClientCnx> getConnection(String topic) {
+        if (rejectNewConnections) {
+            CompletableFuture<ClientCnx> result = new CompletableFuture<>();
+            result.completeExceptionally(new IOException("New connections are 
rejected."));
+            return result;
+        } else {
+            return super.getConnection(topic);
+        }
+    }
+
+    /**
+     * Overrides the producer instance with an anonynomous subclass that adds 
hooks for observing new
+     * OpSendMsg instances being added to pending messages in the client.
+     * It also configures the hook to drop OpSend messages when dropping is 
enabled.
+     */
+    @Override
+    protected <T> ProducerImpl<T> newProducerImpl(String topic, int 
partitionIndex, ProducerConfigurationData conf,
+                                                  Schema<T> schema, 
ProducerInterceptors interceptors,
+                                                  
CompletableFuture<Producer<T>> producerCreatedFuture) {
+        return new ProducerImpl<T>(this, topic, conf, producerCreatedFuture, 
partitionIndex, schema,
+                interceptors) {
+            @Override
+            protected BlockingQueue<OpSendMsg> createPendingMessagesQueue() {
+                return new 
ArrayBlockingQueue<OpSendMsg>(conf.getMaxPendingMessages()) {
+                    @Override
+                    public void put(OpSendMsg opSendMsg) throws 
InterruptedException {
+                        super.put(opSendMsg);
+                        if (pendingMessageCallback != null) {
+                            pendingMessageCallback.accept(opSendMsg);
+                        }
+                    }
+                };
+            }
+
+            @Override
+            protected boolean shouldWriteOpSendMsg() {
+                if (dropOpSendMessages) {
+                    return false;
+                } else {
+                    return super.shouldWriteOpSendMsg();
+                }
+            }
+        };
+    }
+
+    public void setOverrideRemoteEndpointProtocolVersion(int 
overrideRemoteEndpointProtocolVersion) {
+        this.overrideRemoteEndpointProtocolVersion = 
overrideRemoteEndpointProtocolVersion;
+    }
+
+    public void setRejectNewConnections(boolean rejectNewConnections) {
+        this.rejectNewConnections = rejectNewConnections;
+    }
+
+    /**
+     * Simulates the producer connection getting dropped. Will also reject 
reconnections to simulate an
+     * outage. This reduces race conditions since the reconnection has to be 
explicitly enabled by calling
+     * allowReconnecting() method.
+     */
+    public void disconnectProducerAndRejectReconnecting(ProducerImpl<?> 
producer) throws IOException {
+        // wait until all possible in-flight messages have been delivered
+        Awaitility.await().untilAsserted(() -> {
+            if (!dropOpSendMessages && producer.isConnected()) {
+                assertEquals(producer.getPendingQueueSize(), 0);
+            }
+        });
+
+        // reject new connection attempts
+        setRejectNewConnections(true);
+
+        // make the existing connection between the producer and broker to 
break by explicitly closing it
+        ClientCnx cnx = producer.cnx();
+        producer.connectionClosed(cnx);
+        cnx.close();
+    }
+
+    /**
+     * Resets possible dropping of OpSend messages and allows the client to 
reconnect to the broker.
+     */
+    public void allowReconnecting() {
+        dropOpSendMessages = false;
+        setRejectNewConnections(false);
+    }
+
+    /**
+     * Assigns the callback to use for handling OpSend messages once a message 
had been added to pending messages.
+     * @param pendingMessageCallback
+     */
+    public void setPendingMessageCallback(
+            Consumer<ProducerImpl.OpSendMsg> pendingMessageCallback) {
+        this.pendingMessageCallback = pendingMessageCallback;
+    }
+
+    /**
+     * Enable dropping of OpSend messages after they have been added to 
pendingMessages and processed
+     * by the client. The OpSend messages won't be delivered until the 
allowReconnecting method has been called.
+     */
+    public void dropOpSendMessages() {
+        this.dropOpSendMessages = true;
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index df05096..2914741 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -319,7 +319,7 @@ public class ClientCnx extends PulsarHandler {
             log.debug("{} Connection is ready", ctx.channel());
         }
         // set remote protocol version to the correct version before we 
complete the connection future
-        remoteEndpointProtocolVersion = connected.getProtocolVersion();
+        setRemoteEndpointProtocolVersion(connected.getProtocolVersion());
         connectionFuture.complete(null);
         state = State.Ready;
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index ee938e0..61b72fe 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.client.impl;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -129,8 +128,8 @@ public class PartitionedProducerImpl<T> extends 
ProducerBase<T> {
         AtomicInteger completed = new AtomicInteger();
         for (int partitionIndex = 0; partitionIndex < 
topicMetadata.numPartitions(); partitionIndex++) {
             String partitionName = 
TopicName.get(topic).getPartition(partitionIndex).toString();
-            ProducerImpl<T> producer = new ProducerImpl<>(client, 
partitionName, conf, new CompletableFuture<>(),
-                    partitionIndex, schema, interceptors);
+            ProducerImpl<T> producer = client.newProducerImpl(partitionName, 
partitionIndex,
+                    conf, schema, interceptors, new CompletableFuture<>());
             producers.add(producer);
             producer.producerCreatedFuture().handle((prod, createException) -> 
{
                 if (createException != null) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index de88e19..729b895 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -152,8 +152,8 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
             this.userProvidedProducerName = true;
         }
         this.partitionIndex = partitionIndex;
-        this.pendingMessages = 
Queues.newArrayBlockingQueue(conf.getMaxPendingMessages());
-        this.pendingCallbacks = 
Queues.newArrayBlockingQueue(conf.getMaxPendingMessages());
+        this.pendingMessages = createPendingMessagesQueue();
+        this.pendingCallbacks = createPendingCallbacksQueue();
         this.semaphore = new Semaphore(conf.getMaxPendingMessages(), true);
 
         this.compressor = 
CompressionCodecProvider.getCompressionCodec(conf.getCompressionType());
@@ -245,6 +245,14 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         grabCnx();
     }
 
+    protected BlockingQueue<OpSendMsg> createPendingMessagesQueue() {
+        return Queues.newArrayBlockingQueue(conf.getMaxPendingMessages());
+    }
+
+    protected BlockingQueue<OpSendMsg> createPendingCallbacksQueue() {
+        return Queues.newArrayBlockingQueue(conf.getMaxPendingMessages());
+    }
+
     public ConnectionHandler getConnectionHandler() {
         return connectionHandler;
     }
@@ -656,7 +664,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         return Commands.newSend(producerId, lowestSequenceId, 
highestSequenceId, numMessages, getChecksumType(), msgMetadata, 
compressedPayload);
     }
 
-    private ChecksumType getChecksumType() {
+    protected ChecksumType getChecksumType() {
         if (connectionHandler.cnx() == null
                 || connectionHandler.cnx().getRemoteEndpointProtocolVersion() 
>= brokerChecksumSupportedVersion()) {
             return ChecksumType.Crc32c;
@@ -1694,8 +1702,8 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                 LAST_SEQ_ID_PUSHED_UPDATER.getAndUpdate(this,
                         last -> Math.max(last, getHighestSequenceId(op)));
             }
-            ClientCnx cnx = cnx();
-            if (isConnected()) {
+            if (shouldWriteOpSendMsg()) {
+                ClientCnx cnx = cnx();
                 if (op.msg != null && op.msg.getSchemaState() == None) {
                     tryRegisterSchema(cnx, op.msg, op.callback);
                     return;
@@ -1726,6 +1734,16 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         }
     }
 
+    /**
+     * Hook method for testing. By returning false, it's possible to prevent 
messages
+     * being delivered to the broker.
+     *
+     * @return true if OpSend messages should be written to open connection
+     */
+    protected boolean shouldWriteOpSendMsg() {
+        return isConnected();
+    }
+
     private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from) {
         final boolean stripChecksum = cnx.getRemoteEndpointProtocolVersion() < 
brokerChecksumSupportedVersion();
         Iterator<OpSendMsg> msgIterator = pendingMessages.iterator();
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index ed04e51..769cfef 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -90,7 +90,7 @@ public class PulsarClientImpl implements PulsarClient {
 
     private static final Logger log = 
LoggerFactory.getLogger(PulsarClientImpl.class);
 
-    private final ClientConfigurationData conf;
+    protected final ClientConfigurationData conf;
     private LookupService lookup;
     private final ConnectionPool cnxPool;
     private final Timer timer;
@@ -109,7 +109,7 @@ public class PulsarClientImpl implements PulsarClient {
     private final AtomicLong consumerIdGenerator = new AtomicLong();
     private final AtomicLong requestIdGenerator = new AtomicLong();
 
-    private final EventLoopGroup eventLoopGroup;
+    protected final EventLoopGroup eventLoopGroup;
     private final MemoryLimitController memoryLimitController;
 
     private final LoadingCache<String, SchemaInfoProvider> 
schemaProviderLoadingCache = CacheBuilder.newBuilder().maximumSize(100000)
@@ -290,10 +290,10 @@ public class PulsarClientImpl implements PulsarClient {
 
             ProducerBase<T> producer;
             if (metadata.partitions > 0) {
-                producer = new 
PartitionedProducerImpl<>(PulsarClientImpl.this, topic, conf, 
metadata.partitions,
-                        producerCreatedFuture, schema, interceptors);
+                producer = newPartitionedProducerImpl(topic, conf, schema, 
interceptors, producerCreatedFuture,
+                        metadata);
             } else {
-                producer = new ProducerImpl<>(PulsarClientImpl.this, topic, 
conf, producerCreatedFuture, -1, schema, interceptors);
+                producer = newProducerImpl(topic, -1, conf, schema, 
interceptors, producerCreatedFuture);
             }
 
             producers.add(producer);
@@ -306,6 +306,53 @@ public class PulsarClientImpl implements PulsarClient {
         return producerCreatedFuture;
     }
 
+    /**
+     * Factory method for creating PartitionedProducerImpl instance.
+     *
+     * Allows overriding the PartitionedProducerImpl instance in tests.
+     *
+     * @param topic topic name
+     * @param conf producer configuration
+     * @param schema topic schema
+     * @param interceptors producer interceptors
+     * @param producerCreatedFuture future for signaling completion of async 
producer creation
+     * @param metadata partitioned topic metadata
+     * @param <T> message type class
+     * @return new PartitionedProducerImpl instance
+     */
+    protected <T> PartitionedProducerImpl<T> newPartitionedProducerImpl(String 
topic,
+                                                                        
ProducerConfigurationData conf,
+                                                                        
Schema<T> schema,
+                                                                        
ProducerInterceptors interceptors,
+                                                                        
CompletableFuture<Producer<T>> producerCreatedFuture,
+                                                                        
PartitionedTopicMetadata metadata) {
+        return new PartitionedProducerImpl<>(PulsarClientImpl.this, topic, 
conf, metadata.partitions,
+                producerCreatedFuture, schema, interceptors);
+    }
+
+    /**
+     * Factory method for creating ProducerImpl instance.
+     *
+     * Allows overriding the ProducerImpl instance in tests.
+     *
+     * @param topic topic name
+     * @param partitionIndex partition index of a partitioned topic. the value 
-1 is used for non-partitioned topics.
+     * @param conf producer configuration
+     * @param schema topic schema
+     * @param interceptors producer interceptors
+     * @param producerCreatedFuture future for signaling completion of async 
producer creation
+     * @param <T> message type class
+     * @return
+     */
+    protected <T> ProducerImpl<T> newProducerImpl(String topic, int 
partitionIndex,
+                                                  ProducerConfigurationData 
conf,
+                                                  Schema<T> schema,
+                                                  ProducerInterceptors 
interceptors,
+                                                  
CompletableFuture<Producer<T>> producerCreatedFuture) {
+        return new ProducerImpl<>(PulsarClientImpl.this, topic, conf, 
producerCreatedFuture, partitionIndex, schema,
+                interceptors);
+    }
+
     public CompletableFuture<Consumer<byte[]>> 
subscribeAsync(ConsumerConfigurationData<byte[]> conf) {
         return subscribeAsync(conf, Schema.BYTES, null);
     }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
index 65e8df3..084ef39 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
@@ -18,10 +18,20 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
 import io.netty.channel.EventLoopGroup;
 import io.netty.util.Timer;
-
 import io.netty.util.concurrent.DefaultThreadFactory;
+import java.lang.reflect.Field;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadFactory;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageRouter;
 import org.apache.pulsar.client.api.MessageRoutingMode;
@@ -34,16 +44,6 @@ import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
 
-import java.lang.reflect.Field;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ThreadFactory;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
-
 /**
  * Unit Tests of {@link PartitionedProducerImpl}.
  */
@@ -61,7 +61,7 @@ public class PartitionedProducerImplTest {
         client = mock(PulsarClientImpl.class);
         schema = mock(Schema.class);
         producerInterceptors = mock(ProducerInterceptors.class);
-        producerCreatedFuture = mock(CompletableFuture.class);
+        producerCreatedFuture = new CompletableFuture<>();
         ClientConfigurationData clientConfigurationData = 
mock(ClientConfigurationData.class);
         Timer timer = mock(Timer.class);
 
@@ -70,6 +70,13 @@ public class PartitionedProducerImplTest {
         when(client.getConfiguration()).thenReturn(clientConfigurationData);
         when(client.timer()).thenReturn(timer);
         when(client.newProducer()).thenReturn(producerBuilderImpl);
+        when(client.newProducerImpl(anyString(), anyInt(), any(), any(), 
any(), any()))
+                .thenAnswer(invocationOnMock -> {
+            return new ProducerImpl<>(client, invocationOnMock.getArgument(0),
+                    invocationOnMock.getArgument(2), 
invocationOnMock.getArgument(5),
+                    invocationOnMock.getArgument(1), 
invocationOnMock.getArgument(3),
+                    invocationOnMock.getArgument(4));
+        });
     }
 
     @Test
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
index 0d08849..481517c 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
@@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory;
 public abstract class PulsarHandler extends PulsarDecoder {
     protected ChannelHandlerContext ctx;
     protected SocketAddress remoteAddress;
-    protected int remoteEndpointProtocolVersion = 
ProtocolVersion.v0.getValue();
+    private int remoteEndpointProtocolVersion = ProtocolVersion.v0.getValue();
     private final long keepAliveIntervalSeconds;
     private boolean waitingForPingResponse = false;
     private ScheduledFuture<?> keepAliveTask;
@@ -43,6 +43,10 @@ public abstract class PulsarHandler extends PulsarDecoder {
         return remoteEndpointProtocolVersion;
     }
 
+    protected void setRemoteEndpointProtocolVersion(int 
remoteEndpointProtocolVersion) {
+        this.remoteEndpointProtocolVersion = remoteEndpointProtocolVersion;
+    }
+
     public PulsarHandler(int keepAliveInterval, TimeUnit unit) {
         this.keepAliveIntervalSeconds = unit.toSeconds(keepAliveInterval);
     }
@@ -98,7 +102,7 @@ public abstract class PulsarHandler extends PulsarDecoder {
             // response later and thus not enforce the strict timeout here.
             log.warn("[{}] Forcing connection to close after keep-alive 
timeout", ctx.channel());
             ctx.close();
-        } else if (remoteEndpointProtocolVersion >= 
ProtocolVersion.v1.getValue()) {
+        } else if (getRemoteEndpointProtocolVersion() >= 
ProtocolVersion.v1.getValue()) {
             // Send keep alive probe to peer only if it supports the ping/pong 
commands, added in v1
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Sending ping message", ctx.channel());
diff --git 
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServerConnection.java
 
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServerConnection.java
index 4719271..7822c9e 100644
--- 
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServerConnection.java
+++ 
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServerConnection.java
@@ -104,7 +104,7 @@ public class ServerConnection extends PulsarHandler {
         }
         ctx.writeAndFlush(Commands.newConnected(connect.getProtocolVersion()));
         state = State.Connected;
-        remoteEndpointProtocolVersion = connect.getProtocolVersion();
+        setRemoteEndpointProtocolVersion(connect.getProtocolVersion());
     }
 
     @Override
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 909ae77..5d4c576 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -270,7 +270,7 @@ public class ProxyConnection extends PulsarHandler 
implements FutureListener<Voi
     @Override
     protected void handleConnect(CommandConnect connect) {
         checkArgument(state == State.Init);
-        this.remoteEndpointProtocolVersion = connect.getProtocolVersion();
+        this.setRemoteEndpointProtocolVersion(connect.getProtocolVersion());
         this.hasProxyToBrokerUrl = connect.hasProxyToBrokerUrl();
         this.protocolVersionToAdvertise = 
getProtocolVersionToAdvertise(connect);
         this.proxyToBrokerUrl = connect.hasProxyToBrokerUrl() ? 
connect.getProxyToBrokerUrl() : "null";
@@ -279,11 +279,11 @@ public class ProxyConnection extends PulsarHandler 
implements FutureListener<Voi
             LOG.debug("Received CONNECT from {} proxyToBroker={}", 
remoteAddress, proxyToBrokerUrl);
             LOG.debug(
                 "[{}] Protocol version to advertise to broker is {}, 
clientProtocolVersion={}, proxyProtocolVersion={}",
-                remoteAddress, protocolVersionToAdvertise, 
remoteEndpointProtocolVersion,
+                remoteAddress, protocolVersionToAdvertise, 
getRemoteEndpointProtocolVersion(),
                 Commands.getCurrentProtocolVersion());
         }
 
-        if (remoteEndpointProtocolVersion < ProtocolVersion.v10.getValue()) {
+        if (getRemoteEndpointProtocolVersion() < 
ProtocolVersion.v10.getValue()) {
             LOG.warn("[{}] Client doesn't support connecting through proxy", 
remoteAddress);
             ctx.close();
             return;

Reply via email to