This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 19e6546 Fix flaky MessageIdTest and introduce some testing
improvements (#9286)
19e6546 is described below
commit 19e6546a3d86169b75b122b2a4b4d1884133a4f3
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Jan 27 01:38:54 2021 +0200
Fix flaky MessageIdTest and introduce some testing improvements (#9286)
* Refactor PulsarClient initialization and lifecycle management in tests
* Add getter and setter to access remoteEndpointProtocolVersion field
- it makes it easier to override for tests
* Add hooks for overriding the producer implementation in PulsarClientImpl
- useful for tests. Instead of relying on Mockito, there's a pure Java
way to inject behavior into producer implementations for testing purposes
* Introduce PulsarTestClient that contains ways to prevent race conditions
and test flakiness
- provides features for simulating failure conditions, for example
the case of the broker connection disconnecting
* Add solution for using Enum classes as source for TestNG DataProvider
* Fix flaky MessageIdTest and move checksum related tests to new class
* Fix NPE in PartitionedProducerImplTest
---
.../pulsar/tests/EnumValuesDataProvider.java | 52 +++
.../pulsar/tests/EnumValuesDataProviderTest.java | 69 +++
.../authentication/SaslAuthenticateTest.java | 4 +-
.../apache/pulsar/broker/service/ServerCnx.java | 11 +-
.../broker/auth/MockedPulsarServiceBaseTest.java | 77 ++--
.../api/AuthenticatedProducerConsumerTest.java | 9 +-
.../AuthenticationTlsHostnameVerificationTest.java | 5 +-
.../api/AuthorizationProducerConsumerTest.java | 12 +-
.../pulsar/client/api/BrokerServiceLookupTest.java | 11 +-
.../client/api/MutualAuthenticationTest.java | 16 +-
...MultiListenersWithInternalListenerNameTest.java | 5 +-
...tiListenersWithoutInternalListenerNameTest.java | 5 +-
.../pulsar/client/api/TlsProducerConsumerBase.java | 3 +-
.../TokenAuthenticatedProducerConsumerTest.java | 5 +-
...kenOauth2AuthenticatedProducerConsumerTest.java | 5 +-
.../KeyStoreTlsProducerConsumerTestWithAuth.java | 2 +-
...KeyStoreTlsProducerConsumerTestWithoutAuth.java | 2 +-
.../pulsar/client/impl/MessageChecksumTest.java | 249 +++++++++++
.../apache/pulsar/client/impl/MessageIdTest.java | 485 +++------------------
.../pulsar/client/impl/PulsarTestClient.java | 218 +++++++++
.../org/apache/pulsar/client/impl/ClientCnx.java | 2 +-
.../client/impl/PartitionedProducerImpl.java | 5 +-
.../apache/pulsar/client/impl/ProducerImpl.java | 28 +-
.../pulsar/client/impl/PulsarClientImpl.java | 57 ++-
.../client/impl/PartitionedProducerImplTest.java | 31 +-
.../pulsar/common/protocol/PulsarHandler.java | 8 +-
.../pulsar/discovery/service/ServerConnection.java | 2 +-
.../pulsar/proxy/server/ProxyConnection.java | 6 +-
28 files changed, 845 insertions(+), 539 deletions(-)
diff --git
a/buildtools/src/main/java/org/apache/pulsar/tests/EnumValuesDataProvider.java
b/buildtools/src/main/java/org/apache/pulsar/tests/EnumValuesDataProvider.java
new file mode 100644
index 0000000..3c43bae
--- /dev/null
+++
b/buildtools/src/main/java/org/apache/pulsar/tests/EnumValuesDataProvider.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.testng.annotations.DataProvider;
+
+/**
+ * TestNG DataProvider for passing all Enum values as parameters to a test method.
+ *
+ * Supports currently a single Enum parameter for a test method.
+ */
+public abstract class EnumValuesDataProvider {
+ @DataProvider
+ public static final Object[][] values(Method testMethod) {
+ Class<?> enumClass = Arrays.stream(testMethod.getParameterTypes())
+ .findFirst()
+ .filter(Class::isEnum)
+ .orElseThrow(() -> new IllegalArgumentException("The test
method should have an enum parameter."));
+ return toDataProviderArray((Class<? extends Enum<?>>) enumClass);
+ }
+
+ /*
+ * Converts all values of an Enum class to a TestNG DataProvider object array
+ */
+ public static Object[][] toDataProviderArray(Class<? extends Enum<?>>
enumClass) {
+ Enum<?>[] enumValues = enumClass.getEnumConstants();
+ return Stream.of(enumValues)
+ .map(enumValue -> new Object[]{enumValue})
+ .collect(Collectors.toList())
+ .toArray(new Object[0][]);
+ }
+}
diff --git
a/buildtools/src/test/java/org/apache/pulsar/tests/EnumValuesDataProviderTest.java
b/buildtools/src/test/java/org/apache/pulsar/tests/EnumValuesDataProviderTest.java
new file mode 100644
index 0000000..9eb24cc
--- /dev/null
+++
b/buildtools/src/test/java/org/apache/pulsar/tests/EnumValuesDataProviderTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests;
+
+import static org.testng.Assert.assertEquals;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.testng.annotations.Test;
+
+public class EnumValuesDataProviderTest {
+ enum Sample {
+ A, B, C
+ }
+
+ @Test(dataProviderClass = EnumValuesDataProvider.class, dataProvider =
"values")
+ void testEnumValuesProvider(Sample sample) {
+ System.out.println(sample);
+ }
+
+ @Test
+ void shouldContainAllEnumValues() {
+
verifyTestParameters(EnumValuesDataProvider.toDataProviderArray(Sample.class));
+ }
+
+ @Test
+ void shouldDetermineEnumValuesFromMethod() {
+ Method testMethod = Arrays.stream(getClass().getDeclaredMethods())
+ .filter(method ->
method.getName().equals("testEnumValuesProvider"))
+ .findFirst()
+ .get();
+ verifyTestParameters(EnumValuesDataProvider.values(testMethod));
+ }
+
+ private void verifyTestParameters(Object[][] testParameters) {
+ Set<Sample> enumValuesFromDataProvider = Arrays.stream(testParameters)
+ .map(element -> element[0])
+ .map(Sample.class::cast)
+ .collect(Collectors.toSet());
+ assertEquals(enumValuesFromDataProvider, new
HashSet<>(Arrays.asList(Sample.values())));
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ void shouldFailIfEnumParameterIsMissing() {
+ Method testMethod = Arrays.stream(getClass().getDeclaredMethods())
+ .filter(method ->
method.getName().equals("shouldFailIfEnumParameterIsMissing"))
+ .findFirst()
+ .get();
+ EnumValuesDataProvider.values(testMethod);
+ }
+}
\ No newline at end of file
diff --git
a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
index bb20077..6fa4b50 100644
---
a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
+++
b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
@@ -181,10 +181,10 @@ public class SaslAuthenticateTest extends
ProducerConsumerBase {
lookupUrl = new URI(pulsar.getWebServiceAddress());
- pulsarClient = PulsarClient.builder()
+ replacePulsarClient(PulsarClient.builder()
.serviceUrl(lookupUrl.toString())
.statsInterval(0, TimeUnit.SECONDS)
- .authentication(authSasl).build();
+ .authentication(authSasl));
// set admin auth, to verify admin web resources
Map<String, String> clientSaslConfig = Maps.newHashMap();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 09980fd..55d6a01 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -552,7 +552,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
private void completeConnect(int clientProtoVersion, String clientVersion)
{
ctx.writeAndFlush(Commands.newConnected(clientProtoVersion,
maxMessageSize));
state = State.Connected;
- remoteEndpointProtocolVersion = clientProtoVersion;
+ setRemoteEndpointProtocolVersion(clientProtoVersion);
if (isNotBlank(clientVersion) && !clientVersion.contains(" ") /*
ignore default version: pulsar client */) {
this.clientVersion = clientVersion.intern();
}
@@ -662,7 +662,8 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
try {
AuthData brokerData = authState.refreshAuthentication();
- ctx.writeAndFlush(Commands.newAuthChallenge(authMethod,
brokerData, remoteEndpointProtocolVersion));
+ ctx.writeAndFlush(Commands.newAuthChallenge(authMethod,
brokerData,
+ getRemoteEndpointProtocolVersion()));
if (log.isDebugEnabled()) {
log.debug("[{}] Sent auth challenge to client to refresh
credentials with method: {}.",
remoteAddress, authMethod);
@@ -1958,7 +1959,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
public void closeProducer(Producer producer) {
// removes producer-connection from map and send close command to
producer
safelyRemoveProducer(producer);
- if (remoteEndpointProtocolVersion >= v5.getValue()) {
+ if (getRemoteEndpointProtocolVersion() >= v5.getValue()) {
ctx.writeAndFlush(Commands.newCloseProducer(producer.getProducerId(), -1L));
} else {
close();
@@ -1970,7 +1971,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
public void closeConsumer(Consumer consumer) {
// removes consumer-connection from map and send close command to
consumer
safelyRemoveConsumer(consumer);
- if (remoteEndpointProtocolVersion >= v5.getValue()) {
+ if (getRemoteEndpointProtocolVersion() >= v5.getValue()) {
ctx.writeAndFlush(Commands.newCloseConsumer(consumer.consumerId(),
-1L));
} else {
close();
@@ -2224,7 +2225,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
@Override
public boolean isBatchMessageCompatibleVersion() {
- return remoteEndpointProtocolVersion >= ProtocolVersion.v4.getValue();
+ return getRemoteEndpointProtocolVersion() >=
ProtocolVersion.v4.getValue();
}
boolean supportsAuthenticationRefresh() {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 47c8fe3..4ad5145 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -21,11 +21,9 @@ package org.apache.pulsar.broker.auth;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
-
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
import java.lang.reflect.Field;
import java.net.URI;
import java.net.URL;
@@ -40,7 +38,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;
-
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
@@ -53,6 +50,7 @@ import
org.apache.pulsar.broker.intercept.CounterBrokerInterceptor;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.ClusterData;
@@ -69,7 +67,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Base class for all tests that need a Pulsar instance without a ZK and BK
cluster
+ * Base class for all tests that need a Pulsar instance without a ZK and BK
cluster.
*/
@PowerMockIgnore(value = {"org.slf4j.*", "com.sun.org.apache.xerces.*" })
public abstract class MockedPulsarServiceBaseTest {
@@ -117,17 +115,29 @@ public abstract class MockedPulsarServiceBaseTest {
internalSetup();
}
- protected final void internalSetup(boolean isPreciseDispatcherFlowControl)
throws Exception {
- init(isPreciseDispatcherFlowControl);
- lookupUrl = new URI(brokerUrl.toString());
- if (isTcpLookup) {
- lookupUrl = new URI(pulsar.getBrokerServiceUrl());
- }
- pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
+ protected PulsarClient newPulsarClient(String url, int intervalInSecs)
throws PulsarClientException {
+ ClientBuilder clientBuilder =
+ PulsarClient.builder()
+ .serviceUrl(url)
+ .statsInterval(intervalInSecs, TimeUnit.SECONDS);
+ customizeNewPulsarClientBuilder(clientBuilder);
+ return createNewPulsarClient(clientBuilder);
}
- protected PulsarClient newPulsarClient(String url, int intervalInSecs)
throws PulsarClientException {
- return
PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs,
TimeUnit.SECONDS).build();
+ protected void customizeNewPulsarClientBuilder(ClientBuilder
clientBuilder) {
+
+ }
+
+ protected PulsarClient createNewPulsarClient(ClientBuilder clientBuilder)
throws PulsarClientException {
+ return clientBuilder.build();
+ }
+
+ protected PulsarClient replacePulsarClient(ClientBuilder clientBuilder)
throws PulsarClientException {
+ if (pulsarClient != null) {
+ pulsarClient.shutdown();
+ }
+ pulsarClient = createNewPulsarClient(clientBuilder);
+ return pulsarClient;
}
protected final void internalSetupForStatsTest() throws Exception {
@@ -163,27 +173,6 @@ public abstract class MockedPulsarServiceBaseTest {
startBroker();
}
- protected final void init(boolean isPreciseDispatcherFlowControl) throws
Exception {
- this.conf.setBrokerServicePort(Optional.of(0));
- this.conf.setBrokerServicePortTls(Optional.of(0));
- this.conf.setAdvertisedAddress("localhost");
- this.conf.setWebServicePort(Optional.of(0));
- this.conf.setWebServicePortTls(Optional.of(0));
-
this.conf.setPreciseDispatcherFlowControl(isPreciseDispatcherFlowControl);
- this.conf.setNumExecutorThreadPoolSize(5);
-
- sameThreadOrderedSafeExecutor = new SameThreadOrderedSafeExecutor();
- bkExecutor = Executors.newSingleThreadExecutor(
- new ThreadFactoryBuilder().setNameFormat("mock-pulsar-bk")
- .setUncaughtExceptionHandler((thread, ex) ->
log.info("Uncaught exception", ex))
- .build());
-
- mockZooKeeper = createMockZooKeeper();
- mockBookKeeper = createMockBookKeeper(mockZooKeeper, bkExecutor);
-
- startBroker();
- }
-
protected final void internalCleanup() throws Exception {
// if init fails, some of these could be null, and if so would throw
// an NPE in shutdown, obscuring the real error
@@ -196,7 +185,7 @@ public abstract class MockedPulsarServiceBaseTest {
pulsarClient = null;
}
if (pulsar != null) {
- pulsar.close();
+ stopBroker();
pulsar = null;
}
if (mockBookKeeper != null) {
@@ -244,6 +233,8 @@ public abstract class MockedPulsarServiceBaseTest {
}
protected void stopBroker() throws Exception {
+ log.info("Stopping Pulsar broker. brokerServiceUrl: {}
webServiceAddress: {}", pulsar.getBrokerServiceUrl(),
+ pulsar.getWebServiceAddress());
pulsar.close();
pulsar = null;
// Simulate cleanup of ephemeral nodes
@@ -274,6 +265,8 @@ public abstract class MockedPulsarServiceBaseTest {
conf.setAuthorizationEnabled(true);
pulsar.start();
conf.setAuthorizationEnabled(isAuthorizationEnabled);
+ log.info("Pulsar started. brokerServiceUrl: {} webServiceAddress: {}",
pulsar.getBrokerServiceUrl(),
+ pulsar.getWebServiceAddress());
return pulsar;
}
@@ -363,9 +356,9 @@ public abstract class MockedPulsarServiceBaseTest {
public CompletableFuture<ZooKeeper> create(String serverList,
SessionType sessionType,
int zkSessionTimeoutMillis) {
- if (serverList != null &&
-
(serverList.equalsIgnoreCase(conf.getConfigurationStoreServers())
- ||
serverList.equalsIgnoreCase(GLOBAL_DUMMY_VALUE))) {
+ if (serverList != null
+ &&
(serverList.equalsIgnoreCase(conf.getConfigurationStoreServers())
+ || serverList.equalsIgnoreCase(GLOBAL_DUMMY_VALUE))) {
return CompletableFuture.completedFuture(mockZooKeeperGlobal);
}
@@ -408,7 +401,8 @@ public abstract class MockedPulsarServiceBaseTest {
return false;
}
- public static void setFieldValue(Class<?> clazz, Object classObj, String
fieldName, Object fieldValue) throws Exception {
+ public static void setFieldValue(Class<?> clazz, Object classObj, String
fieldName,
+ Object fieldValue) throws Exception {
Field field = clazz.getDeclaredField(fieldName);
field.setAccessible(true);
field.set(classObj, fieldValue);
@@ -418,7 +412,8 @@ public abstract class MockedPulsarServiceBaseTest {
ServiceConfiguration configuration = new ServiceConfiguration();
configuration.setAdvertisedAddress("localhost");
configuration.setClusterName(configClusterName);
- configuration.setAdvertisedAddress("localhost"); // there are TLS
tests in here, they need to use localhost because of the certificate
+ // there are TLS tests in here, they need to use localhost because of
the certificate
+ configuration.setAdvertisedAddress("localhost");
configuration.setManagedLedgerCacheSizeMB(8);
configuration.setActiveConsumerFailoverDelayTimeMillis(0);
configuration.setDefaultNumberOfNamespaceBundles(1);
@@ -435,4 +430,4 @@ public abstract class MockedPulsarServiceBaseTest {
}
private static final Logger log =
LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class);
-}
\ No newline at end of file
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
index 7f3e7b3..1f85c99 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
@@ -111,9 +111,9 @@ public class AuthenticatedProducerConsumerTest extends
ProducerConsumerBase {
} else {
lookupUrl = pulsar.getBrokerServiceUrlTls();
}
- pulsarClient =
PulsarClient.builder().serviceUrl(lookupUrl).statsInterval(0, TimeUnit.SECONDS)
+
replacePulsarClient(PulsarClient.builder().serviceUrl(lookupUrl).statsInterval(0,
TimeUnit.SECONDS)
.tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).allowTlsInsecureConnection(true).authentication(auth)
- .enableTls(true).build();
+ .enableTls(true));
}
@AfterMethod(alwaysRun = true)
@@ -241,9 +241,8 @@ public class AuthenticatedProducerConsumerTest extends
ProducerConsumerBase {
EnumSet.allOf(AuthAction.class));
// setup the client
- pulsarClient.close();
- pulsarClient =
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl())
- .operationTimeout(1, TimeUnit.SECONDS).build();
+
replacePulsarClient(PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl())
+ .operationTimeout(1, TimeUnit.SECONDS));
// unauthorized topic test
Exception pulsarClientException = null;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
index cf04a4f..a0393bd 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
@@ -109,12 +109,11 @@ public class AuthenticationTlsHostnameVerificationTest
extends ProducerConsumerB
admin =
spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrlTls.toString())
.tlsTrustCertsFilePath(TLS_MIM_TRUST_CERT_FILE_PATH).allowTlsInsecureConnection(true)
.authentication(authTls).build());
- pulsarClient = PulsarClient.builder()
+ replacePulsarClient(PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrlTls())
.statsInterval(0, TimeUnit.SECONDS)
.tlsTrustCertsFilePath(TLS_MIM_TRUST_CERT_FILE_PATH).allowTlsInsecureConnection(true)
-
.authentication(authTls).enableTls(true).enableTlsHostnameVerification(hostnameVerificationEnabled)
- .build();
+
.authentication(authTls).enableTls(true).enableTlsHostnameVerification(hostnameVerificationEnabled));
admin.clusters().createCluster("test", new
ClusterData(brokerUrl.toString()));
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index 933ad88..7c06837 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -197,10 +197,10 @@ public class AuthorizationProducerConsumerTest extends
ProducerConsumerBase {
tenantAdmin.namespaces().grantPermissionOnNamespace(namespace,
subscriptionRole,
Collections.singleton(AuthAction.consume));
- pulsarClient = PulsarClient.builder()
+ replacePulsarClient(PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrl())
- .authentication(authentication)
- .build();
+ .authentication(authentication));
+
// (1) Create subscription name
Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
.subscribe();
@@ -271,10 +271,10 @@ public class AuthorizationProducerConsumerTest extends
ProducerConsumerBase {
Authentication authentication = new ClientAuthentication(clientRole);
- pulsarClient = PulsarClient.builder()
+ replacePulsarClient(PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrl())
- .authentication(authentication)
- .build();
+ .authentication(authentication));
+
admin.clusters().createCluster("test", new
ClusterData(brokerUrl.toString()));
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index dfa1c42..e43b167 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -112,18 +112,15 @@ public class BrokerServiceLookupTest extends
ProducerConsumerBase {
@Override
protected void setup() throws Exception {
conf.setDefaultNumberOfNamespaceBundles(1);
- super.init();
- pulsarClient = PulsarClient.builder()
- .serviceUrl(pulsar.getBrokerServiceUrl())
- .statsInterval(0, TimeUnit.SECONDS)
- .build();
- super.producerBaseSetup();
+ isTcpLookup = true;
+ internalSetup();
+ producerBaseSetup();
}
@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
- super.internalCleanup();
+ internalCleanup();
}
/**
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
index 458c4ae..2c01d29 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
@@ -194,18 +194,20 @@ public class MutualAuthenticationTest extends
ProducerConsumerBase {
Set<String> providersClassNames =
Sets.newHashSet(MutualAuthenticationProvider.class.getName());
conf.setAuthenticationProviders(providersClassNames);
- super.init();
- pulsarClient = PulsarClient.builder()
- .serviceUrl(pulsar.getBrokerServiceUrl())
- .authentication(mutualAuth)
- .build();
- super.producerBaseSetup();
+ isTcpLookup = true;
+ internalSetup();
+ producerBaseSetup();
+ }
+
+ @Override
+ protected void customizeNewPulsarClientBuilder(ClientBuilder
clientBuilder) {
+ clientBuilder.authentication(mutualAuth);
}
@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
- super.internalCleanup();
+ internalCleanup();
}
@Test
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java
index bd8adcd..dccd67e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java
@@ -64,8 +64,9 @@ public class PulsarMultiListenersWithInternalListenerNameTest
extends MockedPuls
this.conf.setInternalListenerName("internal");
}
- protected PulsarClient newPulsarClient(String url, int intervalInSecs)
throws PulsarClientException {
- return
PulsarClient.builder().serviceUrl(url).listenerName("internal").statsInterval(intervalInSecs,
TimeUnit.SECONDS).build();
+ @Override
+ protected void customizeNewPulsarClientBuilder(ClientBuilder
clientBuilder) {
+ clientBuilder.listenerName("internal");
}
@Test
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithoutInternalListenerNameTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithoutInternalListenerNameTest.java
index f5ecc29..baaa4b7 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithoutInternalListenerNameTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithoutInternalListenerNameTest.java
@@ -63,8 +63,9 @@ public class
PulsarMultiListenersWithoutInternalListenerNameTest extends MockedP
this.conf.setAdvertisedListeners(String.format("internal:pulsar://%s:6650,internal:pulsar+ssl://%s:6651",
host, host));
}
- protected PulsarClient newPulsarClient(String url, int intervalInSecs)
throws PulsarClientException {
- return
PulsarClient.builder().serviceUrl(url).listenerName("internal").statsInterval(intervalInSecs,
TimeUnit.SECONDS).build();
+ @Override
+ protected void customizeNewPulsarClientBuilder(ClientBuilder
clientBuilder) {
+ clientBuilder.listenerName("internal");
}
@Test
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java
index b91a5f0..9ec369a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java
@@ -77,7 +77,6 @@ public class TlsProducerConsumerBase extends
ProducerConsumerBase {
if (pulsarClient != null) {
pulsarClient.close();
}
-
ClientBuilder clientBuilder =
PulsarClient.builder().serviceUrl(lookupUrl)
.tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).enableTls(true).allowTlsInsecureConnection(false)
.operationTimeout(1000, TimeUnit.MILLISECONDS);
@@ -87,7 +86,7 @@ public class TlsProducerConsumerBase extends
ProducerConsumerBase {
authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
clientBuilder.authentication(AuthenticationTls.class.getName(),
authParams);
}
- pulsarClient = clientBuilder.build();
+ replacePulsarClient(clientBuilder);
}
protected void internalSetUpForNamespace() throws Exception {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
index 49b3c7c..c94b3e4 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
@@ -80,10 +80,9 @@ public class TokenAuthenticatedProducerConsumerTest extends
ProducerConsumerBase
.authentication(AuthenticationFactory.token(ADMIN_TOKEN))
.build());
- pulsarClient = PulsarClient.builder().serviceUrl(new
URI(pulsar.getBrokerServiceUrl()).toString())
+ replacePulsarClient(PulsarClient.builder().serviceUrl(new
URI(pulsar.getBrokerServiceUrl()).toString())
.statsInterval(0, TimeUnit.SECONDS)
- .authentication(AuthenticationFactory.token(ADMIN_TOKEN))
- .build();
+ .authentication(AuthenticationFactory.token(ADMIN_TOKEN)));
}
@AfterMethod(alwaysRun = true)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
index b4e7bb0..d0b6349 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
@@ -102,10 +102,9 @@ public class TokenOauth2AuthenticatedProducerConsumerTest
extends ProducerConsum
.authentication(authentication)
.build());
- pulsarClient = PulsarClient.builder().serviceUrl(new
URI(pulsar.getBrokerServiceUrl()).toString())
+ replacePulsarClient(PulsarClient.builder().serviceUrl(new
URI(pulsar.getBrokerServiceUrl()).toString())
.statsInterval(0, TimeUnit.SECONDS)
- .authentication(authentication)
- .build();
+ .authentication(authentication));
}
@AfterMethod(alwaysRun = true)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithAuth.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithAuth.java
index 50bb2e9..0abebcc 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithAuth.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithAuth.java
@@ -134,7 +134,7 @@ public class KeyStoreTlsProducerConsumerTestWithAuth
extends ProducerConsumerBas
authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PW,
CLIENT_KEYSTORE_PW);
clientBuilder.authentication(AuthenticationKeyStoreTls.class.getName(),
authParams);
}
- pulsarClient = clientBuilder.build();
+ replacePulsarClient(clientBuilder);
}
protected void internalSetUpForNamespace() throws Exception {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithoutAuth.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithoutAuth.java
index 70a201c..f288e90 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithoutAuth.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithoutAuth.java
@@ -121,7 +121,7 @@ public class KeyStoreTlsProducerConsumerTestWithoutAuth
extends ProducerConsumer
authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PW,
CLIENT_KEYSTORE_PW);
clientBuilder.authentication(AuthenticationKeyStoreTls.class.getName(),
authParams);
}
- pulsarClient = clientBuilder.build();
+ replacePulsarClient(clientBuilder);
}
protected void internalSetUpForNamespace() throws Exception {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChecksumTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChecksumTest.java
new file mode 100644
index 0000000..f57fe6e
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChecksumTest.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.api.proto.ProtocolVersion;
+import org.apache.pulsar.common.protocol.ByteBufPair;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.Commands.ChecksumType;
+import org.apache.pulsar.tests.EnumValuesDataProvider;
+import org.awaitility.Awaitility;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class MessageChecksumTest extends BrokerTestBase {
+ private static final Logger log =
LoggerFactory.getLogger(MessageChecksumTest.class);
+
+ @BeforeMethod
+ @Override
+ public void setup() throws Exception {
+ baseSetup();
+ }
+
+ @AfterMethod(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ internalCleanup();
+ }
+
+ @Override
+ protected void customizeNewPulsarClientBuilder(ClientBuilder
clientBuilder) {
+ // disable connection pooling
+ clientBuilder.connectionsPerBroker(0);
+ }
+
+ @Override
+ protected PulsarClient createNewPulsarClient(ClientBuilder clientBuilder)
throws PulsarClientException {
+ return PulsarTestClient.create(clientBuilder);
+ }
+
+ // Enum parameter used to describe the 2 different scenarios in the
+ // testChecksumCompatibilityInMixedVersionBrokerCluster test case
+ enum MixedVersionScenario {
+ CONNECTED_TO_NEW_THEN_OLD_VERSION,
+ CONNECTED_TO_OLD_THEN_NEW_VERSION
+ }
+
+ /**
+ * Pulsar message checksums changed in protocol version v6, broker version
v1.15.
+ *
+ * This test case verifies that a client is able to send messages to an
older broker version
+ * (<= v1.14, protocol version <= v5) in a mixed environment of broker
versions (<= v1.14 & >= v1.15)
+ *
+ * This test case makes the assumption that the message checksum is ignored
+ * if a tampered message can be read by the consumer in the test.
+ *
+ * Scenario behind this test case:
+ *
+ * MixedVersionScenario.CONNECTED_TO_NEW_THEN_OLD_VERSION
+ * A Pulsar client produces the message while connected to a broker that
supports checksums.
+ * While sending the message to the broker is pending, the connection
breaks and the client
+ * connects to another broker that doesn't support message checksums.
+ * In this case, the client should remove the message checksum before
resending it to the broker.
+ * original PR https://github.com/apache/pulsar/pull/43
+ *
+ * MixedVersionScenario.CONNECTED_TO_OLD_THEN_NEW_VERSION
+ * A Pulsar client produces the message while connected to a broker that
doesn't support checksums.
+ * While sending the message to the broker is pending, the connection
breaks and the client
+ * connects to another broker that supports message checksums.
+ * In this case, the client should remove the message checksum before
resending it to the broker.
+ * original PR https://github.com/apache/pulsar/pull/89
+ */
+ @Test(dataProviderClass = EnumValuesDataProvider.class, dataProvider =
"values")
+ public void
testChecksumCompatibilityInMixedVersionBrokerCluster(MixedVersionScenario
mixedVersionScenario)
+ throws Exception {
+ // GIVEN
+ final String topicName =
+
"persistent://prop/use/ns-abc/testChecksumBackwardsCompatibilityWithOldBrokerWithoutChecksumHandling";
+
+ if (mixedVersionScenario ==
MixedVersionScenario.CONNECTED_TO_OLD_THEN_NEW_VERSION) {
+ // Given, the client thinks it's connected to a broker that
doesn't support message checksums
+ makeClientAssumeThatItsConnectedToBrokerWithoutChecksumSupport();
+ }
+
+ PulsarTestClient pulsarTestClient = (PulsarTestClient) pulsarClient;
+
+ ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>)
pulsarClient.newProducer()
+ .topic(topicName)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionName("my-sub")
+ .subscribe();
+
+ // inject a CountDownLatch to the pending message callback of the
PulsarTestClient
+ CountDownLatch messageSendingProcessedLatch = new CountDownLatch(2);
+ pulsarTestClient.setPendingMessageCallback(__ ->
messageSendingProcessedLatch.countDown());
+
+ // WHEN
+ // a message is sent, it should succeed
+ producer.send("message-1".getBytes());
+
+ // And
+ // communication OpSend messages are dropped to simulate a broken
connection so that
+ // the next message doesn't get sent out yet and can be tampered
before it's sent out
+ pulsarTestClient.dropOpSendMessages();
+
+ // And
+ // another message is sent
+ byte[] messageBytes = "message-2".getBytes();
+ TypedMessageBuilder<byte[]> messageBuilder =
producer.newMessage().value(messageBytes);
+ CompletableFuture<MessageId> tamperedMessageSendFuture =
messageBuilder.sendAsync();
+
+ // And
+ // wait until the message checksum has been calculated and it is pending
+ messageSendingProcessedLatch.await();
+ pulsarTestClient.setPendingMessageCallback(null);
+
+ // And
+ // the producer disconnects from the broker and the test client is put
in a mode where reconnecting is rejected
+ pulsarTestClient.disconnectProducerAndRejectReconnecting(producer);
+
+ // And
+ // when the message is tampered by changing the last byte to '3'.
This corrupts the already calculated
+ // checksum.
+ ((TypedMessageBuilderImpl<byte[]>)
messageBuilder).getContent().put(messageBytes.length - 1, (byte) '3');
+
+ if (mixedVersionScenario ==
MixedVersionScenario.CONNECTED_TO_NEW_THEN_OLD_VERSION) {
+ // Given, the client thinks it's connected to a broker that
doesn't support message checksums
+ makeClientAssumeThatItsConnectedToBrokerWithoutChecksumSupport();
+ } else {
+ // Reset the overriding set in the beginning
+ resetOverridingConnectedBrokerVersion();
+ }
+
+ // And
+ // when finally the pulsar client is allowed to reconnect to the broker
+ pulsarTestClient.allowReconnecting();
+
+ // THEN
+ try {
+ // sending of tampered message should not fail since the client is
expected to remove the checksum from the
+ // message before sending it to an older broker version
+ tamperedMessageSendFuture.get(10, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ fail("Broker shouldn't verify checksum for corrupted message and
it shouldn't fail", e);
+ }
+
+ // and then
+ // first message is received
+ Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
+ // and contains the expected payload
+ assertEquals(new String(msg.getData()), "message-1");
+ // second message is received
+ msg = consumer.receive(1, TimeUnit.SECONDS);
+ // and contains the tampered payload
+ assertEquals(new String(msg.getData()), "message-3");
+ }
+
+ private void
makeClientAssumeThatItsConnectedToBrokerWithoutChecksumSupport() {
+ // make the client think that the connected broker is of version which
doesn't support checksum validation
+ ((PulsarTestClient)
pulsarClient).setOverrideRemoteEndpointProtocolVersion(ProtocolVersion.v5.getValue());
+ }
+
+ private void resetOverridingConnectedBrokerVersion() {
+ // reset the override and use the actual protocol version
+ ((PulsarTestClient)
pulsarClient).setOverrideRemoteEndpointProtocolVersion(0);
+ }
+
+ private void
waitUntilMessageIsPendingWithCalculatedChecksum(ProducerImpl<?> producer) {
+ // wait until the message is in the pending queue
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(producer.getPendingQueueSize(), 1);
+ });
+ }
+
+ @Test
+ public void testTamperingMessageIsDetected() throws Exception {
+ // GIVEN
+ ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>)
pulsarClient.newProducer()
+
.topic("persistent://prop/use/ns-abc/testTamperingMessageIsDetected")
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+ TypedMessageBuilderImpl<byte[]> msgBuilder =
(TypedMessageBuilderImpl<byte[]>) producer.newMessage()
+ .value("a message".getBytes());
+ MessageMetadata msgMetadata = msgBuilder.getMetadataBuilder()
+ .setProducerName("test")
+ .setSequenceId(1)
+ .setPublishTime(10L);
+ ByteBuf payload = Unpooled.wrappedBuffer(msgBuilder.getContent());
+
+ // WHEN
+ // protocol message is created with checksum
+ ByteBufPair cmd = Commands.newSend(1, 1, 1, ChecksumType.Crc32c,
msgMetadata, payload);
+ OpSendMsg op = OpSendMsg.create((MessageImpl<byte[]>)
msgBuilder.getMessage(), cmd, 1, null);
+
+ // THEN
+ // the checksum validation passes
+ assertTrue(producer.verifyLocalBufferIsNotCorrupted(op));
+
+ // WHEN
+ // the content of the message is tampered
+ msgBuilder.getContent().put(0, (byte) 'b');
+ // the checksum validation fails
+ assertFalse(producer.verifyLocalBufferIsNotCorrupted(op));
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
index e997c03..7162592 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
@@ -18,27 +18,14 @@
*/
package org.apache.pulsar.client.impl;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-
-import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
@@ -47,16 +34,10 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.TypedMessageBuilder;
-import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.common.protocol.ByteBufPair;
-import org.apache.pulsar.common.protocol.Commands;
-import org.apache.pulsar.common.protocol.Commands.ChecksumType;
-import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.policies.data.TopicType;
+import org.apache.pulsar.tests.EnumValuesDataProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -76,451 +57,121 @@ public class MessageIdTest extends BrokerTestBase {
internalCleanup();
}
- @Test(timeOut = 10000)
- public void producerSendAsync() throws PulsarClientException {
- // 1. Basic Config
- String key = "producerSendAsync";
+ @Test(timeOut = 10000, dataProviderClass = EnumValuesDataProvider.class,
dataProvider = "values")
+ public void producerSendAsync(TopicType topicType) throws
PulsarClientException, PulsarAdminException {
+ // Given
+ String key = "producerSendAsync-" + topicType;
final String topicName = "persistent://prop/cluster/namespace/topic-"
+ key;
final String subscriptionName = "my-subscription-" + key;
- final String messagePredicate = "my-message-" + key + "-";
+ final String messagePrefix = "my-message-" + key + "-";
final int numberOfMessages = 30;
+ if (topicType == TopicType.PARTITIONED) {
+ int numberOfPartitions = 3;
+ admin.topics().createPartitionedTopic(topicName,
numberOfPartitions);
+ }
- // 2. Create Producer
- Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
- .enableBatching(false)
- .messageRoutingMode(MessageRoutingMode.SinglePartition)
- .create();
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topicName)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
- // 3. Create Consumer
- Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionName(subscriptionName)
.subscribe();
- // 4. Publish message and get message id
- Set<MessageId> messageIds = new HashSet<>();
+ // When
+ // Messages are published asynchronously
List<Future<MessageId>> futures = new ArrayList<>();
for (int i = 0; i < numberOfMessages; i++) {
- String message = messagePredicate + i;
+ String message = messagePrefix + i;
futures.add(producer.sendAsync(message.getBytes()));
}
+ // Then
+ // expect that the Message Ids of subsequently sent messages are in
ascending order
+ Set<MessageId> messageIds = new HashSet<>();
MessageIdImpl previousMessageId = null;
for (Future<MessageId> f : futures) {
try {
MessageIdImpl currentMessageId = (MessageIdImpl) f.get();
if (previousMessageId != null) {
-
Assert.assertTrue(currentMessageId.compareTo(previousMessageId) > 0,
+ assertTrue(currentMessageId.compareTo(previousMessageId) >
0,
"Message Ids should be in ascending order");
}
messageIds.add(currentMessageId);
previousMessageId = currentMessageId;
} catch (Exception e) {
- Assert.fail("Failed to publish message, Exception: " +
e.getMessage());
+ fail("Failed to publish message", e);
}
}
- // 4. Check if message Ids are correct
- log.info("Message IDs = " + messageIds);
- Assert.assertEquals(messageIds.size(), numberOfMessages, "Not all
messages published successfully");
+ // And
+ // expect that there's a message id for each sent out message
+ // and that all messages have been received by the consumer
+ log.info("Message IDs = {}", messageIds);
+ assertEquals(messageIds.size(), numberOfMessages, "Not all messages
published successfully");
for (int i = 0; i < numberOfMessages; i++) {
Message<byte[]> message = consumer.receive();
- Assert.assertEquals(new String(message.getData()),
messagePredicate + i);
+ assertEquals(new String(message.getData()), messagePrefix + i);
MessageId messageId = message.getMessageId();
- Assert.assertTrue(messageIds.remove(messageId), "Failed to receive
message");
- }
- log.info("Message IDs = " + messageIds);
- Assert.assertEquals(messageIds.size(), 0, "Not all messages received
successfully");
- consumer.unsubscribe();
- }
-
- @Test(timeOut = 10000)
- public void producerSend() throws PulsarClientException {
- // 1. Basic Config
- String key = "producerSend";
- final String topicName = "persistent://prop/cluster/namespace/topic-"
+ key;
- final String subscriptionName = "my-subscription-" + key;
- final String messagePredicate = "my-message-" + key + "-";
- final int numberOfMessages = 30;
-
- // 2. Create Producer
- Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName).create();
-
- // 3. Create Consumer
- Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
- .subscribe();
-
- // 4. Publish message and get message id
- Set<MessageId> messageIds = new HashSet<>();
- for (int i = 0; i < numberOfMessages; i++) {
- String message = messagePredicate + i;
- messageIds.add(producer.send(message.getBytes()));
- }
-
- // 4. Check if message Ids are correct
- log.info("Message IDs = " + messageIds);
- Assert.assertEquals(messageIds.size(), numberOfMessages, "Not all
messages published successfully");
-
- for (int i = 0; i < numberOfMessages; i++) {
-
Assert.assertTrue(messageIds.remove(consumer.receive().getMessageId()), "Failed
to receive Message");
- }
- log.info("Message IDs = " + messageIds);
- Assert.assertEquals(messageIds.size(), 0, "Not all messages received
successfully");
- consumer.unsubscribe();
- ;
- }
-
- @Test(timeOut = 10000)
- public void partitionedProducerSendAsync() throws PulsarClientException,
PulsarAdminException {
- // 1. Basic Config
- String key = "partitionedProducerSendAsync";
- final String topicName = "persistent://prop/cluster/namespace/topic-"
+ key;
- final String subscriptionName = "my-subscription-" + key;
- final String messagePredicate = "my-message-" + key + "-";
- final int numberOfMessages = 30;
- int numberOfPartitions = 3;
- admin.topics().createPartitionedTopic(topicName, numberOfPartitions);
-
- // 2. Create Producer
- Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
- .enableBatching(false)
- .messageRoutingMode(MessageRoutingMode.SinglePartition)
- .create();
-
- // 3. Create Consumer
- Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
- .subscribe();
-
- // 4. Publish message and get message id
- Set<MessageId> messageIds = new HashSet<>();
- Set<Future<MessageId>> futures = new HashSet<>();
- for (int i = 0; i < numberOfMessages; i++) {
- String message = messagePredicate + i;
- futures.add(producer.sendAsync(message.getBytes()));
- }
-
- futures.forEach(f -> {
- try {
- messageIds.add(f.get());
- } catch (Exception e) {
- Assert.fail("Failed to publish message, Exception: " +
e.getMessage());
+ if (topicType == TopicType.PARTITIONED) {
+ messageId = ((TopicMessageIdImpl)
messageId).getInnerMessageId();
}
- });
-
- // 4. Check if message Ids are correct
- log.info("Message IDs = " + messageIds);
- Assert.assertEquals(messageIds.size(), numberOfMessages, "Not all
messages published successfully");
-
- for (int i = 0; i < numberOfMessages; i++) {
- MessageId topicMessageId = consumer.receive().getMessageId();
- MessageId messageId =
((TopicMessageIdImpl)topicMessageId).getInnerMessageId();
- log.info("Message ID Received = " + messageId);
- Assert.assertEquals(topicMessageId.toString(),
messageId.toString());
- Assert.assertTrue(messageIds.remove(messageId), "Failed to receive
Message");
+ assertTrue(messageIds.remove(messageId), "Failed to receive
message");
}
- log.info("Message IDs = " + messageIds);
- Assert.assertEquals(messageIds.size(), 0, "Not all messages received
successfully");
+ log.info("Remaining message IDs = {}", messageIds);
+ assertEquals(messageIds.size(), 0, "Not all messages received
successfully");
consumer.unsubscribe();
}
- @Test(timeOut = 10000)
- public void partitionedProducerSend() throws PulsarClientException,
PulsarAdminException {
- // 1. Basic Config
- String key = "partitionedProducerSend";
+ @Test(timeOut = 10000, dataProviderClass = EnumValuesDataProvider.class,
dataProvider = "values")
+ public void producerSend(TopicType topicType) throws
PulsarClientException, PulsarAdminException {
+ // Given
+ String key = "producerSend-" + topicType;
final String topicName = "persistent://prop/cluster/namespace/topic-"
+ key;
final String subscriptionName = "my-subscription-" + key;
- final String messagePredicate = "my-message-" + key + "-";
+ final String messagePrefix = "my-message-" + key + "-";
final int numberOfMessages = 30;
- int numberOfPartitions = 7;
- admin.topics().createPartitionedTopic(topicName, numberOfPartitions);
+ if (topicType == TopicType.PARTITIONED) {
+ int numberOfPartitions = 7;
+ admin.topics().createPartitionedTopic(topicName,
numberOfPartitions);
+ }
- // 2. Create Producer
- Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
- .enableBatching(false)
- .messageRoutingMode(MessageRoutingMode.SinglePartition)
- .create();
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .enableBatching(false)
+ .topic(topicName)
+ .create();
- // 3. Create Consumer
- Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionName(subscriptionName)
.subscribe();
- // 4. Publish message and get message id
+ // When
+ // Messages are published
Set<MessageId> messageIds = new HashSet<>();
for (int i = 0; i < numberOfMessages; i++) {
- String message = messagePredicate + i;
+ String message = messagePrefix + i;
messageIds.add(producer.send(message.getBytes()));
}
- // 4. Check if message Ids are correct
- log.info("Message IDs = " + messageIds);
- Assert.assertEquals(messageIds.size(), numberOfMessages, "Not all
messages published successfully");
+ // Then
+ // expect that there's a message id for each sent out message and that all
messages have been received by the consumer
+ log.info("Message IDs = {}", messageIds);
+ assertEquals(messageIds.size(), numberOfMessages, "Not all messages
published successfully");
for (int i = 0; i < numberOfMessages; i++) {
- MessageId topicMessageId = consumer.receive().getMessageId();
- MessageId messageId =
((TopicMessageIdImpl)topicMessageId).getInnerMessageId();
- Assert.assertEquals(topicMessageId.toString(),
messageId.toString());
- Assert.assertTrue(messageIds.remove(messageId), "Failed to receive
Message");
- }
- log.info("Message IDs = " + messageIds);
- Assert.assertEquals(messageIds.size(), 0, "Not all messages received
successfully");
- // TODO - this statement causes the broker to hang - need to look into
- // it
- // consumer.unsubscribe();
- }
-
- /**
- * Verifies: different versions of broker-deployment (one broker
understands Checksum and other doesn't in that case
- * remove checksum before sending to broker-2)
- *
- * client first produce message with checksum and then retries to send
message due to connection unavailable. But
- * this time, if broker doesn't understand checksum: then client should
remove checksum from the message before
- * sending to broker.
- *
- * 1. stop broker 2. client compute checksum and add into message 3.
produce 2 messages and corrupt 1 message 4.
- * start broker with lower version (which doesn't support checksum) 5.
client reconnects to broker and due to
- * incompatibility of version: removes checksum from message 6. broker
doesn't do checksum validation and persist
- * message 7. client receives ack
- *
- * @throws Exception
- */
- @Test
- public void testChecksumVersionComptability() throws Exception {
- final String topicName = "persistent://prop/use/ns-abc/topic1";
-
- // 1. producer connect
- ProducerImpl<byte[]> prod = (ProducerImpl<byte[]>)
pulsarClient.newProducer().topic(topicName)
- .enableBatching(false)
- .messageRoutingMode(MessageRoutingMode.SinglePartition)
- .create();
- ProducerImpl<byte[]> producer = spy(prod);
- // return higher version compare to broker : so, it forces
client-producer to remove checksum from payload
- doReturn(producer.brokerChecksumSupportedVersion() +
1).when(producer).brokerChecksumSupportedVersion();
- doAnswer(invocationOnMock ->
prod.getState()).when(producer).getState();
- doAnswer(invocationOnMock ->
prod.getClientCnx()).when(producer).getClientCnx();
- doAnswer(invocationOnMock -> prod.cnx()).when(producer).cnx();
-
- Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe();
-
- // Stop the broker, and publishes messages. Messages are accumulated
in the producer queue and they're checksums
- // would have already been computed. If we change the message content
at that point, it should result in a
- // checksum validation error
- stopBroker();
-
- // stop timer to auto-reconnect as let spy-Producer connect to broker
manually so, spy-producer object can get
- // mock-value from brokerChecksumSupportedVersion
- ((PulsarClientImpl) pulsarClient).timer().stop();
-
- ClientCnx mockClientCnx = spy(
- new ClientCnx(new ClientConfigurationData(),
((PulsarClientImpl) pulsarClient).eventLoopGroup()));
- doReturn(producer.brokerChecksumSupportedVersion() -
1).when(mockClientCnx).getRemoteEndpointProtocolVersion();
- prod.setClientCnx(mockClientCnx);
-
- CompletableFuture<MessageId> future1 =
producer.sendAsync("message-1".getBytes());
-
- byte[] a2 = "message-2".getBytes();
- TypedMessageBuilder<byte[]> msg2 = producer.newMessage().value(a2);
-
- CompletableFuture<MessageId> future2 = msg2.sendAsync();
-
- // corrupt the message, new content would be 'message-3'
- ((TypedMessageBuilderImpl<byte[]>) msg2).getContent().put(a2.length -
1, (byte) '3');
-
- prod.setClientCnx(null);
-
- // Restart the broker to have the messages published
- startBroker();
-
- // grab broker connection with mocked producer which has higher
version compare to broker
- prod.grabCnx();
-
- try {
- // it should not fail: as due to unsupported version of broker:
client removes checksum and broker should
- // ignore the checksum validation
- future1.get();
- future2.get();
- } catch (Exception e) {
- e.printStackTrace();
- fail("Broker shouldn't verify checksum for corrupted message and
it shouldn't fail");
- }
-
- ((ConsumerImpl<byte[]>) consumer).grabCnx();
- // We should only receive msg1
- Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
- assertEquals(new String(msg.getData()), "message-1");
- msg = consumer.receive(1, TimeUnit.SECONDS);
- assertEquals(new String(msg.getData()), "message-3");
-
- }
-
- @Test
- public void testChecksumReconnection() throws Exception {
- final String topicName = "persistent://prop/use/ns-abc/topic1";
-
- // 1. producer connect
- ProducerImpl<byte[]> prod = (ProducerImpl<byte[]>)
pulsarClient.newProducer().topic(topicName)
- .enableBatching(false)
- .messageRoutingMode(MessageRoutingMode.SinglePartition)
- .create();
- ProducerImpl<byte[]> producer = spy(prod);
- // mock: broker-doesn't support checksum (remote_version <
brokerChecksumSupportedVersion) so, it forces
- // client-producer to perform checksum-strip from msg at reconnection
- doReturn(producer.brokerChecksumSupportedVersion() +
1).when(producer).brokerChecksumSupportedVersion();
- doAnswer(invocationOnMock ->
prod.getState()).when(producer).getState();
- doAnswer(invocationOnMock ->
prod.getClientCnx()).when(producer).getClientCnx();
- doAnswer(invocationOnMock -> prod.cnx()).when(producer).cnx();
-
- Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe();
-
- stopBroker();
-
- // stop timer to auto-reconnect as let spy-Producer connect to broker
- // manually so, spy-producer object can get
- // mock-value from brokerChecksumSupportedVersion
- ((PulsarClientImpl) pulsarClient).timer().stop();
-
- // set clientCnx mock to get non-checksum supported version
- ClientCnx mockClientCnx = spy(
- new ClientCnx(new ClientConfigurationData(),
((PulsarClientImpl) pulsarClient).eventLoopGroup()));
- doReturn(producer.brokerChecksumSupportedVersion() -
1).when(mockClientCnx).getRemoteEndpointProtocolVersion();
- prod.setClientCnx(mockClientCnx);
-
- CompletableFuture<MessageId> future1 =
producer.sendAsync("message-1".getBytes());
-
- byte[] a2 = "message-2".getBytes();
- TypedMessageBuilder<byte[]> msg2 = producer.newMessage().value(a2);
-
- CompletableFuture<MessageId> future2 = msg2.sendAsync();
-
- // corrupt the message, new content would be 'message-3'
- ((TypedMessageBuilderImpl<byte[]>) msg2).getContent().put(a2.length -
1, (byte) '3');
-
- // unset mock
- prod.setClientCnx(null);
-
- // Restart the broker to have the messages published
- startBroker();
-
- // grab broker connection with mocked producer which has higher version
- // compare to broker
- prod.grabCnx();
-
- try {
- // it should not fail: as due to unsupported version of broker:
- // client removes checksum and broker should
- // ignore the checksum validation
- future1.get(10, TimeUnit.SECONDS);
- future2.get(10, TimeUnit.SECONDS);
- } catch (Exception e) {
- e.printStackTrace();
- fail("Broker shouldn't verify checksum for corrupted message and
it shouldn't fail");
- }
-
- ((ConsumerImpl<byte[]>) consumer).grabCnx();
- // We should only receive msg1
- Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
- assertEquals(new String(msg.getData()), "message-1");
- msg = consumer.receive(1, TimeUnit.SECONDS);
- assertEquals(new String(msg.getData()), "message-3");
-
- }
-
- /**
- * Verifies: if message is corrupted before sending to broker and if
broker gives checksum error: then 1.
- * Client-Producer recomputes checksum with modified data 2. Retry
message-send again 3. Broker verifies checksum 4.
- * client receives send-ack success
- *
- * @throws Exception
- */
- @Test
- public void testCorruptMessageRemove() throws Exception {
-
- final String topicName = "persistent://prop/use/ns-abc/retry-topic";
-
- // 1. producer connect
- ProducerImpl<byte[]> prod = (ProducerImpl<byte[]>)
pulsarClient.newProducer()
- .topic(topicName)
- .enableBatching(false)
- .messageRoutingMode(MessageRoutingMode.SinglePartition)
- .sendTimeout(10, TimeUnit.MINUTES)
- .create();
- ProducerImpl<byte[]> producer = spy(prod);
- Field producerIdField =
ProducerImpl.class.getDeclaredField("producerId");
- producerIdField.setAccessible(true);
- long producerId = (long) producerIdField.get(producer);
- producer.cnx().registerProducer(producerId, producer); // registered
spy ProducerImpl
- Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe();
-
- // 2. Stop the broker, and publishes messages. Messages are
accumulated in the producer queue and they're
- // checksums
- // would have already been computed. If we change the message content
at that point, it should result in a
- // checksum validation error
- // enable checksum at producer
- stopBroker();
-
- byte[] a = "message-1".getBytes();
- TypedMessageBuilder<byte[]> msg = producer.newMessage().value(a);
-
- CompletableFuture<MessageId> future = msg.sendAsync();
-
- // corrupt the message, new content would be 'message-2'
- ((TypedMessageBuilderImpl<byte[]>) msg).getContent().put(a.length - 1,
(byte) '2');
-
- // 4. Restart the broker to have the messages published
- startBroker();
-
- try {
- future.get();
- fail("send message should have failed with checksum excetion");
- } catch (Exception e) {
- if (e.getCause() instanceof
PulsarClientException.ChecksumException) {
- // ok (callback should get checksum exception as message was
modified and corrupt)
- } else {
- fail("Callback should have only failed with
ChecksumException", e);
+ MessageId messageId = consumer.receive().getMessageId();
+ if (topicType == TopicType.PARTITIONED) {
+ messageId = ((TopicMessageIdImpl)
messageId).getInnerMessageId();
}
+ assertTrue(messageIds.remove(messageId), "Failed to receive
Message");
}
-
- // 5. Verify
- /**
- * verify: ProducerImpl.verifyLocalBufferIsNotCorrupted() => validates
if message is corrupt
- */
- byte[] a2 = "message-2".getBytes();
-
- TypedMessageBuilderImpl<byte[]> msg2 =
(TypedMessageBuilderImpl<byte[]>)
producer.newMessage().value("message-1".getBytes());
- ByteBuf payload = Unpooled.wrappedBuffer(msg2.getContent());
- MessageMetadata msgMetadata = ((TypedMessageBuilderImpl<byte[]>)
msg).getMetadataBuilder();
-
msgMetadata.setProducerName("test").setSequenceId(1).setPublishTime(10L);
- ByteBufPair cmd = Commands.newSend(producerId, 1, 1,
ChecksumType.Crc32c, msgMetadata, payload);
- // (a) create OpSendMsg with message-data : "message-1"
- OpSendMsg op = OpSendMsg.create(((MessageImpl<byte[]>)
msg2.getMessage()), cmd, 1, null);
- // a.verify: as message is not corrupt: no need to update checksum
- assertTrue(producer.verifyLocalBufferIsNotCorrupted(op));
-
- // (b) corrupt message
- msg2.getContent().put(a2.length - 1, (byte) '2'); // new content would
be 'message-2'
- // b. verify: as message is corrupt: update checksum
- assertFalse(producer.verifyLocalBufferIsNotCorrupted(op));
-
- assertEquals(producer.getPendingQueueSize(), 0);
-
- // [2] test-recoverChecksumError functionality
- stopBroker();
-
- TypedMessageBuilderImpl<byte[]> msg1 =
(TypedMessageBuilderImpl<byte[]>)
producer.newMessage().value("message-1".getBytes());
- future = msg1.sendAsync();
- ClientCnx cnx = spy(
- new ClientCnx(new ClientConfigurationData(),
((PulsarClientImpl) pulsarClient).eventLoopGroup()));
- String exc = "broker is already stopped";
- // when client-try to recover checksum by resending to broker: throw
exception as broker is stopped
- doThrow(new IllegalStateException(exc)).when(cnx).ctx();
- try {
- producer.recoverChecksumError(cnx, 1);
- fail("it should call : resendMessages() => which should throw
above mocked exception");
- } catch (IllegalStateException e) {
- assertEquals(exc, e.getMessage());
- }
-
- producer.close();
- consumer.close();
- producer = null; // clean reference of mocked producer
+ log.info("Remaining message IDs = {}", messageIds);
+ assertEquals(messageIds.size(), 0, "Not all messages received
successfully");
+ consumer.unsubscribe();
}
-
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
new file mode 100644
index 0000000..9543f6e
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.io.IOException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.awaitility.Awaitility;
+
+/**
+ * A Pulsar Client that is used for testing scenarios where the different
+ * asynchronous operations of the client-broker interaction must be
orchestrated by the test
+ * so that race conditions caused by the test code can be eliminated.
+ *
+ * features:
+ * - can override remote endpoint protocol version in a thread safe manner
+ * - can reject new connections from the client to the broker
+ * - can drop all OpSend messages after they have been added to
pendingMessages and processed
+ * by the client. This simulates a situation where sent messages go to a
"black hole".
+ * - can synchronize operations with the help of the pending message callback
which gets
+ * called after the message to send out has been added to the pending
messages in the client.
+ *
+ */
+public class PulsarTestClient extends PulsarClientImpl {
+ private volatile int overrideRemoteEndpointProtocolVersion;
+ private volatile boolean rejectNewConnections;
+ private volatile boolean dropOpSendMessages;
+ private volatile Consumer<ProducerImpl.OpSendMsg> pendingMessageCallback;
+
+ /**
+ * Create a new PulsarTestClient instance.
+ *
+ * @param clientBuilder ClientBuilder instance containing the
configuration of the client
+ * @return a new PulsarTestClient instance
+ * @throws PulsarClientException
+ */
+ public static PulsarTestClient create(ClientBuilder clientBuilder) throws
PulsarClientException {
+ ClientConfigurationData clientConfigurationData =
+ ((ClientBuilderImpl)
clientBuilder).getClientConfigurationData();
+
+ // the reason to do all the following is to be able to pass the
supplier for creating new ClientCnx
+ // instances after the constructor of PulsarClientImpl has been called.
+ // An anonymous subclass of ClientCnx class is used to override the
getRemoteEndpointProtocolVersion()
+ // method.
+ EventLoopGroup eventLoopGroup =
EventLoopUtil.newEventLoopGroup(clientConfigurationData.getNumIoThreads(),
+ new DefaultThreadFactory("pulsar-client-io",
Thread.currentThread().isDaemon()));
+
+ AtomicReference<Supplier<ClientCnx>> clientCnxSupplierReference = new
AtomicReference<>();
+ ConnectionPool connectionPool = new
ConnectionPool(clientConfigurationData, eventLoopGroup,
+ () -> clientCnxSupplierReference.get().get());
+
+ return new PulsarTestClient(clientConfigurationData, eventLoopGroup,
connectionPool,
+ clientCnxSupplierReference);
+ }
+
+ private PulsarTestClient(ClientConfigurationData conf, EventLoopGroup
eventLoopGroup, ConnectionPool cnxPool,
+ AtomicReference<Supplier<ClientCnx>>
clientCnxSupplierReference)
+ throws PulsarClientException {
+ super(conf, eventLoopGroup, cnxPool);
+ // workaround initialization order issue so that ClientCnx can be
created in this class
+ clientCnxSupplierReference.set(this::createClientCnx);
+ }
+
+ /**
+ * Overrides the default ClientCnx implementation with an implementation
that overrides the
+ * getRemoteEndpointProtocolVersion() method. This is used to test client
behaviour in certain cases.
+ *
+ * @return new ClientCnx instance
+ */
+ protected ClientCnx createClientCnx() {
+ return new ClientCnx(conf, eventLoopGroup) {
+ @Override
+ public int getRemoteEndpointProtocolVersion() {
+ return overrideRemoteEndpointProtocolVersion != 0
+ ? overrideRemoteEndpointProtocolVersion
+ : super.getRemoteEndpointProtocolVersion();
+ }
+ };
+ }
+
+ /**
+ * Overrides the getConnection method to reject new connections from being
established between
+ * the client and brokers.
+ *
+ * @param topic the topic for the connection
+ * @return a future for the ClientCnx to use. Will complete with an
exception when connections are rejected.
+ */
+ @Override
+ protected CompletableFuture<ClientCnx> getConnection(String topic) {
+ if (rejectNewConnections) {
+ CompletableFuture<ClientCnx> result = new CompletableFuture<>();
+ result.completeExceptionally(new IOException("New connections are
rejected."));
+ return result;
+ } else {
+ return super.getConnection(topic);
+ }
+ }
+
+ /**
+ * Overrides the producer instance with an anonymous subclass that adds
hooks for observing new
+ * OpSendMsg instances being added to pending messages in the client.
+ * It also configures the hook to drop OpSend messages when dropping is
enabled.
+ */
+ @Override
+ protected <T> ProducerImpl<T> newProducerImpl(String topic, int
partitionIndex, ProducerConfigurationData conf,
+ Schema<T> schema,
ProducerInterceptors interceptors,
+
CompletableFuture<Producer<T>> producerCreatedFuture) {
+ return new ProducerImpl<T>(this, topic, conf, producerCreatedFuture,
partitionIndex, schema,
+ interceptors) {
+ @Override
+ protected BlockingQueue<OpSendMsg> createPendingMessagesQueue() {
+ return new
ArrayBlockingQueue<OpSendMsg>(conf.getMaxPendingMessages()) {
+ @Override
+ public void put(OpSendMsg opSendMsg) throws
InterruptedException {
+ super.put(opSendMsg);
+ if (pendingMessageCallback != null) {
+ pendingMessageCallback.accept(opSendMsg);
+ }
+ }
+ };
+ }
+
+ @Override
+ protected boolean shouldWriteOpSendMsg() {
+ if (dropOpSendMessages) {
+ return false;
+ } else {
+ return super.shouldWriteOpSendMsg();
+ }
+ }
+ };
+ }
+
+ public void setOverrideRemoteEndpointProtocolVersion(int
overrideRemoteEndpointProtocolVersion) {
+ this.overrideRemoteEndpointProtocolVersion =
overrideRemoteEndpointProtocolVersion;
+ }
+
+ public void setRejectNewConnections(boolean rejectNewConnections) {
+ this.rejectNewConnections = rejectNewConnections;
+ }
+
+ /**
+ * Simulates the producer connection getting dropped. Will also reject
reconnections to simulate an
+ * outage. This reduces race conditions since the reconnection has to be
explicitly enabled by calling
+ * allowReconnecting() method.
+ */
+ public void disconnectProducerAndRejectReconnecting(ProducerImpl<?>
producer) throws IOException {
+ // wait until all possible in-flight messages have been delivered
+ Awaitility.await().untilAsserted(() -> {
+ if (!dropOpSendMessages && producer.isConnected()) {
+ assertEquals(producer.getPendingQueueSize(), 0);
+ }
+ });
+
+ // reject new connection attempts
+ setRejectNewConnections(true);
+
+ // make the existing connection between the producer and broker to
break by explicitly closing it
+ ClientCnx cnx = producer.cnx();
+ producer.connectionClosed(cnx);
+ cnx.close();
+ }
+
+ /**
+ * Resets possible dropping of OpSend messages and allows the client to
reconnect to the broker.
+ */
+ public void allowReconnecting() {
+ dropOpSendMessages = false;
+ setRejectNewConnections(false);
+ }
+
+ /**
+ * Assigns the callback to use for handling OpSend messages once a message
has been added to pending messages.
+ * @param pendingMessageCallback
+ */
+ public void setPendingMessageCallback(
+ Consumer<ProducerImpl.OpSendMsg> pendingMessageCallback) {
+ this.pendingMessageCallback = pendingMessageCallback;
+ }
+
+ /**
+ * Enable dropping of OpSend messages after they have been added to
pendingMessages and processed
+ * by the client. The OpSend messages won't be delivered until the
allowReconnecting method has been called.
+ */
+ public void dropOpSendMessages() {
+ this.dropOpSendMessages = true;
+ }
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index df05096..2914741 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -319,7 +319,7 @@ public class ClientCnx extends PulsarHandler {
log.debug("{} Connection is ready", ctx.channel());
}
// set remote protocol version to the correct version before we
complete the connection future
- remoteEndpointProtocolVersion = connected.getProtocolVersion();
+ setRemoteEndpointProtocolVersion(connected.getProtocolVersion());
connectionFuture.complete(null);
state = State.Ready;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index ee938e0..61b72fe 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.client.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
-
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -129,8 +128,8 @@ public class PartitionedProducerImpl<T> extends
ProducerBase<T> {
AtomicInteger completed = new AtomicInteger();
for (int partitionIndex = 0; partitionIndex <
topicMetadata.numPartitions(); partitionIndex++) {
String partitionName =
TopicName.get(topic).getPartition(partitionIndex).toString();
- ProducerImpl<T> producer = new ProducerImpl<>(client,
partitionName, conf, new CompletableFuture<>(),
- partitionIndex, schema, interceptors);
+ ProducerImpl<T> producer = client.newProducerImpl(partitionName,
partitionIndex,
+ conf, schema, interceptors, new CompletableFuture<>());
producers.add(producer);
producer.producerCreatedFuture().handle((prod, createException) ->
{
if (createException != null) {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index de88e19..729b895 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -152,8 +152,8 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
this.userProvidedProducerName = true;
}
this.partitionIndex = partitionIndex;
- this.pendingMessages =
Queues.newArrayBlockingQueue(conf.getMaxPendingMessages());
- this.pendingCallbacks =
Queues.newArrayBlockingQueue(conf.getMaxPendingMessages());
+ this.pendingMessages = createPendingMessagesQueue();
+ this.pendingCallbacks = createPendingCallbacksQueue();
this.semaphore = new Semaphore(conf.getMaxPendingMessages(), true);
this.compressor =
CompressionCodecProvider.getCompressionCodec(conf.getCompressionType());
@@ -245,6 +245,14 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
grabCnx();
}
+ protected BlockingQueue<OpSendMsg> createPendingMessagesQueue() {
+ return Queues.newArrayBlockingQueue(conf.getMaxPendingMessages());
+ }
+
+ protected BlockingQueue<OpSendMsg> createPendingCallbacksQueue() {
+ return Queues.newArrayBlockingQueue(conf.getMaxPendingMessages());
+ }
+
public ConnectionHandler getConnectionHandler() {
return connectionHandler;
}
@@ -656,7 +664,7 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
return Commands.newSend(producerId, lowestSequenceId,
highestSequenceId, numMessages, getChecksumType(), msgMetadata,
compressedPayload);
}
- private ChecksumType getChecksumType() {
+ protected ChecksumType getChecksumType() {
if (connectionHandler.cnx() == null
|| connectionHandler.cnx().getRemoteEndpointProtocolVersion()
>= brokerChecksumSupportedVersion()) {
return ChecksumType.Crc32c;
@@ -1694,8 +1702,8 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
LAST_SEQ_ID_PUSHED_UPDATER.getAndUpdate(this,
last -> Math.max(last, getHighestSequenceId(op)));
}
- ClientCnx cnx = cnx();
- if (isConnected()) {
+ if (shouldWriteOpSendMsg()) {
+ ClientCnx cnx = cnx();
if (op.msg != null && op.msg.getSchemaState() == None) {
tryRegisterSchema(cnx, op.msg, op.callback);
return;
@@ -1726,6 +1734,16 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
}
}
+ /**
+ * Hook method for testing. By returning false, it's possible to prevent
messages
+ * being delivered to the broker.
+ *
+ * @return true if OpSend messages should be written to open connection
+ */
+ protected boolean shouldWriteOpSendMsg() {
+ return isConnected();
+ }
+
private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from) {
final boolean stripChecksum = cnx.getRemoteEndpointProtocolVersion() <
brokerChecksumSupportedVersion();
Iterator<OpSendMsg> msgIterator = pendingMessages.iterator();
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index ed04e51..769cfef 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -90,7 +90,7 @@ public class PulsarClientImpl implements PulsarClient {
private static final Logger log =
LoggerFactory.getLogger(PulsarClientImpl.class);
- private final ClientConfigurationData conf;
+ protected final ClientConfigurationData conf;
private LookupService lookup;
private final ConnectionPool cnxPool;
private final Timer timer;
@@ -109,7 +109,7 @@ public class PulsarClientImpl implements PulsarClient {
private final AtomicLong consumerIdGenerator = new AtomicLong();
private final AtomicLong requestIdGenerator = new AtomicLong();
- private final EventLoopGroup eventLoopGroup;
+ protected final EventLoopGroup eventLoopGroup;
private final MemoryLimitController memoryLimitController;
private final LoadingCache<String, SchemaInfoProvider>
schemaProviderLoadingCache = CacheBuilder.newBuilder().maximumSize(100000)
@@ -290,10 +290,10 @@ public class PulsarClientImpl implements PulsarClient {
ProducerBase<T> producer;
if (metadata.partitions > 0) {
- producer = new
PartitionedProducerImpl<>(PulsarClientImpl.this, topic, conf,
metadata.partitions,
- producerCreatedFuture, schema, interceptors);
+ producer = newPartitionedProducerImpl(topic, conf, schema,
interceptors, producerCreatedFuture,
+ metadata);
} else {
- producer = new ProducerImpl<>(PulsarClientImpl.this, topic,
conf, producerCreatedFuture, -1, schema, interceptors);
+ producer = newProducerImpl(topic, -1, conf, schema,
interceptors, producerCreatedFuture);
}
producers.add(producer);
@@ -306,6 +306,53 @@ public class PulsarClientImpl implements PulsarClient {
return producerCreatedFuture;
}
+ /**
+ * Factory method for creating PartitionedProducerImpl instance.
+ *
+ * Allows overriding the PartitionedProducerImpl instance in tests.
+ *
+ * @param topic topic name
+ * @param conf producer configuration
+ * @param schema topic schema
+ * @param interceptors producer interceptors
+ * @param producerCreatedFuture future for signaling completion of async
producer creation
+ * @param metadata partitioned topic metadata
+ * @param <T> message type class
+ * @return new PartitionedProducerImpl instance
+ */
+ protected <T> PartitionedProducerImpl<T> newPartitionedProducerImpl(String
topic,
+
ProducerConfigurationData conf,
+
Schema<T> schema,
+
ProducerInterceptors interceptors,
+
CompletableFuture<Producer<T>> producerCreatedFuture,
+
PartitionedTopicMetadata metadata) {
+ return new PartitionedProducerImpl<>(PulsarClientImpl.this, topic,
conf, metadata.partitions,
+ producerCreatedFuture, schema, interceptors);
+ }
+
+ /**
+ * Factory method for creating ProducerImpl instance.
+ *
+ * Allows overriding the ProducerImpl instance in tests.
+ *
+ * @param topic topic name
+ * @param partitionIndex partition index of a partitioned topic. the value
-1 is used for non-partitioned topics.
+ * @param conf producer configuration
+ * @param schema topic schema
+ * @param interceptors producer interceptors
+ * @param producerCreatedFuture future for signaling completion of async
producer creation
+ * @param <T> message type class
+ * @return new ProducerImpl instance
+ */
+ protected <T> ProducerImpl<T> newProducerImpl(String topic, int
partitionIndex,
+ ProducerConfigurationData
conf,
+ Schema<T> schema,
+ ProducerInterceptors
interceptors,
+
CompletableFuture<Producer<T>> producerCreatedFuture) {
+ return new ProducerImpl<>(PulsarClientImpl.this, topic, conf,
producerCreatedFuture, partitionIndex, schema,
+ interceptors);
+ }
+
public CompletableFuture<Consumer<byte[]>>
subscribeAsync(ConsumerConfigurationData<byte[]> conf) {
return subscribeAsync(conf, Schema.BYTES, null);
}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
index 65e8df3..084ef39 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
@@ -18,10 +18,20 @@
*/
package org.apache.pulsar.client.impl;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
import io.netty.channel.EventLoopGroup;
import io.netty.util.Timer;
-
import io.netty.util.concurrent.DefaultThreadFactory;
+import java.lang.reflect.Field;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadFactory;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
@@ -34,16 +44,6 @@ import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
-import java.lang.reflect.Field;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ThreadFactory;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
-
/**
* Unit Tests of {@link PartitionedProducerImpl}.
*/
@@ -61,7 +61,7 @@ public class PartitionedProducerImplTest {
client = mock(PulsarClientImpl.class);
schema = mock(Schema.class);
producerInterceptors = mock(ProducerInterceptors.class);
- producerCreatedFuture = mock(CompletableFuture.class);
+ producerCreatedFuture = new CompletableFuture<>();
ClientConfigurationData clientConfigurationData =
mock(ClientConfigurationData.class);
Timer timer = mock(Timer.class);
@@ -70,6 +70,13 @@ public class PartitionedProducerImplTest {
when(client.getConfiguration()).thenReturn(clientConfigurationData);
when(client.timer()).thenReturn(timer);
when(client.newProducer()).thenReturn(producerBuilderImpl);
+ when(client.newProducerImpl(anyString(), anyInt(), any(), any(),
any(), any()))
+ .thenAnswer(invocationOnMock -> {
+ return new ProducerImpl<>(client, invocationOnMock.getArgument(0),
+ invocationOnMock.getArgument(2),
invocationOnMock.getArgument(5),
+ invocationOnMock.getArgument(1),
invocationOnMock.getArgument(3),
+ invocationOnMock.getArgument(4));
+ });
}
@Test
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
index 0d08849..481517c 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
@@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory;
public abstract class PulsarHandler extends PulsarDecoder {
protected ChannelHandlerContext ctx;
protected SocketAddress remoteAddress;
- protected int remoteEndpointProtocolVersion =
ProtocolVersion.v0.getValue();
+ private int remoteEndpointProtocolVersion = ProtocolVersion.v0.getValue();
private final long keepAliveIntervalSeconds;
private boolean waitingForPingResponse = false;
private ScheduledFuture<?> keepAliveTask;
@@ -43,6 +43,10 @@ public abstract class PulsarHandler extends PulsarDecoder {
return remoteEndpointProtocolVersion;
}
+ protected void setRemoteEndpointProtocolVersion(int
remoteEndpointProtocolVersion) {
+ this.remoteEndpointProtocolVersion = remoteEndpointProtocolVersion;
+ }
+
public PulsarHandler(int keepAliveInterval, TimeUnit unit) {
this.keepAliveIntervalSeconds = unit.toSeconds(keepAliveInterval);
}
@@ -98,7 +102,7 @@ public abstract class PulsarHandler extends PulsarDecoder {
// response later and thus not enforce the strict timeout here.
log.warn("[{}] Forcing connection to close after keep-alive
timeout", ctx.channel());
ctx.close();
- } else if (remoteEndpointProtocolVersion >=
ProtocolVersion.v1.getValue()) {
+ } else if (getRemoteEndpointProtocolVersion() >=
ProtocolVersion.v1.getValue()) {
// Send keep alive probe to peer only if it supports the ping/pong
commands, added in v1
if (log.isDebugEnabled()) {
log.debug("[{}] Sending ping message", ctx.channel());
diff --git
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServerConnection.java
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServerConnection.java
index 4719271..7822c9e 100644
---
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServerConnection.java
+++
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServerConnection.java
@@ -104,7 +104,7 @@ public class ServerConnection extends PulsarHandler {
}
ctx.writeAndFlush(Commands.newConnected(connect.getProtocolVersion()));
state = State.Connected;
- remoteEndpointProtocolVersion = connect.getProtocolVersion();
+ setRemoteEndpointProtocolVersion(connect.getProtocolVersion());
}
@Override
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 909ae77..5d4c576 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -270,7 +270,7 @@ public class ProxyConnection extends PulsarHandler
implements FutureListener<Voi
@Override
protected void handleConnect(CommandConnect connect) {
checkArgument(state == State.Init);
- this.remoteEndpointProtocolVersion = connect.getProtocolVersion();
+ this.setRemoteEndpointProtocolVersion(connect.getProtocolVersion());
this.hasProxyToBrokerUrl = connect.hasProxyToBrokerUrl();
this.protocolVersionToAdvertise =
getProtocolVersionToAdvertise(connect);
this.proxyToBrokerUrl = connect.hasProxyToBrokerUrl() ?
connect.getProxyToBrokerUrl() : "null";
@@ -279,11 +279,11 @@ public class ProxyConnection extends PulsarHandler
implements FutureListener<Voi
LOG.debug("Received CONNECT from {} proxyToBroker={}",
remoteAddress, proxyToBrokerUrl);
LOG.debug(
"[{}] Protocol version to advertise to broker is {},
clientProtocolVersion={}, proxyProtocolVersion={}",
- remoteAddress, protocolVersionToAdvertise,
remoteEndpointProtocolVersion,
+ remoteAddress, protocolVersionToAdvertise,
getRemoteEndpointProtocolVersion(),
Commands.getCurrentProtocolVersion());
}
- if (remoteEndpointProtocolVersion < ProtocolVersion.v10.getValue()) {
+ if (getRemoteEndpointProtocolVersion() <
ProtocolVersion.v10.getValue()) {
LOG.warn("[{}] Client doesn't support connecting through proxy",
remoteAddress);
ctx.close();
return;