This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 0e6ca076777697fff5e044ea15252727f594107a Author: Yong Zhang <[email protected]> AuthorDate: Tue Jan 26 18:11:04 2021 +0800 Add refresh authentication command in broker (#9064) * Add refresh authentication command in the broker --- **Motivation** Some authentication providers cache their authentication data and only provide new auth data once 'getAuthData' is called again. So this change adds a refresh command and refreshes the authentication data provider when the command is received. **Verify this change** Existing tests pass. (cherry picked from commit 373948ee1df6294ef43dafe457a86e53cd86ab32) --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 3 - .../authentication/SaslAuthenticateTest.java | 2 +- .../broker/authentication/AuthenticationState.java | 4 +- .../AuthenticationProviderTokenTest.java | 2 +- .../client/api/MutualAuthenticationTest.java | 2 +- ...kenOauth2AuthenticatedProducerConsumerTest.java | 76 ++++++++++++++++++++++ .../org/apache/pulsar/common/api/AuthData.java | 5 +- .../client/impl/auth/AuthenticationSasl.java | 2 +- .../impl/auth/SaslAuthenticationDataProvider.java | 2 +- .../org/apache/pulsar/client/impl/ClientCnx.java | 14 +++- .../worker/rest/api/FunctionsImplTest.java | 1 + .../pulsar/proxy/server/DirectProxyHandler.java | 13 +++- .../apache/pulsar/proxy/server/ProxyClientCnx.java | 2 +- .../token/PulsarTokenAuthenticationBaseSuite.java | 11 +++- 14 files changed, 122 insertions(+), 17 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 10c54bb..0434fab 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -3119,9 +3119,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { 
mlInfo.setTerminatedPosition(NestedPositionInfo.newBuilder().setLedgerId(lastConfirmedEntry.getLedgerId()) .setEntryId(lastConfirmedEntry.getEntryId())); } - if (managedLedgerInterceptor != null) { - managedLedgerInterceptor.onUpdateManagedLedgerInfo(propertiesMap); - } for (Map.Entry<String, String> property : propertiesMap.entrySet()) { mlInfo.addProperties(MLDataFormats.KeyValue.newBuilder() .setKey(property.getKey()).setValue(property.getValue())); diff --git a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java index e94d2ae..bb20077 100644 --- a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java +++ b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java @@ -260,7 +260,7 @@ public class SaslAuthenticateTest extends ProducerConsumerBase { // auth between server and client. 
// first time auth - AuthData initData1 = dataProvider.authenticate(AuthData.of(AuthData.INIT_AUTH_DATA)); + AuthData initData1 = dataProvider.authenticate(AuthData.INIT_AUTH_DATA); AuthData serverData1 = authState.authenticate(initData1); boolean complete = authState.isComplete(); assertFalse(complete); diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationState.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationState.java index ac881ac..f51e818 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationState.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationState.java @@ -68,7 +68,7 @@ public interface AuthenticationState { /** * If the authentication state supports refreshing and the credentials are expired, - * the auth provider will call this method ot initiate the refresh process. + * the auth provider will call this method to initiate the refresh process. * <p> * The auth state here will return the broker side data that will be used to send * a challenge to the client. 
@@ -77,6 +77,6 @@ public interface AuthenticationState { * @throws AuthenticationException */ default AuthData refreshAuthentication() throws AuthenticationException { - return null; + return AuthData.REFRESH_AUTH_DATA; } } diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java index 60fa5f6..c2610aa 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java @@ -634,7 +634,7 @@ public class AuthenticationProviderTokenTest { assertTrue(authState.isComplete()); AuthData brokerData = authState.refreshAuthentication(); - assertNull(brokerData); + assertEquals(brokerData, AuthData.REFRESH_AUTH_DATA); } // tests for Token Audience diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java index ef06323..458c4ae 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java @@ -70,7 +70,7 @@ public class MutualAuthenticationTest extends ProducerConsumerBase { String dataString = new String(data.getBytes(), UTF_8); AuthData toSend; - if (Arrays.equals(dataString.getBytes(), AuthData.INIT_AUTH_DATA)) { + if (Arrays.equals(dataString.getBytes(), AuthData.INIT_AUTH_DATA_BYTES)) { toSend = AuthData.of(clientAuthStrings[0].getBytes(UTF_8)); } else if (Arrays.equals(dataString.getBytes(), serverAuthStrings[0].getBytes(UTF_8))) { toSend = AuthData.of(clientAuthStrings[1].getBytes(UTF_8)); diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java index cfb80dc..b4e7bb0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java @@ -25,15 +25,19 @@ import java.net.URI; import java.net.URL; import java.nio.file.Path; import java.nio.file.Paths; +import java.time.Duration; import java.util.HashSet; import java.util.Properties; import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfo; +import org.awaitility.Awaitility; +import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; @@ -62,6 +66,7 @@ public class TokenOauth2AuthenticatedProducerConsumerTest extends ProducerConsum protected void setup() throws Exception { conf.setAuthenticationEnabled(true); conf.setAuthorizationEnabled(true); + conf.setAuthenticationRefreshCheckSeconds(5); Set<String> superUserRoles = new HashSet<>(); superUserRoles.add(ADMIN_ROLE); @@ -157,4 +162,75 @@ public class TokenOauth2AuthenticatedProducerConsumerTest extends ProducerConsum log.info("-- Exiting {} test --", methodName); } + @Test + public void testOAuth2TokenRefreshedWithoutReconnect() throws Exception { + log.info("-- Starting {} test --", methodName); + clientSetup(); + + // test rest by admin + admin.clusters().createCluster("test", new 
ClusterData(brokerUrl.toString())); + admin.tenants().createTenant("my-property", + new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test"))); + admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test")); + + Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic") + .subscriptionName("my-subscriber-name").subscribe(); + + ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic"); + Producer<byte[]> producer = producerBuilder.create(); + for (int i = 0; i < 10; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + + Message<byte[]> msg = null; + Set<String> messageSet = Sets.newHashSet(); + for (int i = 0; i < 10; i++) { + msg = consumer.receive(5, TimeUnit.SECONDS); + String receivedMessage = new String(msg.getData()); + log.debug("Received message: [{}]", receivedMessage); + String expectedMessage = "my-message-" + i; + testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); + } + // Acknowledge the consumption of all messages at once + consumer.acknowledgeCumulative(msg); + + // get the first connection stats + ProducerImpl producerImpl = (ProducerImpl) producer; + String accessTokenOld = producerImpl.getClientCnx().getAuthenticationDataProvider().getCommandData(); + long lastDisconnectTime = producer.getLastDisconnectedTimestamp(); + + // the token expire duration is 10 seconds, so we need to wait for the authenticationData refreshed + Awaitility.await() + .atLeast(10, TimeUnit.SECONDS) + .atMost(20, TimeUnit.SECONDS) + .with() + .pollInterval(Duration.ofSeconds(1)) + .untilAsserted(() -> { + String accessTokenNew = producerImpl.getClientCnx().getAuthenticationDataProvider().getCommandData(); + Assert.assertNotEquals(accessTokenOld, accessTokenNew); + }); + + // get the lastDisconnectTime, it should be same with the before, because the connection shouldn't 
disconnect + long lastDisconnectTimeAfterTokenExpired = producer.getLastDisconnectedTimestamp(); + Assert.assertEquals(lastDisconnectTime, lastDisconnectTimeAfterTokenExpired); + + for (int i = 0; i < 10; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + + msg = null; + messageSet = Sets.newHashSet(); + for (int i = 0; i < 10; i++) { + msg = consumer.receive(5, TimeUnit.SECONDS); + String receivedMessage = new String(msg.getData()); + log.debug("Received message: [{}]", receivedMessage); + String expectedMessage = "my-message-" + i; + testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); + } + // Acknowledge the consumption of all messages at once + consumer.acknowledgeCumulative(msg); + consumer.close(); + } } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/api/AuthData.java b/pulsar-client-api/src/main/java/org/apache/pulsar/common/api/AuthData.java index f12c436..188cb27 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/common/api/AuthData.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/common/api/AuthData.java @@ -28,7 +28,10 @@ import lombok.Data; @Data(staticConstructor = "of") public final class AuthData { // CHECKSTYLE.OFF: StaticVariableName - public static byte[] INIT_AUTH_DATA = "PulsarAuthInit".getBytes(UTF_8); + public static byte[] INIT_AUTH_DATA_BYTES = "PulsarAuthInit".getBytes(UTF_8); + public static byte[] REFRESH_AUTH_DATA_BYTES = "PulsarAuthRefresh".getBytes(UTF_8); + public static AuthData INIT_AUTH_DATA = AuthData.of(INIT_AUTH_DATA_BYTES); + public static AuthData REFRESH_AUTH_DATA = AuthData.of(REFRESH_AUTH_DATA_BYTES); // CHECKSTYLE.ON: StaticVariableName private final byte[] bytes; diff --git a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java index ee850e1..54d1802 100644 --- 
a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java +++ b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java @@ -261,7 +261,7 @@ public class AuthenticationSasl implements Authentication, EncodedAuthentication } // first time init headers.put(SASL_HEADER_STATE, SASL_STATE_CLIENT_INIT); - AuthData initData = authData.authenticate(AuthData.of(AuthData.INIT_AUTH_DATA)); + AuthData initData = authData.authenticate(AuthData.INIT_AUTH_DATA); headers.put(SASL_AUTH_TOKEN, Base64.getEncoder().encodeToString(initData.getBytes())); } else { diff --git a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/SaslAuthenticationDataProvider.java b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/SaslAuthenticationDataProvider.java index 261e06c..daae735 100644 --- a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/SaslAuthenticationDataProvider.java +++ b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/SaslAuthenticationDataProvider.java @@ -51,7 +51,7 @@ public class SaslAuthenticationDataProvider implements AuthenticationDataProvide @Override public AuthData authenticate(AuthData commandData) throws AuthenticationException { // init - if (Arrays.equals(commandData.getBytes(), AuthData.INIT_AUTH_DATA)) { + if (Arrays.equals(commandData.getBytes(), AuthData.INIT_AUTH_DATA_BYTES)) { if (pulsarSaslClient.hasInitialResponse()) { return pulsarSaslClient.evaluateChallenge(AuthData.of(new byte[0])); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index bbcba68..c551588 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -37,6 +37,7 @@ import io.netty.util.concurrent.Promise; 
import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; +import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.Queue; @@ -138,6 +139,7 @@ public class ClientCnx extends PulsarHandler { private ScheduledFuture<?> timeoutTask; // Added for mutual authentication. + @Getter protected AuthenticationDataProvider authenticationDataProvider; private TransactionBufferHandler transactionBufferHandler; @@ -226,7 +228,7 @@ public class ClientCnx extends PulsarHandler { // each channel will have a mutual client/server pair, mutual client evaluateChallenge with init data, // and return authData to server. authenticationDataProvider = authentication.getAuthData(remoteHostName); - AuthData authData = authenticationDataProvider.authenticate(AuthData.of(AuthData.INIT_AUTH_DATA)); + AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA); return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion, PulsarVersion.getVersion(), proxyToTargetBrokerAddress, null, null, null); } @@ -318,6 +320,16 @@ public class ClientCnx extends PulsarHandler { checkArgument(authChallenge.hasChallenge()); checkArgument(authChallenge.getChallenge().hasAuthData()); + if (Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData().toByteArray())) { + try { + authenticationDataProvider = authentication.getAuthData(remoteHostName); + } catch (PulsarClientException e) { + log.error("{} Error when refreshing authentication data provider: {}", ctx.channel(), e); + connectionFuture.completeExceptionally(e); + return; + } + } + // mutual authn. If auth not complete, continue auth; if auth complete, complete connectionFuture. 
try { AuthData authData = authenticationDataProvider diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java index d1cf95b..e8618c2 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java @@ -42,6 +42,7 @@ import org.apache.pulsar.functions.utils.FunctionConfigUtils; import org.apache.pulsar.functions.worker.FunctionMetaDataManager; import org.apache.pulsar.functions.worker.FunctionRuntimeInfo; import org.apache.pulsar.functions.worker.FunctionRuntimeManager; +import org.apache.pulsar.functions.worker.WorkerService; import org.apache.pulsar.functions.worker.WorkerUtils; import org.apache.pulsar.functions.worker.WorkerConfig; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java index 44d7ad5..3e3d668 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java @@ -46,6 +46,7 @@ import io.netty.util.concurrent.FutureListener; import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; +import java.util.Arrays; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Supplier; @@ -57,6 +58,7 @@ import lombok.Getter; import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationDataProvider; +import org.apache.pulsar.client.api.PulsarClientException; import 
org.apache.pulsar.client.impl.tls.TlsHostnameVerifier; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.AuthData; @@ -222,7 +224,7 @@ public class DirectProxyHandler { this.ctx = ctx; // Send the Connect command to broker authenticationDataProvider = authentication.getAuthData(remoteHostName); - AuthData authData = authenticationDataProvider.authenticate(AuthData.of(AuthData.INIT_AUTH_DATA)); + AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA); ByteBuf command = null; command = Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion, "Pulsar proxy", null /* target broker */, originalPrincipal, clientAuthData, clientAuthMethod); @@ -262,6 +264,15 @@ public class DirectProxyHandler { checkArgument(authChallenge.hasChallenge()); checkArgument(authChallenge.getChallenge().hasAuthData() && authChallenge.getChallenge().hasAuthData()); + if (Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData().toByteArray())) { + try { + authenticationDataProvider = authentication.getAuthData(remoteHostName); + } catch (PulsarClientException e) { + log.error("{} Error when refreshing authentication data provider: {}", ctx.channel(), e); + return; + } + } + // mutual authn. If auth not complete, continue auth; if auth complete, complete connectionFuture. 
try { AuthData authData = authenticationDataProvider diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java index 0878370..665b9f8 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java @@ -54,7 +54,7 @@ public class ProxyClientCnx extends ClientCnx { } authenticationDataProvider = authentication.getAuthData(remoteHostName); - AuthData authData = authenticationDataProvider.authenticate(AuthData.of(AuthData.INIT_AUTH_DATA)); + AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA); return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion, PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData, clientAuthMethod); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java index 7a923ee..eb96555 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java @@ -35,6 +35,8 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.ProducerImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.auth.AuthenticationToken; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.TenantInfo; @@ 
-288,6 +290,7 @@ public abstract class PulsarTokenAuthenticationBaseSuite extends PulsarClusterTe admin.namespaces().grantPermissionOnNamespace(namespace, REGULAR_USER_ROLE, EnumSet.allOf(AuthAction.class)); String initialToken = this.createClientTokenWithExpiry(5, TimeUnit.SECONDS); + String refreshedToken = this.createClientTokenWithExpiry(30, TimeUnit.SECONDS); @Cleanup PulsarClient client = PulsarClient.builder() @@ -295,7 +298,7 @@ public abstract class PulsarTokenAuthenticationBaseSuite extends PulsarClusterTe .authentication(AuthenticationFactory.token(() -> { if (shouldRefreshToken) { try { - return createClientTokenWithExpiry(5, TimeUnit.SECONDS); + return refreshedToken; } catch (Exception e) { return null; } @@ -308,17 +311,19 @@ public abstract class PulsarTokenAuthenticationBaseSuite extends PulsarClusterTe @Cleanup Producer<String> producer = client.newProducer(Schema.STRING) .topic(topic) - .sendTimeout(1, TimeUnit.SECONDS) + .sendTimeout(3, TimeUnit.SECONDS) .create(); - // Initially the token is valid and producer will be able to publish producer.send("hello-1"); + long lastDisconnectedTimestamp = producer.getLastDisconnectedTimestamp(); Thread.sleep(TimeUnit.SECONDS.toMillis(10)); if (shouldRefreshToken) { // The token will have been refreshed, so the app won't see any error producer.send("hello-2"); + long timestamp = producer.getLastDisconnectedTimestamp(); + assertEquals(timestamp, lastDisconnectedTimestamp); } else { // The token has expired, so this next message will be rejected try {
